Added SSE ability to HTTPS method (fusion)

This commit is contained in:
Manas Hejmadi
2025-06-25 20:50:27 +05:30
parent e5b22a24fc
commit 97db38a42d
7 changed files with 202 additions and 16 deletions

View File

@@ -165,14 +165,18 @@ http.Request prepareHttpRequest({
return request;
}
Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
Future<Stream<(String? cT, HttpResponse? resp, Duration? dur, String? err)?>>
streamHttpRequest(
String requestId,
APIType apiType,
HttpRequestModel requestModel, {
SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
bool noSSL = false,
}) async {
final controller = StreamController<(String?, Duration?, String?)?>();
final controller =
StreamController<
(String? cT, HttpResponse? resp, Duration? dur, String? err)?
>();
StreamSubscription<String?>? subscription;
final stopwatch = Stopwatch()..start();
@@ -186,10 +190,10 @@ Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
Future<void> handleError(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, kMsgRequestCancelled));
controller.add((null, null, null, kMsgRequestCancelled));
httpClientManager.removeCancelledRequest(requestId);
} else {
controller.add((null, null, error.toString()));
controller.add((null, null, null, error.toString()));
}
await cleanup();
}
@@ -200,7 +204,7 @@ Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
};
if (httpClientManager.wasRequestCancelled(requestId)) {
controller.add((null, null, kMsgRequestCancelled));
controller.add((null, null, null, kMsgRequestCancelled));
httpClientManager.removeCancelledRequest(requestId);
controller.close();
return controller.stream;
@@ -243,8 +247,25 @@ Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
final streamedResponse = await client.send(multipart);
final stream = streamTextResponse(streamedResponse);
print(streamedResponse.headers['content-type']);
subscription = stream.listen(
(data) => controller.add((data, stopwatch.elapsed, null)),
(data) => controller.add((
streamedResponse.headers['content-type'].toString(),
data == null
? null
: HttpResponse.bytes(
utf8.encode(data),
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
),
stopwatch.elapsed,
null,
)),
onDone: () => cleanup(),
onError: handleError,
);
@@ -279,7 +300,22 @@ Future<Stream<(String?, Duration?, String?)?>> streamHttpRequest(
subscription = stream.listen(
(data) {
if (!controller.isClosed) {
controller.add((data, stopwatch.elapsed, null));
controller.add((
streamedResponse.headers['content-type'].toString(),
data == null
? null
: HttpResponse.bytes(
utf8.encode(data),
streamedResponse.statusCode,
request: streamedResponse.request,
headers: streamedResponse.headers,
isRedirect: streamedResponse.isRedirect,
persistentConnection: streamedResponse.persistentConnection,
reasonPhrase: streamedResponse.reasonPhrase,
),
stopwatch.elapsed,
null,
));
}
},
onDone: () => cleanup(),