streamHttpRequest: Fixed TCP Length Clipping issue & Refactored Code

This commit is contained in:
Manas Hejmadi
2025-07-03 02:27:16 +05:30
parent b3169fe775
commit 4207974b0e
2 changed files with 74 additions and 38 deletions

View File

@@ -323,26 +323,18 @@ class CollectionStateNotifier
HttpResponseModel? respModel; HttpResponseModel? respModel;
HistoryRequestModel? historyM; HistoryRequestModel? historyM;
RequestModel newRequestModel = requestModel; RequestModel newRequestModel = requestModel;
final completer = Completer<(Response?, Duration?, String?)>();
bool? isTextStream; bool? isTextStream;
final completer = Completer<(Response?, Duration?, String?)>();
StreamSubscription? sub; StreamSubscription? sub;
sub = stream.listen((d) async { sub = stream.listen((d) async {
if (d == null) return; if (d == null) return;
final contentType = d.$1?.headers['content-type']?.toLowerCase(); isTextStream = d.$1;
final response = d.$2;
if (isTextStream == null) { final duration = d.$3;
if (kStreamingResponseTypes.contains(contentType)) { final errorMessage = d.$4;
isTextStream = true;
} else {
isTextStream = false;
}
}
final response = d.$1;
final duration = d.$2;
final errorMessage = d.$3;
if (isTextStream == false) { if (isTextStream == false) {
if (!completer.isCompleted) { if (!completer.isCompleted) {

View File

@@ -165,16 +165,21 @@ http.Request prepareHttpRequest({
return request; return request;
} }
Future<Stream<(HttpResponse? resp, Duration? dur, String? err)?>> typedef HttpStreamOutput = (
streamHttpRequest( bool? streamOutput,
HttpResponse? resp,
Duration? dur,
String? err,
)?;
Future<Stream<HttpStreamOutput>> streamHttpRequest(
String requestId, String requestId,
APIType apiType, APIType apiType,
HttpRequestModel requestModel, { HttpRequestModel requestModel, {
SupportedUriSchemes defaultUriScheme = kDefaultUriScheme, SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
bool noSSL = false, bool noSSL = false,
}) async { }) async {
final controller = final controller = StreamController<HttpStreamOutput>();
StreamController<(HttpResponse? resp, Duration? dur, String? err)?>();
StreamSubscription<String?>? subscription; StreamSubscription<String?>? subscription;
final stopwatch = Stopwatch()..start(); final stopwatch = Stopwatch()..start();
@@ -190,11 +195,11 @@ streamHttpRequest(
await Future.microtask(() {}); await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) { if (httpClientManager.wasRequestCancelled(requestId)) {
if (!controller.isClosed) { if (!controller.isClosed) {
controller.add((null, null, kMsgRequestCancelled)); controller.add((null, null, null, kMsgRequestCancelled));
} }
httpClientManager.removeCancelledRequest(requestId); httpClientManager.removeCancelledRequest(requestId);
} else { } else {
controller.add((null, null, error.toString())); controller.add((null, null, null, error.toString()));
} }
await cleanup(); await cleanup();
} }
@@ -206,7 +211,7 @@ streamHttpRequest(
if (httpClientManager.wasRequestCancelled(requestId)) { if (httpClientManager.wasRequestCancelled(requestId)) {
if (!controller.isClosed) { if (!controller.isClosed) {
controller.add((null, null, kMsgRequestCancelled)); controller.add((null, null, null, kMsgRequestCancelled));
} }
httpClientManager.removeCancelledRequest(requestId); httpClientManager.removeCancelledRequest(requestId);
controller.close(); controller.close();
@@ -232,6 +237,7 @@ streamHttpRequest(
http.StreamedResponse streamedResponse; http.StreamedResponse streamedResponse;
try { try {
//----------------- Request Creation ---------------------
//Handling HTTP Multipart Requests //Handling HTTP Multipart Requests
if (apiType == APIType.rest && isMultipart && hasBody) { if (apiType == APIType.rest && isMultipart && hasBody) {
final multipart = http.MultipartRequest( final multipart = http.MultipartRequest(
@@ -267,6 +273,7 @@ streamHttpRequest(
..body = body ?? ''; ..body = body ?? '';
streamedResponse = await client.send(request); streamedResponse = await client.send(request);
} else { } else {
//Handling regular REST Requests
String? body; String? body;
bool overrideContentType = false; bool overrideContentType = false;
if (hasBody && requestModel.body?.isNotEmpty == true) { if (hasBody && requestModel.body?.isNotEmpty == true) {
@@ -288,28 +295,65 @@ streamHttpRequest(
streamedResponse = await client.send(request); streamedResponse = await client.send(request);
} }
//----------------- Response Handling ---------------------
final Stream<String?> outputStream = streamTextResponse(streamedResponse); final Stream<String?> outputStream = streamTextResponse(streamedResponse);
HttpResponse getResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
bytes,
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
);
}
final buffer = StringBuffer();
final contentType =
streamedResponse.headers['content-type']?.toString() ?? '';
final contentLength =
int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1;
int receivedBytes = 0;
bool hasEmitted = false;
subscription = outputStream.listen( subscription = outputStream.listen(
(data) { (chunk) {
HttpResponse? resp; if (chunk == null || controller.isClosed) return;
if (data != null) {
resp = HttpResponse.bytes( final isStreaming = kStreamingResponseTypes.contains(contentType);
data.codeUnits,
streamedResponse.statusCode, if (isStreaming) {
request: streamedResponse.request, //For Streaming responses, output every response
headers: streamedResponse.headers, final response = getResponseFromBytes(chunk.codeUnits);
isRedirect: streamedResponse.isRedirect, controller.add((true, response, stopwatch.elapsed, null));
persistentConnection: streamedResponse.persistentConnection, return;
reasonPhrase: streamedResponse.reasonPhrase, }
);
if (!controller.isClosed) { //For non Streaming events, add output to buffer
//if it is partial response chunk, complete it here only and then send receivedBytes += chunk.codeUnits.length;
controller.add((resp, stopwatch.elapsed, null)); buffer.write(chunk);
}
if (!hasEmitted &&
contentLength > 0 &&
receivedBytes >= contentLength &&
!controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
controller.add((false, response, stopwatch.elapsed, null));
hasEmitted = true;
} }
}, },
onDone: () => cleanup(), onDone: () {
//handle cases where response is larger than a TCP packet and cuts mid-way
if (!hasEmitted && !controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
if (response.body.trim().isEmpty) return;
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
}
cleanup();
},
onError: handleError, onError: handleError,
); );
return controller.stream; return controller.stream;