http_service: cleanup & streamHttpRequest: simplified

This commit is contained in:
Manas Hejmadi
2025-07-04 15:00:06 +05:30
parent 008eca6066
commit 7a8d1a6f4a
4 changed files with 21 additions and 143 deletions

View File

@@ -3,6 +3,7 @@ include: package:flutter_lints/flutter.yaml
analyzer:
errors:
invalid_annotation_target: ignore
no_leading_underscores_for_local_identifiers: ignore
exclude:
- "**/*.freezed.dart"
- "**/*.g.dart"

View File

@@ -25,6 +25,8 @@ enum HTTPVerb {
List<String> kStreamingResponseTypes = [
'text/event-stream',
'application/x-ndjson',
'application/json-seq',
'application/octet-stream',
];
enum SupportedUriSchemes { https, http }

View File

@@ -1,3 +1,5 @@
// ignore_for_file: no_leading_underscores_for_local_identifiers
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -29,122 +31,6 @@ Future<(HttpResponse?, Duration?, String?)> sendHttpRequest(
);
final output = await stream.first;
return (output?.$2, output?.$3, output?.$4);
// if (httpClientManager.wasRequestCancelled(requestId)) {
// httpClientManager.removeCancelledRequest(requestId);
// }
// final client = httpClientManager.createClient(requestId, noSSL: noSSL);
// (Uri?, String?) uriRec = getValidRequestUri(
// requestModel.url,
// requestModel.enabledParams,
// defaultUriScheme: defaultUriScheme,
// );
// if (uriRec.$1 != null) {
// Uri requestUrl = uriRec.$1!;
// Map<String, String> headers = requestModel.enabledHeadersMap;
// bool overrideContentType = false;
// HttpResponse? response;
// String? body;
// try {
// Stopwatch stopwatch = Stopwatch()..start();
// if (apiType == APIType.rest) {
// var isMultiPartRequest =
// requestModel.bodyContentType == ContentType.formdata;
// if (kMethodsWithBody.contains(requestModel.method)) {
// var requestBody = requestModel.body;
// if (requestBody != null &&
// !isMultiPartRequest &&
// requestBody.isNotEmpty) {
// body = requestBody;
// if (requestModel.hasContentTypeHeader) {
// overrideContentType = true;
// } else {
// headers[HttpHeaders.contentTypeHeader] =
// requestModel.bodyContentType.header;
// }
// }
// if (isMultiPartRequest) {
// var multiPartRequest = http.MultipartRequest(
// requestModel.method.name.toUpperCase(),
// requestUrl,
// );
// multiPartRequest.headers.addAll(headers);
// for (var formData in requestModel.formDataList) {
// if (formData.type == FormDataType.text) {
// multiPartRequest.fields.addAll({formData.name: formData.value});
// } else {
// multiPartRequest.files.add(
// await http.MultipartFile.fromPath(
// formData.name,
// formData.value,
// ),
// );
// }
// }
// http.StreamedResponse multiPartResponse = await client.send(
// multiPartRequest,
// );
// stopwatch.stop();
// http.Response convertedMultiPartResponse =
// await convertStreamedResponse(multiPartResponse);
// return (convertedMultiPartResponse, stopwatch.elapsed, null);
// }
// }
// switch (requestModel.method) {
// case HTTPVerb.get:
// response = await client.get(requestUrl, headers: headers);
// break;
// case HTTPVerb.head:
// response = await client.head(requestUrl, headers: headers);
// break;
// case HTTPVerb.post:
// case HTTPVerb.put:
// case HTTPVerb.patch:
// case HTTPVerb.delete:
// case HTTPVerb.options:
// final request = prepareHttpRequest(
// url: requestUrl,
// method: requestModel.method.name.toUpperCase(),
// headers: headers,
// body: body,
// overrideContentType: overrideContentType,
// );
// final streamed = await client.send(request);
// response = await http.Response.fromStream(streamed);
// break;
// }
// }
// if (apiType == APIType.graphql) {
// var requestBody = getGraphQLBody(requestModel);
// if (requestBody != null) {
// var contentLength = utf8.encode(requestBody).length;
// if (contentLength > 0) {
// body = requestBody;
// headers[HttpHeaders.contentLengthHeader] = contentLength.toString();
// if (!requestModel.hasContentTypeHeader) {
// headers[HttpHeaders.contentTypeHeader] = ContentType.json.header;
// }
// }
// }
// response = await client.post(requestUrl, headers: headers, body: body);
// }
// stopwatch.stop();
// return (response, stopwatch.elapsed, null);
// } catch (e) {
// if (httpClientManager.wasRequestCancelled(requestId)) {
// return (null, null, kMsgRequestCancelled);
// }
// return (null, null, e.toString());
// } finally {
// httpClientManager.closeClient(requestId);
// }
// } else {
// return (null, null, uriRec.$2);
// }
}
void cancelHttpRequest(String? requestId) {
@@ -193,7 +79,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
StreamSubscription<List<int>?>? subscription;
final stopwatch = Stopwatch()..start();
Future<void> cleanup() async {
Future<void> _cleanup() async {
stopwatch.stop();
await subscription?.cancel();
httpClientManager.closeClient(requestId);
@@ -201,21 +87,21 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
controller.close();
}
Future<void> addCancelledMessage() async {
Future<void> _addCancelledMessage() async {
if (!controller.isClosed) {
controller.add((null, null, null, kMsgRequestCancelled));
}
httpClientManager.removeCancelledRequest(requestId);
await cleanup();
await _cleanup();
}
Future<void> addErrorMessage(dynamic error) async {
Future<void> _addErrorMessage(dynamic error) async {
await Future.microtask(() {});
if (httpClientManager.wasRequestCancelled(requestId)) {
await addCancelledMessage();
await _addCancelledMessage();
} else {
controller.add((null, null, null, error.toString()));
await cleanup();
await _cleanup();
}
}
@@ -225,7 +111,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
};
if (httpClientManager.wasRequestCancelled(requestId)) {
await addCancelledMessage();
await _addCancelledMessage();
return controller.stream;
}
@@ -237,7 +123,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
);
if (uri == null) {
await addErrorMessage(uriError ?? 'Invalid URL');
await _addErrorMessage(uriError ?? 'Invalid URL');
return controller.stream;
}
@@ -249,7 +135,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
apiType: apiType,
);
HttpResponse getResponseFromBytes(List<int> bytes) {
HttpResponse _createResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes(
bytes,
streamedResponse.statusCode,
@@ -264,43 +150,33 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
final contentType =
streamedResponse.headers['content-type']?.toString() ?? '';
final chunkList = <List<int>>[];
bool hasEmitted = false;
subscription = streamedResponse.stream.listen(
(bytes) async {
if (controller.isClosed) return;
if (httpClientManager.wasRequestCancelled(requestId)) {
return await addCancelledMessage();
}
final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) {
final response = getResponseFromBytes(bytes);
final response = _createResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null));
hasEmitted = true;
} else {
chunkList.add(bytes);
}
},
onDone: () async {
if (httpClientManager.wasRequestCancelled(requestId)) {
return await addCancelledMessage();
}
if (!hasEmitted && !controller.isClosed) {
if (chunkList.isNotEmpty && !controller.isClosed) {
final allBytes = chunkList.expand((x) => x).toList();
final response = getResponseFromBytes(allBytes);
final response = _createResponseFromBytes(allBytes);
final isStreaming = kStreamingResponseTypes.contains(contentType);
controller.add((isStreaming, response, stopwatch.elapsed, null));
chunkList.clear();
}
await cleanup();
await _cleanup();
},
onError: addErrorMessage,
onError: _addErrorMessage,
);
return controller.stream;
} catch (e) {
await addErrorMessage(e);
await _addErrorMessage(e);
return controller.stream;
}
}

View File

@@ -4,7 +4,6 @@ import 'package:http_parser/http_parser.dart';
import 'package:xml/xml.dart';
import '../consts.dart';
import 'dart:convert';
import 'dart:typed_data';
String? formatBody(String? body, MediaType? mediaType) {
if (mediaType != null && body != null) {