streamTextResponse usage removed from streamHttpRequest

This commit is contained in:
Manas Hejmadi
2025-07-03 03:13:13 +05:30
parent 4207974b0e
commit 7b17cb567b
2 changed files with 61 additions and 22 deletions

View File

@@ -180,7 +180,7 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
bool noSSL = false, bool noSSL = false,
}) async { }) async {
final controller = StreamController<HttpStreamOutput>(); final controller = StreamController<HttpStreamOutput>();
StreamSubscription<String?>? subscription; StreamSubscription<List<int>?>? subscription;
final stopwatch = Stopwatch()..start(); final stopwatch = Stopwatch()..start();
cleanup() async { cleanup() async {
@@ -296,7 +296,6 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
} }
//----------------- Response Handling --------------------- //----------------- Response Handling ---------------------
final Stream<String?> outputStream = streamTextResponse(streamedResponse);
HttpResponse getResponseFromBytes(List<int> bytes) { HttpResponse getResponseFromBytes(List<int> bytes) {
return HttpResponse.bytes( return HttpResponse.bytes(
@@ -318,22 +317,22 @@ Future<Stream<HttpStreamOutput>> streamHttpRequest(
int receivedBytes = 0; int receivedBytes = 0;
bool hasEmitted = false; bool hasEmitted = false;
subscription = outputStream.listen( subscription = streamedResponse.stream.listen(
(chunk) { (bytes) {
if (chunk == null || controller.isClosed) return; if (controller.isClosed) return;
final isStreaming = kStreamingResponseTypes.contains(contentType); final isStreaming = kStreamingResponseTypes.contains(contentType);
if (isStreaming) { if (isStreaming) {
//For Streaming responses, output every response //For Streaming responses, output every response
final response = getResponseFromBytes(chunk.codeUnits); final response = getResponseFromBytes(bytes);
controller.add((true, response, stopwatch.elapsed, null)); controller.add((true, response, stopwatch.elapsed, null));
return; return;
} }
//For non Streaming events, add output to buffer //For non Streaming events, add output to buffer
receivedBytes += chunk.codeUnits.length; receivedBytes += bytes.length;
buffer.write(chunk); buffer.write(decodeBytes(bytes, contentType));
if (!hasEmitted && if (!hasEmitted &&
contentLength > 0 && contentLength > 0 &&

View File

@@ -1,9 +1,10 @@
import 'dart:convert';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:http/http.dart' as http; import 'package:http/http.dart' as http;
import 'package:http_parser/http_parser.dart'; import 'package:http_parser/http_parser.dart';
import 'package:xml/xml.dart'; import 'package:xml/xml.dart';
import '../consts.dart'; import '../consts.dart';
import 'dart:convert';
import 'dart:typed_data';
String? formatBody(String? body, MediaType? mediaType) { String? formatBody(String? body, MediaType? mediaType) {
if (mediaType != null && body != null) { if (mediaType != null && body != null) {
@@ -51,19 +52,58 @@ Future<http.Response> convertStreamedResponse(
return response; return response;
} }
Stream<String?> streamTextResponse( // Stream<String?> streamTextResponse(
http.StreamedResponse streamedResponse, // http.StreamedResponse streamedResponse,
) async* { // ) async* {
try { // try {
if (streamedResponse.statusCode != 200) { // if (streamedResponse.statusCode != 200) {
final errorText = await streamedResponse.stream.bytesToString(); // final errorText = await streamedResponse.stream.bytesToString();
throw Exception('${streamedResponse.statusCode}\n$errorText'); // throw Exception('${streamedResponse.statusCode}\n$errorText');
// }
// final utf8Stream = streamedResponse.stream.transform(utf8.decoder);
// await for (final chunk in utf8Stream) {
// yield chunk;
// }
// } catch (e) {
// rethrow;
// }
// }
String getCharset(String contentType) {
final match = RegExp(
r'charset=([^\s;]+)',
caseSensitive: false,
).firstMatch(contentType);
return match?.group(1)?.toLowerCase() ?? 'utf-8'; // default to utf-8
}
String decodeBytes(List<int> bytes, String contentType) {
String _decodeUtf16(List<int> bytes, Endian endianness) {
final byteData = ByteData.sublistView(Uint8List.fromList(bytes));
final codeUnits = <int>[];
for (int i = 0; i + 1 < byteData.lengthInBytes; i += 2) {
codeUnits.add(byteData.getUint16(i, endianness));
} }
final utf8Stream = streamedResponse.stream.transform(utf8.decoder); return String.fromCharCodes(codeUnits);
await for (final chunk in utf8Stream) { }
yield chunk;
} final cSet = getCharset(contentType);
} catch (e) { switch (cSet) {
rethrow; case 'utf-8':
case 'utf8':
return utf8.decode(bytes, allowMalformed: true);
case 'utf-16':
case 'utf-16le':
return _decodeUtf16(bytes, Endian.little);
case 'utf-16be':
return _decodeUtf16(bytes, Endian.big);
case 'iso-8859-1':
case 'latin1':
return latin1.decode(bytes);
case 'us-ascii':
case 'ascii':
return ascii.decode(bytes);
default:
return utf8.decode(bytes, allowMalformed: true); //UTF8
} }
} }