From 4207974b0e7c07842421dd0a31aa3cb351eadff4 Mon Sep 17 00:00:00 2001 From: Manas Hejmadi Date: Thu, 3 Jul 2025 02:27:16 +0530 Subject: [PATCH] streamHttpRequest: Fixed TCP Length Clipping issue & Refactored Code --- lib/providers/collection_providers.dart | 20 ++-- .../lib/services/http_service.dart | 92 ++++++++++++++----- 2 files changed, 74 insertions(+), 38 deletions(-) diff --git a/lib/providers/collection_providers.dart b/lib/providers/collection_providers.dart index 6f3e4d6d..5da9c487 100644 --- a/lib/providers/collection_providers.dart +++ b/lib/providers/collection_providers.dart @@ -323,26 +323,18 @@ class CollectionStateNotifier HttpResponseModel? respModel; HistoryRequestModel? historyM; RequestModel newRequestModel = requestModel; - final completer = Completer<(Response?, Duration?, String?)>(); bool? isTextStream; + final completer = Completer<(Response?, Duration?, String?)>(); + StreamSubscription? sub; sub = stream.listen((d) async { if (d == null) return; - final contentType = d.$1?.headers['content-type']?.toLowerCase(); - - if (isTextStream == null) { - if (kStreamingResponseTypes.contains(contentType)) { - isTextStream = true; - } else { - isTextStream = false; - } - } - - final response = d.$1; - final duration = d.$2; - final errorMessage = d.$3; + isTextStream = d.$1; + final response = d.$2; + final duration = d.$3; + final errorMessage = d.$4; if (isTextStream == false) { if (!completer.isCompleted) { diff --git a/packages/better_networking/lib/services/http_service.dart b/packages/better_networking/lib/services/http_service.dart index 0cd68a23..a883087e 100644 --- a/packages/better_networking/lib/services/http_service.dart +++ b/packages/better_networking/lib/services/http_service.dart @@ -165,16 +165,21 @@ http.Request prepareHttpRequest({ return request; } -Future> -streamHttpRequest( +typedef HttpStreamOutput = ( + bool? streamOutput, + HttpResponse? resp, + Duration? dur, + String? err, +)?; + +Future> streamHttpRequest( String requestId, APIType apiType, HttpRequestModel requestModel, { SupportedUriSchemes defaultUriScheme = kDefaultUriScheme, bool noSSL = false, }) async { - final controller = - StreamController<(HttpResponse? resp, Duration? dur, String? err)?>(); + final controller = StreamController(); StreamSubscription? subscription; final stopwatch = Stopwatch()..start(); @@ -190,11 +195,11 @@ streamHttpRequest( await Future.microtask(() {}); if (httpClientManager.wasRequestCancelled(requestId)) { if (!controller.isClosed) { - controller.add((null, null, kMsgRequestCancelled)); + controller.add((null, null, null, kMsgRequestCancelled)); } httpClientManager.removeCancelledRequest(requestId); } else { - controller.add((null, null, error.toString())); + controller.add((null, null, null, error.toString())); } await cleanup(); } @@ -206,7 +211,7 @@ streamHttpRequest( if (httpClientManager.wasRequestCancelled(requestId)) { if (!controller.isClosed) { - controller.add((null, null, kMsgRequestCancelled)); + controller.add((null, null, null, kMsgRequestCancelled)); } httpClientManager.removeCancelledRequest(requestId); controller.close(); @@ -232,6 +237,7 @@ streamHttpRequest( http.StreamedResponse streamedResponse; try { + //----------------- Request Creation --------------------- //Handling HTTP Multipart Requests if (apiType == APIType.rest && isMultipart && hasBody) { final multipart = http.MultipartRequest( @@ -267,6 +273,7 @@ streamHttpRequest( ..body = body ?? ''; streamedResponse = await client.send(request); } else { + //Handling regular REST Requests String? body; bool overrideContentType = false; if (hasBody && requestModel.body?.isNotEmpty == true) { @@ -288,28 +295,65 @@ streamHttpRequest( streamedResponse = await client.send(request); } + //----------------- Response Handling --------------------- final Stream outputStream = streamTextResponse(streamedResponse); + HttpResponse getResponseFromBytes(List bytes) { + return HttpResponse.bytes( + bytes, + streamedResponse.statusCode, + request: streamedResponse.request, + headers: streamedResponse.headers, + isRedirect: streamedResponse.isRedirect, + persistentConnection: streamedResponse.persistentConnection, + reasonPhrase: streamedResponse.reasonPhrase, + ); + } + + final buffer = StringBuffer(); + final contentType = + streamedResponse.headers['content-type']?.toString() ?? ''; + final contentLength = + int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1; + int receivedBytes = 0; + bool hasEmitted = false; + subscription = outputStream.listen( - (data) { - HttpResponse? resp; - if (data != null) { - resp = HttpResponse.bytes( - data.codeUnits, - streamedResponse.statusCode, - request: streamedResponse.request, - headers: streamedResponse.headers, - isRedirect: streamedResponse.isRedirect, - persistentConnection: streamedResponse.persistentConnection, - reasonPhrase: streamedResponse.reasonPhrase, - ); - if (!controller.isClosed) { - //if it is partial response chunk, complete it here only and then send - controller.add((resp, stopwatch.elapsed, null)); - } + (chunk) { + if (chunk == null || controller.isClosed) return; + + final isStreaming = kStreamingResponseTypes.contains(contentType); + + if (isStreaming) { + //For Streaming responses, output every response + final response = getResponseFromBytes(chunk.codeUnits); + controller.add((true, response, stopwatch.elapsed, null)); + return; + } + + //For non Streaming events, add output to buffer + receivedBytes += chunk.codeUnits.length; + buffer.write(chunk); + + if (!hasEmitted && + contentLength > 0 && + receivedBytes >= contentLength && + !controller.isClosed) { + final response = getResponseFromBytes(buffer.toString().codeUnits); + controller.add((false, response, stopwatch.elapsed, null)); + hasEmitted = true; } }, - onDone: () => cleanup(), + onDone: () { + //handle cases where response is larger than a TCP packet and cuts mid-way + if (!hasEmitted && !controller.isClosed) { + final response = getResponseFromBytes(buffer.toString().codeUnits); + if (response.body.trim().isEmpty) return; + final isStreaming = kStreamingResponseTypes.contains(contentType); + controller.add((isStreaming, response, stopwatch.elapsed, null)); + } + cleanup(); + }, onError: handleError, ); return controller.stream;