diff --git a/packages/better_networking/lib/services/http_service.dart b/packages/better_networking/lib/services/http_service.dart index 1b05976a..25fcbcd9 100644 --- a/packages/better_networking/lib/services/http_service.dart +++ b/packages/better_networking/lib/services/http_service.dart @@ -74,8 +74,9 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest( ); } } - http.StreamedResponse multiPartResponse = - await client.send(multiPartRequest); + http.StreamedResponse multiPartResponse = await client.send( + multiPartRequest, + ); stopwatch.stop(); http.Response convertedMultiPartResponse = @@ -119,11 +120,7 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest( } } } - response = await client.post( - requestUrl, - headers: headers, - body: body, - ); + response = await client.post(requestUrl, headers: headers, body: body); } stopwatch.stop(); return (response, stopwatch.elapsed, null); @@ -153,17 +150,145 @@ http.Request prepareHttpRequest({ }) { var request = http.Request(method, url); if (headers.getValueContentType() != null) { - request.headers[HttpHeaders.contentTypeHeader] = - headers.getValueContentType()!; + request.headers[HttpHeaders.contentTypeHeader] = headers + .getValueContentType()!; if (!overrideContentType) { headers.removeKeyContentType(); } } if (body != null) { request.body = body; - headers[HttpHeaders.contentLengthHeader] = - request.bodyBytes.length.toString(); + headers[HttpHeaders.contentLengthHeader] = request.bodyBytes.length + .toString(); } request.headers.addAll(headers); return request; } + +Future> streamHttpRequest( + String requestId, + APIType apiType, + HttpRequestModel requestModel, { + SupportedUriSchemes defaultUriScheme = kDefaultUriScheme, + bool noSSL = false, +}) async { + final controller = StreamController<(String?, Duration?, String?)?>(); + StreamSubscription? subscription; + final stopwatch = Stopwatch()..start(); + + _cleanup() async { + stopwatch.stop(); + httpClientManager.closeClient(requestId); + await Future.microtask(() {}); + controller.close(); + } + + Future _handleError(dynamic error) async { + await Future.microtask(() {}); + if (httpClientManager.wasRequestCancelled(requestId)) { + controller.add((null, null, kMsgRequestCancelled)); + httpClientManager.removeCancelledRequest(requestId); + } else { + controller.add((null, null, error.toString())); + } + await _cleanup(); + } + + controller.onCancel = () async { + await subscription?.cancel(); + httpClientManager.cancelRequest(requestId); + }; + + if (httpClientManager.wasRequestCancelled(requestId)) { + controller.add((null, null, kMsgRequestCancelled)); + httpClientManager.removeCancelledRequest(requestId); + controller.close(); + return controller.stream; + } + + final client = httpClientManager.createClient(requestId, noSSL: noSSL); + final (uri, uriError) = getValidRequestUri( + requestModel.url, + requestModel.enabledParams, + defaultUriScheme: defaultUriScheme, + ); + + if (uri == null) { + await _handleError(uriError ?? 'Invalid URL'); + return controller.stream; + } + + final headers = requestModel.enabledHeadersMap; + final hasBody = kMethodsWithBody.contains(requestModel.method); + final isMultipart = requestModel.bodyContentType == ContentType.formdata; + + try { + //HANDLE MULTI-PART + if (apiType == APIType.rest && isMultipart && hasBody) { + final multipart = http.MultipartRequest( + requestModel.method.name.toUpperCase(), + uri, + )..headers.addAll(headers); + + for (final data in requestModel.formDataList) { + if (data.type == FormDataType.text) { + multipart.fields[data.name] = data.value; + } else { + multipart.files.add( + await http.MultipartFile.fromPath(data.name, data.value), + ); + } + } + + final streamedResponse = await client.send(multipart); + final stream = streamTextResponse(streamedResponse); + + subscription = stream.listen( + (data) => controller.add((data, stopwatch.elapsed, null)), + onDone: () => _cleanup(), + onError: _handleError, + ); + + return controller.stream; + } + + String? body; + bool overrideContentType = false; + + if (hasBody && requestModel.body?.isNotEmpty == true) { + body = requestModel.body; + if (!requestModel.hasContentTypeHeader) { + headers[HttpHeaders.contentTypeHeader] = + requestModel.bodyContentType.header; + } else { + overrideContentType = true; + } + } + + final request = prepareHttpRequest( + url: uri, + method: requestModel.method.name.toUpperCase(), + headers: headers, + body: body, + overrideContentType: overrideContentType, + ); + + final streamedResponse = await client.send(request); + final stream = streamTextResponse(streamedResponse); + + subscription = stream.listen( + (data) { + if (!controller.isClosed) { + controller.add((data, stopwatch.elapsed, null)); + } + }, + onDone: () => _cleanup(), + onError: _handleError, + ); + + return controller.stream; + } catch (e) { + await _handleError(e); + return controller.stream; + } +} diff --git a/packages/better_networking/lib/utils/http_response_utils.dart b/packages/better_networking/lib/utils/http_response_utils.dart index c5ebfda1..ac4a143a 100644 --- a/packages/better_networking/lib/utils/http_response_utils.dart +++ b/packages/better_networking/lib/utils/http_response_utils.dart @@ -50,3 +50,20 @@ Future convertStreamedResponse( return response; } + +Stream streamTextResponse( + http.StreamedResponse streamedResponse, +) async* { + try { + if (streamedResponse.statusCode != 200) { + final errorText = await streamedResponse.stream.bytesToString(); + throw Exception('${streamedResponse.statusCode}\n$errorText'); + } + final utf8Stream = streamedResponse.stream.transform(utf8.decoder); + await for (final chunk in utf8Stream) { + yield chunk; + } + } catch (e) { + rethrow; + } +}