streamHttpRequest: replaced string buffering with chunk_expansion

This commit is contained in:
Manas Hejmadi
2025-07-04 02:31:42 +05:30
parent 2ab6de6a62
commit 6bcc855d06
3 changed files with 36 additions and 103 deletions

View File

@@ -27,8 +27,7 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest(
defaultUriScheme: defaultUriScheme,
noSSL: noSSL,
);
final output = await stream.last;
final output = await stream.first;
return (output?.$2, output?.$3, output?.$4);
// if (httpClientManager.wasRequestCancelled(requestId)) {
@@ -194,7 +193,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
StreamSubscription<List<int>?>? subscription;
final stopwatch = Stopwatch()..start();
cleanup() async {
Future<void> cleanup() async {
stopwatch.stop();
await subscription?.cancel();
httpClientManager.closeClient(requestId);
@@ -202,17 +201,22 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
controller.close();
}
Future<void> handleError(dynamic error) async {
Future<void> addCancelledMessage() async {
if (!controller.isClosed) {
controller.add((null, null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
await cleanup();
}
Future<void> addErrorMessage(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
if (!controller.isClosed) {
controller.add((null, null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
await addCancelledMessage();
} else {
controller.add((null, null, null, error.toString()));
await cleanup();
}
await cleanup();
}
controller.onCancel = () async {
@@ -221,11 +225,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
};
if (httpClientManager.wasRequestCancelled(requestId)) {
if (!controller.isClosed) {
controller.add((null, null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
controller.close();
await addCancelledMessage();
return controller.stream;
}
@@ -237,7 +237,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
);
if (uri == null) {
await handleError(uriError ?? 'Invalid URL');
await addErrorMessage(uriError ?? 'Invalid URL');
return controller.stream;
}
@@ -248,7 +248,6 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
requestModel: requestModel,
apiType: apiType,
);
//----------------- Response Handling ---------------------
HttpResponse getResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
@@ -262,54 +261,46 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
);
}
final buffer = StringBuffer();
final contentType =
streamedResponse.headers['content-type']?.toString() ?? '';
final contentLength =
int.tryParse(streamedResponse.headers['content-length'] ?? '') ?? -1;
int receivedBytes = 0;
final chunkList = <List<int>>[];
bool hasEmitted = false;
subscription = streamedResponse.stream.listen(
(bytes) {
(bytes) async {
if (controller.isClosed) return;
if (httpClientManager.wasRequestCancelled(requestId)) {
return await addCancelledMessage();
}
final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) {
//For Streaming responses, output every response
final response = getResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null));
return;
}
//For non Streaming events, add output to buffer
receivedBytes += bytes.length;
buffer.write(decodeBytes(bytes, contentType));
if (!hasEmitted &&
contentLength > 0 &&
receivedBytes >= contentLength &&
!controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
controller.add((false, response, stopwatch.elapsed, null));
hasEmitted = true;
} else {
chunkList.add(bytes);
}
},
onDone: () {
//handle cases where response is larger than a TCP packet and cuts mid-way
onDone: () async {
if (httpClientManager.wasRequestCancelled(requestId)) {
return await addCancelledMessage();
}
if (!hasEmitted && !controller.isClosed) {
final response = getResponseFromBytes(buffer.toString().codeUnits);
final allBytes = chunkList.expand((x) => x).toList();
final response = getResponseFromBytes(allBytes);
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
}
cleanup();
await cleanup();
},
onError: handleError,
onError: addErrorMessage,
);
return controller.stream;
} catch (e) {
await handleError(e);
await addErrorMessage(e);
return controller.stream;
}
}
@@ -328,7 +319,7 @@ Future<http.StreamedResponse> makeStreamedRequest({
//----------------- Request Creation ---------------------
//Handling HTTP Multipart Requests
if (requestModel == APIType.rest && isMultipart && hasBody) {
if (apiType == APIType.rest && isMultipart && hasBody) {
final multipart = http.MultipartRequest(
requestModel.method.name.toUpperCase(),
uri,