streamHttpRequest: Major Restructuring & other review changes

This commit is contained in:
Manas Hejmadi
2025-07-03 01:01:07 +05:30
parent 637a018c76
commit aef57df466
3 changed files with 69 additions and 84 deletions

View File

@@ -324,22 +324,27 @@ class CollectionStateNotifier
HistoryRequestModel? historyM;
RequestModel newRequestModel = requestModel;
final completer = Completer<(Response?, Duration?, String?)>();
bool isTextStream = false;
bool? isTextStream;
StreamSubscription? sub;
sub = stream.listen((d) async {
if (d == null) return;
final contentType = d.$1;
isTextStream = isTextStream ||
contentType == 'text/event-stream' ||
contentType == 'application/x-ndjson';
final contentType = d.$1?.headers['content-type']?.toLowerCase();
final response = d.$2;
final duration = d.$3;
final errorMessage = d.$4;
if (isTextStream == null) {
if (kStreamingResponseTypes.contains(contentType)) {
isTextStream = true;
} else {
isTextStream = false;
}
}
if (!isTextStream) {
final response = d.$1;
final duration = d.$2;
final errorMessage = d.$3;
if (isTextStream == false) {
if (!completer.isCompleted) {
completer.complete((response, duration, errorMessage));
}
@@ -403,7 +408,7 @@ class CollectionStateNotifier
time: duration,
)
.copyWith(
sseOutput: isTextStream ? [response.body] : [],
sseOutput: (isTextStream == true) ? [response.body] : [],
);
newRequestModel = newRequestModel.copyWith(

View File

@@ -22,6 +22,11 @@ enum HTTPVerb {
final String abbr;
}
List<String> kStreamingResponseTypes = [
'text/event-stream',
'application/x-ndjson',
];
enum SupportedUriSchemes { https, http }
final kSupportedUriSchemes = SupportedUriSchemes.values

View File

@@ -165,7 +165,7 @@ http.Request prepareHttpRequest({
return request;
}
Future<Stream<(String? cT, HttpResponse? resp, Duration? dur, String? err)?>>
Future<Stream<(HttpResponse? resp, Duration? dur, String? err)?>>
streamHttpRequest(
String requestId,
APIType apiType,
@@ -174,14 +174,13 @@ streamHttpRequest(
bool noSSL = false,
}) async {
final controller =
StreamController<
(String? cT, HttpResponse? resp, Duration? dur, String? err)?
>();
StreamController<(HttpResponse? resp, Duration? dur, String? err)?>();
StreamSubscription<String?>? subscription;
final stopwatch = Stopwatch()..start();
cleanup() async {
stopwatch.stop();
await subscription?.cancel();
httpClientManager.closeClient(requestId);
await Future.microtask(() {});
controller.close();
@@ -190,10 +189,12 @@ streamHttpRequest(
Future<void> handleError(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, null, kMsgRequestCancelled));
if (!controller.isClosed) {
controller.add((null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
} else {
controller.add((null, null, null, error.toString()));
controller.add((null, null, error.toString()));
}
await cleanup();
}
@@ -204,7 +205,9 @@ streamHttpRequest(
};
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, null, kMsgRequestCancelled));
if (!controller.isClosed) {
controller.add((null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
controller.close();
return controller.stream;
@@ -226,6 +229,8 @@ streamHttpRequest(
final hasBody = kMethodsWithBody.contains(requestModel.method);
final isMultipart = requestModel.bodyContentType == ContentType.formdata;
http.StreamedResponse streamedResponse;
try {
//HANDLE MULTI-PART
if (apiType == APIType.rest && isMultipart && hasBody) {
@@ -233,7 +238,6 @@ streamHttpRequest(
requestModel.method.name.toUpperCase(),
uri,
)..headers.addAll(headers);
for (final data in requestModel.formDataList) {
if (data.type == FormDataType.text) {
multipart.fields[data.name] = data.value;
@@ -243,83 +247,54 @@ streamHttpRequest(
);
}
}
final streamedResponse = await client.send(multipart);
final stream = streamTextResponse(streamedResponse);
subscription = stream.listen(
(data) => controller.add((
streamedResponse.headers['content-type'].toString(),
data == null
? null
: HttpResponse.bytes(
utf8.encode(data),
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
),
stopwatch.elapsed,
null,
)),
onDone: () => cleanup(),
onError: handleError,
);
return controller.stream;
}
String? body;
bool overrideContentType = false;
if (hasBody && requestModel.body?.isNotEmpty == true) {
body = requestModel.body;
if (!requestModel.hasContentTypeHeader) {
headers[HttpHeaders.contentTypeHeader] =
requestModel.bodyContentType.header;
} else {
overrideContentType = true;
streamedResponse = await client.send(multipart);
} else {
String? body;
bool overrideContentType = false;
if (hasBody && requestModel.body?.isNotEmpty == true) {
body = requestModel.body;
if (!requestModel.hasContentTypeHeader) {
headers[HttpHeaders.contentTypeHeader] =
requestModel.bodyContentType.header;
} else {
overrideContentType = true;
}
}
final request = prepareHttpRequest(
url: uri,
method: requestModel.method.name.toUpperCase(),
headers: headers,
body: body,
overrideContentType: overrideContentType,
);
streamedResponse = await client.send(request);
}
final request = prepareHttpRequest(
url: uri,
method: requestModel.method.name.toUpperCase(),
headers: headers,
body: body,
overrideContentType: overrideContentType,
);
final Stream<String?> outputStream = streamTextResponse(streamedResponse);
final streamedResponse = await client.send(request);
final stream = streamTextResponse(streamedResponse);
subscription = stream.listen(
subscription = outputStream.listen(
(data) {
if (!controller.isClosed) {
controller.add((
streamedResponse.headers['content-type'].toString(),
data == null
? null
: HttpResponse.bytes(
utf8.encode(data),
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
),
stopwatch.elapsed,
null,
));
HttpResponse? resp;
if (data != null) {
resp = HttpResponse.bytes(
data.codeUnits,
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
);
}
if (!controller.isClosed) {
controller.add((resp, stopwatch.elapsed, null));
}
}
},
onDone: () => cleanup(),
onError: handleError,
);
return controller.stream;
} catch (e) {
await handleError(e);