From 6bcc855d06365fc809fa4fff31cc2409b0419b31 Mon Sep 17 00:00:00 2001 From: Manas Hejmadi Date: Fri, 4 Jul 2025 02:31:42 +0530 Subject: [PATCH] streamHttpRequest: replaced string buffering with chunk_expansion --- .../lib/models/http_response_model.dart | 8 +- .../lib/services/http_service.dart | 75 ++++++++----------- .../lib/utils/http_response_utils.dart | 56 -------------- 3 files changed, 36 insertions(+), 103 deletions(-) diff --git a/packages/better_networking/lib/models/http_response_model.dart b/packages/better_networking/lib/models/http_response_model.dart index 18c835ab..f8b779d7 100644 --- a/packages/better_networking/lib/models/http_response_model.dart +++ b/packages/better_networking/lib/models/http_response_model.dart @@ -68,11 +68,9 @@ class HttpResponseModel with _$HttpResponseModel { }, response.headers); MediaType? mediaType = getMediaTypeFromHeaders(responseHeaders); - //TODO: Review Effectiveness - final body = decodeBytes( - response.bodyBytes, - response.headers['content-type']!, - ); + final body = (mediaType?.subtype == kSubTypeJson) + ? utf8.decode(response.bodyBytes) + : response.body; return HttpResponseModel( statusCode: response.statusCode, diff --git a/packages/better_networking/lib/services/http_service.dart b/packages/better_networking/lib/services/http_service.dart index aa17b775..1f5e50c9 100644 --- a/packages/better_networking/lib/services/http_service.dart +++ b/packages/better_networking/lib/services/http_service.dart @@ -27,8 +27,7 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest( defaultUriScheme: defaultUriScheme, noSSL: noSSL, ); - final output = await stream.last; - + final output = await stream.first; return (output?.$2, output?.$3, output?.$4); // if (httpClientManager.wasRequestCancelled(requestId)) { @@ -194,7 +193,7 @@ Future> streamHttpRequest( StreamSubscription?>? subscription; final stopwatch = Stopwatch()..start(); - cleanup() async { + Future cleanup() async { stopwatch.stop(); await subscription?.cancel(); httpClientManager.closeClient(requestId); @@ -202,17 +201,22 @@ Future> streamHttpRequest( controller.close(); } - Future handleError(dynamic error) async { + Future addCancelledMessage() async { + if (!controller.isClosed) { + controller.add((null, null, null, kMsgRequestCancelled)); + } + httpClientManager.removeCancelledRequest(requestId); + await cleanup(); + } + + Future addErrorMessage(dynamic error) async { await Future.microtask(() {}); if (httpClientManager.wasRequestCancelled(requestId)) { - if (!controller.isClosed) { - controller.add((null, null, null, kMsgRequestCancelled)); - } - httpClientManager.removeCancelledRequest(requestId); + await addCancelledMessage(); } else { controller.add((null, null, null, error.toString())); + await cleanup(); } - await cleanup(); } controller.onCancel = () async { @@ -221,11 +225,7 @@ Future> streamHttpRequest( }; if (httpClientManager.wasRequestCancelled(requestId)) { - if (!controller.isClosed) { - controller.add((null, null, null, kMsgRequestCancelled)); - } - httpClientManager.removeCancelledRequest(requestId); - controller.close(); + await addCancelledMessage(); return controller.stream; } @@ -237,7 +237,7 @@ Future> streamHttpRequest( ); if (uri == null) { - await handleError(uriError ?? 'Invalid URL'); + await addErrorMessage(uriError ?? 'Invalid URL'); return controller.stream; } @@ -248,7 +248,6 @@ Future> streamHttpRequest( requestModel: requestModel, apiType: apiType, ); - //----------------- Response Handling --------------------- HttpResponse getResponseFromBytes(List bytes) { return HttpResponse.bytes( @@ -262,54 +261,46 @@ Future> streamHttpRequest( ); } - final buffer = StringBuffer(); final contentType = streamedResponse.headers['content-type']?.toString() ?? ''; - final contentLength = - int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1; - int receivedBytes = 0; + final chunkList = >[]; bool hasEmitted = false; subscription = streamedResponse.stream.listen( - (bytes) { + (bytes) async { if (controller.isClosed) return; + if (httpClientManager.wasRequestCancelled(requestId)) { + return await addCancelledMessage(); + } final isStreaming = kStreamingResponseTypes.contains(contentType); if (isStreaming) { - //For Streaming responses, output every response final response = getResponseFromBytes(bytes); controller.add((true, response, stopwatch.elapsed, null)); - return; - } - - //For non Streaming events, add output to buffer - receivedBytes += bytes.length; - buffer.write(decodeBytes(bytes, contentType)); - - if (!hasEmitted && - contentLength > 0 && - receivedBytes >= contentLength && - !controller.isClosed) { - final response = getResponseFromBytes(buffer.toString().codeUnits); - controller.add((false, response, stopwatch.elapsed, null)); hasEmitted = true; + } else { + chunkList.add(bytes); } }, - onDone: () { - //handle cases where response is larger than a TCP packet and cuts mid-way + onDone: () async { + if (httpClientManager.wasRequestCancelled(requestId)) { + return await addCancelledMessage(); + } if (!hasEmitted && !controller.isClosed) { - final response = getResponseFromBytes(buffer.toString().codeUnits); + final allBytes = chunkList.expand((x) => x).toList(); + final response = getResponseFromBytes(allBytes); final isStreaming = kStreamingResponseTypes.contains(contentType); controller.add((isStreaming, response, stopwatch.elapsed, null)); } - cleanup(); + await cleanup(); }, - onError: handleError, + onError: addErrorMessage, ); + return controller.stream; } catch (e) { - await handleError(e); + await addErrorMessage(e); return controller.stream; } } @@ -328,7 +319,7 @@ Future makeStreamedRequest({ //----------------- Request Creation --------------------- //Handling HTTP Multipart Requests - if (requestModel == APIType.rest && isMultipart && hasBody) { + if (apiType == APIType.rest && isMultipart && hasBody) { final multipart = http.MultipartRequest( requestModel.method.name.toUpperCase(), uri, diff --git a/packages/better_networking/lib/utils/http_response_utils.dart b/packages/better_networking/lib/utils/http_response_utils.dart index f697fc38..ba19802e 100644 --- a/packages/better_networking/lib/utils/http_response_utils.dart +++ b/packages/better_networking/lib/utils/http_response_utils.dart @@ -51,59 +51,3 @@ Future convertStreamedResponse( return response; } - -// Stream streamTextResponse( -// http.StreamedResponse streamedResponse, -// ) async* { -// try { -// if (streamedResponse.statusCode != 200) { -// final errorText = await streamedResponse.stream.bytesToString(); -// throw Exception('${streamedResponse.statusCode}\n$errorText'); -// } -// final utf8Stream = streamedResponse.stream.transform(utf8.decoder); -// await for (final chunk in utf8Stream) { -// yield chunk; -// } -// } catch (e) { -// rethrow; -// } -// } - -String getCharset(String contentType) { - final match = RegExp( - r'charset=([^\s;]+)', - caseSensitive: false, - ).firstMatch(contentType); - return match?.group(1)?.toLowerCase() ?? 'utf-8'; // default to utf-8 -} - -String decodeBytes(List bytes, String contentType) { - String _decodeUtf16(List bytes, Endian endianness) { - final byteData = ByteData.sublistView(Uint8List.fromList(bytes)); - final codeUnits = []; - for (int i = 0; i + 1 < byteData.lengthInBytes; i += 2) { - codeUnits.add(byteData.getUint16(i, endianness)); - } - return String.fromCharCodes(codeUnits); - } - - final cSet = getCharset(contentType); - switch (cSet) { - case 'utf-8': - case 'utf8': - return utf8.decode(bytes, allowMalformed: true); - case 'utf-16': - case 'utf-16le': - return _decodeUtf16(bytes, Endian.little); - case 'utf-16be': - return _decodeUtf16(bytes, Endian.big); - case 'iso-8859-1': - case 'latin1': - return latin1.decode(bytes); - case 'us-ascii': - case 'ascii': - return ascii.decode(bytes); - default: - return utf8.decode(bytes, allowMalformed: true); //UTF8 - } -}