Add a WorkerQueue class

This is meant to reduce the code duplication between the LinksLoader and
the MdYamlDocLoader class. So far, it just highlights how fragile
the message passing is between Isolates.
This commit is contained in:
Vishesh Handa
2021-08-04 18:43:01 +02:00
parent 879e238428
commit fc4eb60c6f
2 changed files with 118 additions and 0 deletions

View File

@ -0,0 +1,86 @@
import 'dart:isolate';
import 'package:function_types/function_types.dart';
import 'package:synchronized/synchronized.dart';
// This doesn't really work properly for all functions
// Waiting for this to merge - https://github.com/dart-lang/sdk/issues/36097
class WorkerQueue<INPUT, OUTPUT> {
Isolate? _isolate;
SendPort? _sendPort;
var _receivePort = ReceivePort();
var _loadingLock = Lock();
Func1<INPUT, OUTPUT> func;
WorkerQueue(this.func);
Future<void> _initIsolate() async {
if (_isolate != null && _sendPort != null) return;
return await _loadingLock.synchronized(() async {
if (_isolate != null && _sendPort != null) return;
if (_isolate != null) {
_isolate!.kill(priority: Isolate.immediate);
_isolate = null;
}
_isolate = await Isolate.spawn(_isolateMain, _receivePort.sendPort);
var data = await _receivePort.first;
assert(data is SendPort);
_sendPort = data as SendPort;
});
}
Future<OUTPUT> call(INPUT input) async {
await _initIsolate();
var rec = ReceivePort();
_sendPort!.send(_LoadingMessage(input, func, rec.sendPort));
var msg = (await rec.first) as _OutputMessage<OUTPUT>;
if (msg.output != null) {
return msg.output!;
}
if (msg.exception != null) {
throw msg.exception!;
}
throw msg.error!;
}
}
class _LoadingMessage<INPUT, OUTPUT> {
INPUT input;
SendPort sendPort;
Func1<INPUT, OUTPUT> func;
_LoadingMessage(this.input, this.func, this.sendPort);
}
class _OutputMessage<OUTPUT> {
OUTPUT? output;
Exception? exception;
Error? error;
_OutputMessage({this.output, this.exception, this.error});
}
void _isolateMain(SendPort toMainSender) {
ReceivePort fromMainRec = ReceivePort();
toMainSender.send(fromMainRec.sendPort);
fromMainRec.listen((data) async {
assert(data is _LoadingMessage);
var msg = data as _LoadingMessage;
try {
var output = msg.func(msg.input);
msg.sendPort.send(_OutputMessage(output: output));
} on Exception catch (e) {
msg.sendPort.send(_OutputMessage(exception: e));
} on Error catch (e) {
msg.sendPort.send(_OutputMessage(error: e));
}
});
}

View File

@ -0,0 +1,32 @@
import 'package:test/test.dart';
import 'package:gitjournal/core/worker_queue.dart';
void main() {
group('WorkerQueue', () {
test('Simple', () async {
var func = (int input) => input + 5;
var worker = WorkerQueue(func);
expect(await worker.call(2), 7);
expect(await worker.call(3), 8);
}, skip: true);
test('Simple2', () async {
var worker = WorkerQueue(func2);
expect(await worker.call(2), 7);
expect(await worker.call(3), 8);
}, skip: true);
test('Simple3', () async {
var worker = WorkerQueue(func3);
expect(await worker.call(2), 7);
expect(await worker.call(3), 8);
});
});
}
int func2(int a) => a + 5;
dynamic func3(dynamic a) => (a as int) + 5;