better_networking: streaming implementation (streamHttpRequest)

This commit is contained in:
Manas Hejmadi
2025-06-16 17:14:07 +05:30
parent ee75dd6fed
commit 80d2ca69d7
2 changed files with 153 additions and 11 deletions

View File

@@ -74,8 +74,9 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest(
);
}
}
http.StreamedResponse multiPartResponse =
await client.send(multiPartRequest);
http.StreamedResponse multiPartResponse = await client.send(
multiPartRequest,
);
stopwatch.stop();
http.Response convertedMultiPartResponse =
@@ -119,11 +120,7 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest(
}
}
}
response = await client.post(
requestUrl,
headers: headers,
body: body,
);
response = await client.post(requestUrl, headers: headers, body: body);
}
stopwatch.stop();
return (response, stopwatch.elapsed, null);
@@ -153,17 +150,145 @@ http.Request prepareHttpRequest({
}) {
var request = http.Request(method, url);
if (headers.getValueContentType() != null) {
request.headers[HttpHeaders.contentTypeHeader] =
headers.getValueContentType()!;
request.headers[HttpHeaders.contentTypeHeader] = headers
.getValueContentType()!;
if (!overrideContentType) {
headers.removeKeyContentType();
}
}
if (body != null) {
request.body = body;
headers[HttpHeaders.contentLengthHeader] =
request.bodyBytes.length.toString();
headers[HttpHeaders.contentLengthHeader] = request.bodyBytes.length
.toString();
}
request.headers.addAll(headers);
return request;
}
Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
String requestId,
APIType apiType,
HttpRequestModel requestModel, {
SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
bool noSSL = false,
}) async {
final controller = StreamController<(String?, Duration?, String?)?>();
StreamSubscription<String?>? subscription;
final stopwatch = Stopwatch()..start();
_cleanup() async {
stopwatch.stop();
httpClientManager.closeClient(requestId);
await Future.microtask(() {});
controller.close();
}
Future<void> _handleError(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, kMsgRequestCancelled));
httpClientManager.removeCancelledRequest(requestId);
} else {
controller.add((null, null, error.toString()));
}
await _cleanup();
}
controller.onCancel = () async {
await subscription?.cancel();
httpClientManager.cancelRequest(requestId);
};
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, kMsgRequestCancelled));
httpClientManager.removeCancelledRequest(requestId);
controller.close();
return controller.stream;
}
final client = httpClientManager.createClient(requestId, noSSL: noSSL);
final (uri, uriError) = getValidRequestUri(
requestModel.url,
requestModel.enabledParams,
defaultUriScheme: defaultUriScheme,
);
if (uri == null) {
await _handleError(uriError ?? 'Invalid URL');
return controller.stream;
}
final headers = requestModel.enabledHeadersMap;
final hasBody = kMethodsWithBody.contains(requestModel.method);
final isMultipart = requestModel.bodyContentType == ContentType.formdata;
try {
//HANDLE MULTI-PART
if (apiType == APIType.rest && isMultipart && hasBody) {
final multipart = http.MultipartRequest(
requestModel.method.name.toUpperCase(),
uri,
)..headers.addAll(headers);
for (final data in requestModel.formDataList) {
if (data.type == FormDataType.text) {
multipart.fields[data.name] = data.value;
} else {
multipart.files.add(
await http.MultipartFile.fromPath(data.name, data.value),
);
}
}
final streamedResponse = await client.send(multipart);
final stream = streamTextResponse(streamedResponse);
subscription = stream.listen(
(data) => controller.add((data, stopwatch.elapsed, null)),
onDone: () => _cleanup(),
onError: _handleError,
);
return controller.stream;
}
String? body;
bool overrideContentType = false;
if (hasBody && requestModel.body?.isNotEmpty == true) {
body = requestModel.body;
if (!requestModel.hasContentTypeHeader) {
headers[HttpHeaders.contentTypeHeader] =
requestModel.bodyContentType.header;
} else {
overrideContentType = true;
}
}
final request = prepareHttpRequest(
url: uri,
method: requestModel.method.name.toUpperCase(),
headers: headers,
body: body,
overrideContentType: overrideContentType,
);
final streamedResponse = await client.send(request);
final stream = streamTextResponse(streamedResponse);
subscription = stream.listen(
(data) {
if (!controller.isClosed) {
controller.add((data, stopwatch.elapsed, null));
}
},
onDone: () => _cleanup(),
onError: _handleError,
);
return controller.stream;
} catch (e) {
await _handleError(e);
return controller.stream;
}
}

View File

@@ -50,3 +50,20 @@ Future<http.Response> convertStreamedResponse(
return response;
}
Stream<String?> streamTextResponse(
http.StreamedResponse streamedResponse,
) async* {
try {
if (streamedResponse.statusCode != 200) {
final errorText = await streamedResponse.stream.bytesToString();
throw Exception('${streamedResponse.statusCode}\n$errorText');
}
final utf8Stream = streamedResponse.stream.transform(utf8.decoder);
await for (final chunk in utf8Stream) {
yield chunk;
}
} catch (e) {
rethrow;
}
}