Refactored makeStreamedRequest out & replaced sendHttpRequest underlying code

This commit is contained in:
Manas Hejmadi
2025-07-03 19:27:49 +05:30
parent ce1a463d75
commit 2ab6de6a62
4 changed files with 210 additions and 182 deletions

View File

@@ -27,7 +27,7 @@ class _SSEDisplayState extends State<SSEDisplay> {
return SingleChildScrollView(
child: Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: sse.reversed.map<Widget>((chunk) {
children: sse.reversed.where((e) => e != '').map<Widget>((chunk) {
Map<String, dynamic>? parsedJson;
try {
parsedJson = jsonDecode(chunk);

View File

@@ -67,9 +67,13 @@ class HttpResponseModel with _$HttpResponseModel {
HttpHeaders.contentLengthHeader: response.contentLength.toString(),
}, response.headers);
MediaType? mediaType = getMediaTypeFromHeaders(responseHeaders);
final body = (mediaType?.subtype == kSubTypeJson)
? utf8.decode(response.bodyBytes)
: response.body;
//TODO: Review Effectiveness
final body = decodeBytes(
response.bodyBytes,
response.headers['content-type']!,
);
return HttpResponseModel(
statusCode: response.statusCode,
headers: responseHeaders,

View File

@@ -20,121 +20,132 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest(
SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
bool noSSL = false,
}) async {
if (httpClientManager.wasRequestCancelled(requestId)) {
httpClientManager.removeCancelledRequest(requestId);
}
final client = httpClientManager.createClient(requestId, noSSL: noSSL);
(Uri?, String?) uriRec = getValidRequestUri(
requestModel.url,
requestModel.enabledParams,
final stream = await streamHttpRequest(
requestId,
apiType,
requestModel,
defaultUriScheme: defaultUriScheme,
noSSL: noSSL,
);
final output = await stream.last;
if (uriRec.$1 != null) {
Uri requestUrl = uriRec.$1!;
Map<String, String> headers = requestModel.enabledHeadersMap;
bool overrideContentType = false;
HttpResponse? response;
String? body;
try {
Stopwatch stopwatch = Stopwatch()..start();
if (apiType == APIType.rest) {
var isMultiPartRequest =
requestModel.bodyContentType == ContentType.formdata;
return (output?.$2, output?.$3, output?.$4);
if (kMethodsWithBody.contains(requestModel.method)) {
var requestBody = requestModel.body;
if (requestBody != null &&
!isMultiPartRequest &&
requestBody.isNotEmpty) {
body = requestBody;
if (requestModel.hasContentTypeHeader) {
overrideContentType = true;
} else {
headers[HttpHeaders.contentTypeHeader] =
requestModel.bodyContentType.header;
}
}
if (isMultiPartRequest) {
var multiPartRequest = http.MultipartRequest(
requestModel.method.name.toUpperCase(),
requestUrl,
);
multiPartRequest.headers.addAll(headers);
for (var formData in requestModel.formDataList) {
if (formData.type == FormDataType.text) {
multiPartRequest.fields.addAll({formData.name: formData.value});
} else {
multiPartRequest.files.add(
await http.MultipartFile.fromPath(
formData.name,
formData.value,
),
);
}
}
http.StreamedResponse multiPartResponse = await client.send(
multiPartRequest,
);
// if (httpClientManager.wasRequestCancelled(requestId)) {
// httpClientManager.removeCancelledRequest(requestId);
// }
// final client = httpClientManager.createClient(requestId, noSSL: noSSL);
stopwatch.stop();
http.Response convertedMultiPartResponse =
await convertStreamedResponse(multiPartResponse);
return (convertedMultiPartResponse, stopwatch.elapsed, null);
}
}
switch (requestModel.method) {
case HTTPVerb.get:
response = await client.get(requestUrl, headers: headers);
break;
case HTTPVerb.head:
response = await client.head(requestUrl, headers: headers);
break;
case HTTPVerb.post:
case HTTPVerb.put:
case HTTPVerb.patch:
case HTTPVerb.delete:
case HTTPVerb.options:
final request = prepareHttpRequest(
url: requestUrl,
method: requestModel.method.name.toUpperCase(),
headers: headers,
body: body,
overrideContentType: overrideContentType,
);
final streamed = await client.send(request);
response = await http.Response.fromStream(streamed);
break;
}
}
if (apiType == APIType.graphql) {
var requestBody = getGraphQLBody(requestModel);
if (requestBody != null) {
var contentLength = utf8.encode(requestBody).length;
if (contentLength > 0) {
body = requestBody;
headers[HttpHeaders.contentLengthHeader] = contentLength.toString();
if (!requestModel.hasContentTypeHeader) {
headers[HttpHeaders.contentTypeHeader] = ContentType.json.header;
}
}
}
response = await client.post(requestUrl, headers: headers, body: body);
}
stopwatch.stop();
return (response, stopwatch.elapsed, null);
} catch (e) {
if (httpClientManager.wasRequestCancelled(requestId)) {
return (null, null, kMsgRequestCancelled);
}
return (null, null, e.toString());
} finally {
httpClientManager.closeClient(requestId);
}
} else {
return (null, null, uriRec.$2);
}
// (Uri?, String?) uriRec = getValidRequestUri(
// requestModel.url,
// requestModel.enabledParams,
// defaultUriScheme: defaultUriScheme,
// );
// if (uriRec.$1 != null) {
// Uri requestUrl = uriRec.$1!;
// Map<String, String> headers = requestModel.enabledHeadersMap;
// bool overrideContentType = false;
// HttpResponse? response;
// String? body;
// try {
// Stopwatch stopwatch = Stopwatch()..start();
// if (apiType == APIType.rest) {
// var isMultiPartRequest =
// requestModel.bodyContentType == ContentType.formdata;
// if (kMethodsWithBody.contains(requestModel.method)) {
// var requestBody = requestModel.body;
// if (requestBody != null &&
// !isMultiPartRequest &&
// requestBody.isNotEmpty) {
// body = requestBody;
// if (requestModel.hasContentTypeHeader) {
// overrideContentType = true;
// } else {
// headers[HttpHeaders.contentTypeHeader] =
// requestModel.bodyContentType.header;
// }
// }
// if (isMultiPartRequest) {
// var multiPartRequest = http.MultipartRequest(
// requestModel.method.name.toUpperCase(),
// requestUrl,
// );
// multiPartRequest.headers.addAll(headers);
// for (var formData in requestModel.formDataList) {
// if (formData.type == FormDataType.text) {
// multiPartRequest.fields.addAll({formData.name: formData.value});
// } else {
// multiPartRequest.files.add(
// await http.MultipartFile.fromPath(
// formData.name,
// formData.value,
// ),
// );
// }
// }
// http.StreamedResponse multiPartResponse = await client.send(
// multiPartRequest,
// );
// stopwatch.stop();
// http.Response convertedMultiPartResponse =
// await convertStreamedResponse(multiPartResponse);
// return (convertedMultiPartResponse, stopwatch.elapsed, null);
// }
// }
// switch (requestModel.method) {
// case HTTPVerb.get:
// response = await client.get(requestUrl, headers: headers);
// break;
// case HTTPVerb.head:
// response = await client.head(requestUrl, headers: headers);
// break;
// case HTTPVerb.post:
// case HTTPVerb.put:
// case HTTPVerb.patch:
// case HTTPVerb.delete:
// case HTTPVerb.options:
// final request = prepareHttpRequest(
// url: requestUrl,
// method: requestModel.method.name.toUpperCase(),
// headers: headers,
// body: body,
// overrideContentType: overrideContentType,
// );
// final streamed = await client.send(request);
// response = await http.Response.fromStream(streamed);
// break;
// }
// }
// if (apiType == APIType.graphql) {
// var requestBody = getGraphQLBody(requestModel);
// if (requestBody != null) {
// var contentLength = utf8.encode(requestBody).length;
// if (contentLength > 0) {
// body = requestBody;
// headers[HttpHeaders.contentLengthHeader] = contentLength.toString();
// if (!requestModel.hasContentTypeHeader) {
// headers[HttpHeaders.contentTypeHeader] = ContentType.json.header;
// }
// }
// }
// response = await client.post(requestUrl, headers: headers, body: body);
// }
// stopwatch.stop();
// return (response, stopwatch.elapsed, null);
// } catch (e) {
// if (httpClientManager.wasRequestCancelled(requestId)) {
// return (null, null, kMsgRequestCancelled);
// }
// return (null, null, e.toString());
// } finally {
// httpClientManager.closeClient(requestId);
// }
// } else {
// return (null, null, uriRec.$2);
// }
}
void cancelHttpRequest(String? requestId) {
@@ -230,16 +241,94 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
return controller.stream;
}
try {
final streamedResponse = await makeStreamedRequest(
client: client,
uri: uri,
requestModel: requestModel,
apiType: apiType,
);
//----------------- Response Handling ---------------------
HttpResponse getResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
bytes,
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
);
}
final buffer = StringBuffer();
final contentType =
streamedResponse.headers['content-type']?.toString() ?? '';
final contentLength =
int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1;
int receivedBytes = 0;
bool hasEmitted = false;
subscription = streamedResponse.stream.listen(
(bytes) {
if (controller.isClosed) return;
final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) {
//For Streaming responses, output every response
final response = getResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null));
return;
}
//For non Streaming events, add output to buffer
receivedBytes += bytes.length;
buffer.write(decodeBytes(bytes, contentType));
if (!hasEmitted &&
contentLength > 0 &&
receivedBytes >= contentLength &&
!controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
controller.add((false, response, stopwatch.elapsed, null));
hasEmitted = true;
}
},
onDone: () {
//handle cases where response is larger than a TCP packet and cuts mid-way
if (!hasEmitted && !controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
}
cleanup();
},
onError: handleError,
);
return controller.stream;
} catch (e) {
await handleError(e);
return controller.stream;
}
}
Future<http.StreamedResponse> makeStreamedRequest({
required http.Client client,
required Uri uri,
required HttpRequestModel requestModel,
required APIType apiType,
}) async {
final headers = requestModel.enabledHeadersMap;
final hasBody = kMethodsWithBody.contains(requestModel.method);
final isMultipart = requestModel.bodyContentType == ContentType.formdata;
http.StreamedResponse streamedResponse;
try {
//----------------- Request Creation ---------------------
//Handling HTTP Multipart Requests
if (apiType == APIType.rest && isMultipart && hasBody) {
if (requestModel == APIType.rest && isMultipart && hasBody) {
final multipart = http.MultipartRequest(
requestModel.method.name.toUpperCase(),
uri,
@@ -294,71 +383,5 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
);
streamedResponse = await client.send(request);
}
//----------------- Response Handling ---------------------
HttpResponse getResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
bytes,
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
);
}
final buffer = StringBuffer();
final contentType =
streamedResponse.headers['content-type']?.toString() ?? '';
final contentLength =
int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1;
int receivedBytes = 0;
bool hasEmitted = false;
subscription = streamedResponse.stream.listen(
(bytes) {
if (controller.isClosed) return;
final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) {
//For Streaming responses, output every response
final response = getResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null));
return;
}
//For non Streaming events, add output to buffer
receivedBytes += bytes.length;
buffer.write(decodeBytes(bytes, contentType));
if (!hasEmitted &&
contentLength > 0 &&
receivedBytes >= contentLength &&
!controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
controller.add((false, response, stopwatch.elapsed, null));
hasEmitted = true;
}
},
onDone: () {
//handle cases where response is larger than a TCP packet and cuts mid-way
if (!hasEmitted && !controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
if (response.body.trim().isNotEmpty) {
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
}
}
cleanup();
},
onError: handleError,
);
return controller.stream;
} catch (e) {
await handleError(e);
return controller.stream;
}
return streamedResponse;
}

View File

@@ -41,4 +41,5 @@ Map<String, dynamic> responseModelJson = {
"formattedBody": formattedBody,
"bodyBytes": bodyBytes,
"time": 516000,
'sseOutput': null,
};