From aef57df466ce4f7449cc1f64605e9db0455ead6e Mon Sep 17 00:00:00 2001 From: Manas Hejmadi Date: Thu, 3 Jul 2025 01:01:07 +0530 Subject: [PATCH] streamHttpRequest: Major Restructuring & other review changes --- lib/providers/collection_providers.dart | 25 ++-- packages/better_networking/lib/consts.dart | 5 + .../lib/services/http_service.dart | 123 +++++++----------- 3 files changed, 69 insertions(+), 84 deletions(-) diff --git a/lib/providers/collection_providers.dart b/lib/providers/collection_providers.dart index a1806fdd..6f3e4d6d 100644 --- a/lib/providers/collection_providers.dart +++ b/lib/providers/collection_providers.dart @@ -324,22 +324,27 @@ class CollectionStateNotifier HistoryRequestModel? historyM; RequestModel newRequestModel = requestModel; final completer = Completer<(Response?, Duration?, String?)>(); - bool isTextStream = false; + bool? isTextStream; StreamSubscription? sub; sub = stream.listen((d) async { if (d == null) return; - final contentType = d.$1; - isTextStream = isTextStream || - contentType == 'text/event-stream' || - contentType == 'application/x-ndjson'; + final contentType = d.$1?.headers['content-type']?.toLowerCase(); - final response = d.$2; - final duration = d.$3; - final errorMessage = d.$4; + if (isTextStream == null) { + if (kStreamingResponseTypes.contains(contentType)) { + isTextStream = true; + } else { + isTextStream = false; + } + } - if (!isTextStream) { + final response = d.$1; + final duration = d.$2; + final errorMessage = d.$3; + + if (isTextStream == false) { if (!completer.isCompleted) { completer.complete((response, duration, errorMessage)); } @@ -403,7 +408,7 @@ class CollectionStateNotifier time: duration, ) .copyWith( - sseOutput: isTextStream ? [response.body] : [], + sseOutput: (isTextStream == true) ? [response.body] : [], ); newRequestModel = newRequestModel.copyWith( diff --git a/packages/better_networking/lib/consts.dart b/packages/better_networking/lib/consts.dart index 61fe1e50..2df65b03 100644 --- a/packages/better_networking/lib/consts.dart +++ b/packages/better_networking/lib/consts.dart @@ -22,6 +22,11 @@ enum HTTPVerb { final String abbr; } +List kStreamingResponseTypes = [ + 'text/event-stream', + 'application/x-ndjson', +]; + enum SupportedUriSchemes { https, http } final kSupportedUriSchemes = SupportedUriSchemes.values diff --git a/packages/better_networking/lib/services/http_service.dart b/packages/better_networking/lib/services/http_service.dart index b91eac00..2ae5bfa1 100644 --- a/packages/better_networking/lib/services/http_service.dart +++ b/packages/better_networking/lib/services/http_service.dart @@ -165,7 +165,7 @@ http.Request prepareHttpRequest({ return request; } -Future> +Future> streamHttpRequest( String requestId, APIType apiType, @@ -174,14 +174,13 @@ streamHttpRequest( bool noSSL = false, }) async { final controller = - StreamController< - (String? cT, HttpResponse? resp, Duration? dur, String? err)? - >(); + StreamController<(HttpResponse? resp, Duration? dur, String? err)?>(); StreamSubscription? subscription; final stopwatch = Stopwatch()..start(); cleanup() async { stopwatch.stop(); + await subscription?.cancel(); httpClientManager.closeClient(requestId); await Future.microtask(() {}); controller.close(); @@ -190,10 +189,12 @@ streamHttpRequest( Future handleError(dynamic error) async { await Future.microtask(() {}); if (httpClientManager.wasRequestCancelled(requestId)) { - controller.add((null, null, null, kMsgRequestCancelled)); + if (!controller.isClosed) { + controller.add((null, null, kMsgRequestCancelled)); + } httpClientManager.removeCancelledRequest(requestId); } else { - controller.add((null, null, null, error.toString())); + controller.add((null, null, error.toString())); } await cleanup(); } @@ -204,7 +205,9 @@ streamHttpRequest( }; if (httpClientManager.wasRequestCancelled(requestId)) { - controller.add((null, null, null, kMsgRequestCancelled)); + if (!controller.isClosed) { + controller.add((null, null, kMsgRequestCancelled)); + } httpClientManager.removeCancelledRequest(requestId); controller.close(); return controller.stream; @@ -226,6 +229,8 @@ streamHttpRequest( final hasBody = kMethodsWithBody.contains(requestModel.method); final isMultipart = requestModel.bodyContentType == ContentType.formdata; + http.StreamedResponse streamedResponse; + try { //HANDLE MULTI-PART if (apiType == APIType.rest && isMultipart && hasBody) { @@ -233,7 +238,6 @@ streamHttpRequest( requestModel.method.name.toUpperCase(), uri, )..headers.addAll(headers); - for (final data in requestModel.formDataList) { if (data.type == FormDataType.text) { multipart.fields[data.name] = data.value; @@ -243,83 +247,54 @@ streamHttpRequest( ); } } - - final streamedResponse = await client.send(multipart); - final stream = streamTextResponse(streamedResponse); - - subscription = stream.listen( - (data) => controller.add(( - streamedResponse.headers['content-type'].toString(), - data == null - ? null - : HttpResponse.bytes( - utf8.encode(data), - streamedResponse.statusCode, - request: streamedResponse.request, - headers: streamedResponse.headers, - isRedirect: streamedResponse.isRedirect, - persistentConnection: streamedResponse.persistentConnection, - reasonPhrase: streamedResponse.reasonPhrase, - ), - stopwatch.elapsed, - null, - )), - onDone: () => cleanup(), - onError: handleError, - ); - - return controller.stream; - } - - String? body; - bool overrideContentType = false; - - if (hasBody && requestModel.body?.isNotEmpty == true) { - body = requestModel.body; - if (!requestModel.hasContentTypeHeader) { - headers[HttpHeaders.contentTypeHeader] = - requestModel.bodyContentType.header; - } else { - overrideContentType = true; + streamedResponse = await client.send(multipart); + } else { + String? body; + bool overrideContentType = false; + if (hasBody && requestModel.body?.isNotEmpty == true) { + body = requestModel.body; + if (!requestModel.hasContentTypeHeader) { + headers[HttpHeaders.contentTypeHeader] = + requestModel.bodyContentType.header; + } else { + overrideContentType = true; + } } + final request = prepareHttpRequest( + url: uri, + method: requestModel.method.name.toUpperCase(), + headers: headers, + body: body, + overrideContentType: overrideContentType, + ); + streamedResponse = await client.send(request); } - final request = prepareHttpRequest( - url: uri, - method: requestModel.method.name.toUpperCase(), - headers: headers, - body: body, - overrideContentType: overrideContentType, - ); + final Stream outputStream = streamTextResponse(streamedResponse); - final streamedResponse = await client.send(request); - final stream = streamTextResponse(streamedResponse); - - subscription = stream.listen( + subscription = outputStream.listen( (data) { if (!controller.isClosed) { - controller.add(( - streamedResponse.headers['content-type'].toString(), - data == null - ? null - : HttpResponse.bytes( - utf8.encode(data), - streamedResponse.statusCode, - request: streamedResponse.request, - headers: streamedResponse.headers, - isRedirect: streamedResponse.isRedirect, - persistentConnection: streamedResponse.persistentConnection, - reasonPhrase: streamedResponse.reasonPhrase, - ), - stopwatch.elapsed, - null, - )); + HttpResponse? resp; + if (data != null) { + resp = HttpResponse.bytes( + data.codeUnits, + streamedResponse.statusCode, + request: streamedResponse.request, + headers: streamedResponse.headers, + isRedirect: streamedResponse.isRedirect, + persistentConnection: streamedResponse.persistentConnection, + reasonPhrase: streamedResponse.reasonPhrase, + ); + } + if (!controller.isClosed) { + controller.add((resp, stopwatch.elapsed, null)); + } } }, onDone: () => cleanup(), onError: handleError, ); - return controller.stream; } catch (e) { await handleError(e);