From fc4eb60c6fe2fb9ad67651aeec75d1b48d11a515 Mon Sep 17 00:00:00 2001 From: Vishesh Handa Date: Wed, 4 Aug 2021 18:43:01 +0200 Subject: [PATCH] Add a WorkerQueue class This is meant to reduce the code duplication between the LinksLoader and the MdYamlDocLoader class. So far, it just highlights how fragile the message passing is between Isolates. --- lib/core/worker_queue.dart | 86 ++++++++++++++++++++++++++++++++ test/core/worker_queue_test.dart | 32 ++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 lib/core/worker_queue.dart create mode 100644 test/core/worker_queue_test.dart diff --git a/lib/core/worker_queue.dart b/lib/core/worker_queue.dart new file mode 100644 index 00000000..724a6a96 --- /dev/null +++ b/lib/core/worker_queue.dart @@ -0,0 +1,86 @@ +import 'dart:isolate'; + +import 'package:function_types/function_types.dart'; +import 'package:synchronized/synchronized.dart'; + +// This doesn't really work properly for all functions +// Waiting for this to merge - https://github.com/dart-lang/sdk/issues/36097 +class WorkerQueue { + Isolate? _isolate; + SendPort? _sendPort; + + var _receivePort = ReceivePort(); + var _loadingLock = Lock(); + + Func1 func; + + WorkerQueue(this.func); + + Future _initIsolate() async { + if (_isolate != null && _sendPort != null) return; + + return await _loadingLock.synchronized(() async { + if (_isolate != null && _sendPort != null) return; + if (_isolate != null) { + _isolate!.kill(priority: Isolate.immediate); + _isolate = null; + } + _isolate = await Isolate.spawn(_isolateMain, _receivePort.sendPort); + + var data = await _receivePort.first; + assert(data is SendPort); + _sendPort = data as SendPort; + }); + } + + Future call(INPUT input) async { + await _initIsolate(); + + var rec = ReceivePort(); + _sendPort!.send(_LoadingMessage(input, func, rec.sendPort)); + + var msg = (await rec.first) as _OutputMessage; + if (msg.output != null) { + return msg.output!; + } + if (msg.exception != null) { + throw msg.exception!; + } + throw msg.error!; + } +} + +class _LoadingMessage { + INPUT input; + SendPort sendPort; + Func1 func; + + _LoadingMessage(this.input, this.func, this.sendPort); +} + +class _OutputMessage { + OUTPUT? output; + Exception? exception; + Error? error; + + _OutputMessage({this.output, this.exception, this.error}); +} + +void _isolateMain(SendPort toMainSender) { + ReceivePort fromMainRec = ReceivePort(); + toMainSender.send(fromMainRec.sendPort); + + fromMainRec.listen((data) async { + assert(data is _LoadingMessage); + var msg = data as _LoadingMessage; + + try { + var output = msg.func(msg.input); + msg.sendPort.send(_OutputMessage(output: output)); + } on Exception catch (e) { + msg.sendPort.send(_OutputMessage(exception: e)); + } on Error catch (e) { + msg.sendPort.send(_OutputMessage(error: e)); + } + }); +} diff --git a/test/core/worker_queue_test.dart b/test/core/worker_queue_test.dart new file mode 100644 index 00000000..8f0702e0 --- /dev/null +++ b/test/core/worker_queue_test.dart @@ -0,0 +1,32 @@ +import 'package:test/test.dart'; + +import 'package:gitjournal/core/worker_queue.dart'; + +void main() { + group('WorkerQueue', () { + test('Simple', () async { + var func = (int input) => input + 5; + var worker = WorkerQueue(func); + + expect(await worker.call(2), 7); + expect(await worker.call(3), 8); + }, skip: true); + + test('Simple2', () async { + var worker = WorkerQueue(func2); + + expect(await worker.call(2), 7); + expect(await worker.call(3), 8); + }, skip: true); + + test('Simple3', () async { + var worker = WorkerQueue(func3); + + expect(await worker.call(2), 7); + expect(await worker.call(3), 8); + }); + }); +} + +int func2(int a) => a + 5; +dynamic func3(dynamic a) => (a as int) + 5;