AnalyticsStorage: Add an API to fetch the events as a transaction

This commit is contained in:
Vishesh Handa
2021-08-11 10:13:20 +02:00
parent 5a9531778b
commit 02f7ab4741
2 changed files with 145 additions and 23 deletions
lib/analytics
test/analytics

@ -1,21 +1,42 @@
import 'dart:math' as math;
import 'dart:typed_data';
import 'package:buffer/buffer.dart';
import 'package:function_types/function_types.dart';
import 'package:meta/meta.dart';
import 'package:path/path.dart' as p;
import 'package:synchronized/synchronized.dart';
import 'package:universal_io/io.dart';
import 'generated/analytics.pb.dart' as pb;
class AnalyticsStorage {
final String folderPath;
late String currentFile;
AnalyticsStorage(this.folderPath);
final _lock = Lock();
var numEventsThisSession = 0;
AnalyticsStorage(this.folderPath) {
_resetFile();
}
void _resetFile() {
var nowUtc = DateTime.now().toUtc();
var name = nowUtc.millisecondsSinceEpoch.toString();
currentFile = p.join(folderPath, name);
}
Future<void> appendEvent(pb.Event event) async {
var eventData = event.writeToBuffer();
await _lock.synchronized(() {
return appendEventToFile(event, currentFile);
});
}
var filePath = p.join(folderPath, 'analytics');
// print(filePath);
@visibleForTesting
Future<void> appendEventToFile(pb.Event event, String filePath) async {
var eventData = event.writeToBuffer();
var intData = ByteData(4);
intData.setInt32(0, eventData.length);
@ -25,11 +46,12 @@ class AnalyticsStorage {
builder.add(eventData);
await File(filePath).writeAsBytes(builder.toBytes(), mode: FileMode.append);
numEventsThisSession++;
}
Future<List<pb.Event>> fetchAll() async {
var file = File(p.join(folderPath, 'analytics'));
var bytes = await file.readAsBytes();
@visibleForTesting
Future<List<pb.Event>> fetchFromFile(String filePath) async {
var bytes = await File(filePath).readAsBytes();
var events = <pb.Event>[];
var reader = ByteDataReader(copy: false);
@ -43,4 +65,54 @@ class AnalyticsStorage {
}
return events;
}
Future<List<String>> _availableFiles() async {
var paths = <String>[];
var dir = Directory(folderPath);
await for (var entity in dir.list()) {
if (entity is! File) {
assert(false, "Analytics directory contains non Files");
continue;
}
if (entity.path == currentFile) {
continue;
}
paths.add(entity.path);
}
return paths;
}
// If the callback returns 'true' then the events are deleted
// otherwise a subsequent call to fetchAll will return them!
Future<void> fetchAll(Func1<List<pb.Event>, Future<bool>> callback) async {
await _lock.synchronized(_resetFile);
var allEvents = <pb.Event>[];
var filePaths = await _availableFiles();
for (var filePath in filePaths) {
var events = await fetchFromFile(filePath);
allEvents.addAll(events);
}
var shouldDelete = await callback(allEvents);
if (shouldDelete) {
for (var filePath in filePaths) {
File(filePath).deleteSync();
}
}
}
Future<DateTime> oldestEvent() async {
var fileNames = (await _availableFiles()).map(p.basename);
var timestamps = fileNames.map(int.parse);
var smallest = timestamps.reduce(math.min);
return DateTime.fromMillisecondsSinceEpoch(smallest, isUtc: true);
}
}
// FIXME: Error handling?

@ -1,4 +1,7 @@
import 'dart:math';
import 'package:fixnum/fixnum.dart';
import 'package:path/path.dart' as p;
import 'package:test/test.dart';
import 'package:universal_io/io.dart';
@ -7,25 +10,72 @@ import 'package:gitjournal/analytics/storage.dart';
void main() {
test('Read and write', () async {
var dt = DateTime.now().add(const Duration(days: -1));
var ev = pb.Event(
name: 'test',
date: Int64(dt.millisecondsSinceEpoch ~/ 1000),
params: {'a': 'hello'},
pseudoId: 'id',
userProperties: {'b': 'c'},
sessionID: 'session',
);
var ev1 = _randomEvent();
var ev2 = _randomEvent();
var dir = await Directory.systemTemp.createTemp('_analytics_');
var af = p.join(dir.path, "analytics");
var storage = AnalyticsStorage(dir.path);
await storage.appendEventToFile(ev1, af);
await storage.appendEventToFile(ev2, af);
var events = await storage.fetchFromFile(af);
expect(events.length, 2);
expect(events[0].toDebugString(), ev1.toDebugString());
expect(events[1].toDebugString(), ev2.toDebugString());
// expect(events[0], ev);
});
test('Fetch All', () async {
var ev1 = _randomEvent();
var ev2 = _randomEvent();
var ev3 = _randomEvent();
var dir = await Directory.systemTemp.createTemp('_analytics_');
var storage = AnalyticsStorage(dir.path);
await storage.appendEvent(ev);
await storage.appendEvent(ev);
var events = await storage.fetchAll();
expect(events.length, 2);
expect(events[0].toDebugString(), ev.toDebugString());
expect(events[1].toDebugString(), ev.toDebugString());
// expect(events[0], ev);
await storage.appendEvent(ev1);
await storage.appendEvent(ev2);
await storage.fetchAll((events) async {
expect(events.length, 2);
expect(events[0].toDebugString(), ev1.toDebugString());
expect(events[1].toDebugString(), ev2.toDebugString());
return false;
});
await storage.appendEvent(ev3);
await storage.fetchAll((events) async {
expect(events.length, 3);
expect(events[0].toDebugString(), ev1.toDebugString());
expect(events[1].toDebugString(), ev2.toDebugString());
expect(events[2].toDebugString(), ev3.toDebugString());
return true;
});
await storage.fetchAll((events) async {
expect(events.length, 0);
return true;
});
await storage.fetchAll((events) async {
expect(events.length, 0);
return false;
});
});
}
pb.Event _randomEvent() {
var random = Random();
var dt = DateTime.now().add(Duration(days: random.nextInt(5000) * -1));
var ev = pb.Event(
name: 'test-' + random.nextInt(100).toString(),
date: Int64(dt.millisecondsSinceEpoch ~/ 1000),
params: {'a': 'hello'},
pseudoId: 'id',
userProperties: {'b': 'c'},
sessionID: 'session',
);
return ev;
}