mirror of
https://github.com/flutter/packages.git
synced 2025-06-29 06:06:59 +08:00
[metrics_center] Add retries to unlock a lock file in case of 504 errors (#4323)
* https://github.com/flutter/flutter/issues/120440 * Updates the `GcsLock` class to retry unlocking the file in case a 504 error occurs * Updates the `GcsLock` constructor to require a `StorageApi` object instead of `AuthClient`, which allows mocking the object, since the `AuthClient` object isn't actually being used within the `GcsLock` class besides to create the `StorageApi`.
This commit is contained in:
@ -1,3 +1,7 @@
|
||||
## 1.0.10
|
||||
|
||||
* Adds retry logic when removing a `GcsLock` file lock in case of failure.
|
||||
|
||||
## 1.0.9
|
||||
|
||||
* Adds compatibility with `http` 1.0.
|
||||
|
@ -5,16 +5,13 @@
|
||||
// ignore_for_file: avoid_print
|
||||
|
||||
import 'package:googleapis/storage/v1.dart';
|
||||
import 'package:googleapis_auth/googleapis_auth.dart';
|
||||
|
||||
/// Global (in terms of earth) mutex using Google Cloud Storage.
|
||||
class GcsLock {
|
||||
/// Create a lock with an authenticated client and a GCS bucket name.
|
||||
///
|
||||
/// The client is used to communicate with Google Cloud Storage APIs.
|
||||
GcsLock(this._client, this._bucketName) {
|
||||
_api = StorageApi(_client);
|
||||
}
|
||||
GcsLock(this._api, this._bucketName);
|
||||
|
||||
/// Create a temporary lock file in GCS, and use it as a mutex mechanism to
|
||||
/// run a piece of code exclusively.
|
||||
@ -79,13 +76,28 @@ class GcsLock {
|
||||
}
|
||||
|
||||
Future<void> _unlock(String lockFileName) async {
|
||||
await _api.objects.delete(_bucketName, lockFileName);
|
||||
Duration waitPeriod = const Duration(milliseconds: 10);
|
||||
bool unlocked = false;
|
||||
// Retry in the case of GCS returning an API error, but rethrow if unable
|
||||
// to unlock after a certain period of time.
|
||||
while (!unlocked) {
|
||||
try {
|
||||
await _api.objects.delete(_bucketName, lockFileName);
|
||||
unlocked = true;
|
||||
} on DetailedApiRequestError {
|
||||
if (waitPeriod < _unlockThreshold) {
|
||||
await Future<void>.delayed(waitPeriod);
|
||||
waitPeriod *= 2;
|
||||
} else {
|
||||
rethrow;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
late StorageApi _api;
|
||||
|
||||
final String _bucketName;
|
||||
final AuthClient _client;
|
||||
final StorageApi _api;
|
||||
|
||||
static const Duration _kWarningThreshold = Duration(seconds: 10);
|
||||
static const Duration _unlockThreshold = Duration(minutes: 1);
|
||||
}
|
||||
|
@ -7,7 +7,8 @@
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:gcloud/storage.dart';
|
||||
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
|
||||
import 'package:googleapis/storage/v1.dart'
|
||||
show DetailedApiRequestError, StorageApi;
|
||||
import 'package:googleapis_auth/auth_io.dart';
|
||||
|
||||
import 'common.dart';
|
||||
@ -388,7 +389,7 @@ class SkiaPerfDestination extends MetricDestination {
|
||||
}
|
||||
final SkiaPerfGcsAdaptor adaptor =
|
||||
SkiaPerfGcsAdaptor(storage.bucket(bucketName));
|
||||
final GcsLock lock = GcsLock(client, bucketName);
|
||||
final GcsLock lock = GcsLock(StorageApi(client), bucketName);
|
||||
return SkiaPerfDestination(adaptor, lock);
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
name: metrics_center
|
||||
version: 1.0.9
|
||||
version: 1.0.10
|
||||
description:
|
||||
Support multiple performance metrics sources/formats and destinations.
|
||||
repository: https://github.com/flutter/packages/tree/main/packages/metrics_center
|
||||
@ -9,6 +9,7 @@ environment:
|
||||
sdk: ">=2.18.0 <4.0.0"
|
||||
|
||||
dependencies:
|
||||
_discoveryapis_commons: ^1.0.0
|
||||
crypto: ^3.0.1
|
||||
equatable: ^2.0.3
|
||||
gcloud: ^0.8.2
|
||||
|
@ -22,7 +22,12 @@ enum TestPhase {
|
||||
run2,
|
||||
}
|
||||
|
||||
@GenerateMocks(<Type>[AuthClient])
|
||||
@GenerateMocks(<Type>[
|
||||
AuthClient,
|
||||
StorageApi
|
||||
], customMocks: <MockSpec<dynamic>>[
|
||||
MockSpec<ObjectsResource>(onMissingStub: OnMissingStub.returnDefault)
|
||||
])
|
||||
void main() {
|
||||
const Duration kDelayStep = Duration(milliseconds: 10);
|
||||
final Map<String, dynamic>? credentialsJson = getTestGcpCredentialsJson();
|
||||
@ -36,7 +41,7 @@ void main() {
|
||||
Zone.current.fork(specification: spec).run<void>(() {
|
||||
fakeAsync((FakeAsync fakeAsync) {
|
||||
final MockAuthClient mockClient = MockAuthClient();
|
||||
final GcsLock lock = GcsLock(mockClient, 'mockBucket');
|
||||
final GcsLock lock = GcsLock(StorageApi(mockClient), 'mockBucket');
|
||||
when(mockClient.send(any)).thenThrow(DetailedApiRequestError(412, ''));
|
||||
final Future<void> runFinished =
|
||||
lock.protectedRun('mock.lock', () async {});
|
||||
@ -63,7 +68,7 @@ void main() {
|
||||
test('GcsLock integration test: single protectedRun is successful', () async {
|
||||
final AutoRefreshingAuthClient client = await clientViaServiceAccount(
|
||||
ServiceAccountCredentials.fromJson(credentialsJson), Storage.SCOPES);
|
||||
final GcsLock lock = GcsLock(client, kTestBucketName);
|
||||
final GcsLock lock = GcsLock(StorageApi(client), kTestBucketName);
|
||||
int testValue = 0;
|
||||
await lock.protectedRun('test.lock', () async {
|
||||
testValue = 1;
|
||||
@ -74,8 +79,8 @@ void main() {
|
||||
test('GcsLock integration test: protectedRun is exclusive', () async {
|
||||
final AutoRefreshingAuthClient client = await clientViaServiceAccount(
|
||||
ServiceAccountCredentials.fromJson(credentialsJson), Storage.SCOPES);
|
||||
final GcsLock lock1 = GcsLock(client, kTestBucketName);
|
||||
final GcsLock lock2 = GcsLock(client, kTestBucketName);
|
||||
final GcsLock lock1 = GcsLock(StorageApi(client), kTestBucketName);
|
||||
final GcsLock lock2 = GcsLock(StorageApi(client), kTestBucketName);
|
||||
|
||||
TestPhase phase = TestPhase.run1;
|
||||
final Completer<void> started1 = Completer<void>();
|
||||
@ -105,4 +110,39 @@ void main() {
|
||||
await finished1;
|
||||
await finished2;
|
||||
}, skip: credentialsJson == null);
|
||||
|
||||
test('GcsLock attempts to unlock again on a DetailedApiRequestError',
|
||||
() async {
|
||||
fakeAsync((FakeAsync fakeAsync) {
|
||||
final StorageApi mockStorageApi = MockStorageApi();
|
||||
final ObjectsResource mockObjectsResource = MockObjectsResource();
|
||||
final GcsLock gcsLock = GcsLock(mockStorageApi, kTestBucketName);
|
||||
const String lockFileName = 'test.lock';
|
||||
when(mockStorageApi.objects).thenReturn(mockObjectsResource);
|
||||
|
||||
// Simulate a failure to delete a lock file.
|
||||
when(mockObjectsResource.delete(kTestBucketName, lockFileName))
|
||||
.thenThrow(DetailedApiRequestError(504, ''));
|
||||
|
||||
gcsLock.protectedRun(lockFileName, () async {});
|
||||
|
||||
// Allow time to pass by to ensure deleting the lock file is retried multiple times.
|
||||
fakeAsync.elapse(const Duration(milliseconds: 30));
|
||||
verify(mockObjectsResource.delete(kTestBucketName, lockFileName))
|
||||
.called(3);
|
||||
|
||||
// Simulate a successful deletion of the lock file.
|
||||
when(mockObjectsResource.delete(kTestBucketName, lockFileName))
|
||||
.thenAnswer((_) => Future<void>(
|
||||
() {
|
||||
return;
|
||||
},
|
||||
));
|
||||
|
||||
// At this point, there should only be one more (successful) attempt to delete the lock file.
|
||||
fakeAsync.elapse(const Duration(minutes: 2));
|
||||
verify(mockObjectsResource.delete(kTestBucketName, lockFileName))
|
||||
.called(1);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -8,7 +8,8 @@ import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:gcloud/storage.dart';
|
||||
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
|
||||
import 'package:googleapis/storage/v1.dart'
|
||||
show DetailedApiRequestError, StorageApi;
|
||||
import 'package:googleapis_auth/auth_io.dart';
|
||||
import 'package:metrics_center/metrics_center.dart';
|
||||
import 'package:metrics_center/src/gcs_lock.dart';
|
||||
@ -426,7 +427,7 @@ Future<void> main() async {
|
||||
|
||||
assert(await storage.bucketExists(kTestBucketName));
|
||||
testBucket = storage.bucket(kTestBucketName);
|
||||
testLock = GcsLock(client, kTestBucketName);
|
||||
testLock = GcsLock(StorageApi(client), kTestBucketName);
|
||||
}
|
||||
|
||||
Future<void> skiaPerfGcsAdapterIntegrationTest() async {
|
||||
|
@ -1,7 +1,9 @@
|
||||
// Mocks generated by Mockito 5.4.0 from annotations
|
||||
// Mocks generated by Mockito 5.4.1 from annotations
|
||||
// in metrics_center/test/skiaperf_test.dart.
|
||||
// Do not manually edit this file.
|
||||
|
||||
// @dart=2.19
|
||||
|
||||
// ignore_for_file: no_leading_underscores_for_library_prefixes
|
||||
import 'dart:async' as _i2;
|
||||
|
||||
|
@ -60,6 +60,7 @@
|
||||
- wasm
|
||||
- yaml
|
||||
# Google-owned packages
|
||||
- _discoveryapis_commons
|
||||
- adaptive_navigation
|
||||
- file
|
||||
- googleapis
|
||||
|
Reference in New Issue
Block a user