Clean up failed job triggers (#24755)

* Fix some warnings

* Also remove job triggers in state "error" when cleaning up.

* Add changelog entry

* Add another index for "schedule.type"
This commit is contained in:
Florian Petersen
2026-01-21 11:16:46 +01:00
committed by GitHub
parent 79472101e4
commit ab6343dff0
4 changed files with 105 additions and 50 deletions

View File

@@ -0,0 +1,5 @@
type = "fixed"
message = "Clean up documents for failed job triggers."
issues = ["23884"]
pulls = ["24755"]

View File

@@ -131,6 +131,8 @@ public class DBJobTriggerService {
collection.createIndex(Indexes.ascending(FIELD_NEXT_TIME));
collection.createIndex(Indexes.ascending(FIELD_CONSTRAINTS));
collection.createIndex(Indexes.ascending(FIELD_JOB_DEFINITION_TYPE));
collection.createIndex(Indexes.ascending(FIELD_UPDATED_AT));
collection.createIndex(Indexes.ascending(FIELD_SCHEDULE + "." + JobSchedule.TYPE_FIELD));
}
@SuppressWarnings("unused")
@@ -327,7 +329,8 @@ public class DBJobTriggerService {
eq(FIELD_LOCK_OWNER, null),
or(
eq(FIELD_STATUS, JobTriggerStatus.COMPLETE),
eq(FIELD_STATUS, JobTriggerStatus.CANCELLED)
eq(FIELD_STATUS, JobTriggerStatus.CANCELLED),
eq(FIELD_STATUS, JobTriggerStatus.ERROR)
),
eq(FIELD_SCHEDULE + "." + JobSchedule.TYPE_FIELD, OnceJobSchedule.TYPE_NAME),
lt(FIELD_UPDATED_AT, clock.nowUTC().minus(unit.toMillis(timeValue)))

View File

@@ -68,7 +68,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@ExtendWith(MongoDBExtension.class)
@MockitoSettings(strictness = Strictness.WARN)
public class DBJobTriggerServiceTest {
class DBJobTriggerServiceTest {
private static final String NODE_ID = "node-1";
private static final Duration EXPIRATION_DURATION = Duration.minutes(5);
@@ -82,7 +82,7 @@ public class DBJobTriggerServiceTest {
private MongoCollections mongoCollections;
@BeforeEach
public void setUp(MongoDBTestService dbTestService) throws Exception {
void setUp(MongoDBTestService dbTestService) {
lenient().when(schedulerCapabilitiesService.getNodeCapabilities()).thenReturn(ImmutableSet.of());
ObjectMapper objectMapper = new ObjectMapperProvider().get();
@@ -101,7 +101,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void loadPersistedTriggers() {
void loadPersistedTriggers() {
// Sort by ID to make sure we have a defined order
final List<JobTriggerDto> all;
try (Stream<JobTriggerDto> triggerStream = dbJobTriggerService.streamAll()) {
@@ -123,7 +123,7 @@ public class DBJobTriggerServiceTest {
assertThat(dto.triggeredAt()).isNotPresent();
assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNABLE);
assertThat(dto.executionDurationMs()).isEmpty();
assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0);
assertThat(dto.concurrencyRescheduleCount()).isZero();
assertThat(dto.constraints()).isEmpty();
assertThat(dto.isCancelled()).isFalse();
@@ -151,7 +151,7 @@ public class DBJobTriggerServiceTest {
assertThat(dto.triggeredAt()).isNotPresent();
assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNABLE);
assertThat(dto.executionDurationMs()).isEmpty();
assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0);
assertThat(dto.concurrencyRescheduleCount()).isZero();
assertThat(dto.constraints()).isEmpty();
assertThat(dto.isCancelled()).isFalse();
@@ -183,7 +183,7 @@ public class DBJobTriggerServiceTest {
assertThat(dto.triggeredAt()).isPresent().get().isEqualTo(DateTime.parse("2019-01-01T01:00:00.000Z"));
assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNING);
assertThat(dto.executionDurationMs()).isEmpty();
assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0);
assertThat(dto.concurrencyRescheduleCount()).isZero();
assertThat(dto.constraints()).isEmpty();
assertThat(dto.isCancelled()).isFalse();
@@ -207,7 +207,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void getForJob() {
void getForJob() {
assertThatCode(() -> dbJobTriggerService.getOneForJob(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("jobDefinitionId");
@@ -233,7 +233,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void getAllForJob() {
void getAllForJob() {
// We expect an ISE when there is more than one trigger for a single job definition
assertThatCode(() -> dbJobTriggerService.getOneForJob("54e3deadbeefdeadbeefaff3"))
@@ -251,7 +251,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void getForJobs() {
void getForJobs() {
assertThatCode(() -> dbJobTriggerService.getForJobs(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("jobDefinitionId");
@@ -263,13 +263,13 @@ public class DBJobTriggerServiceTest {
.hasSize(2)
.satisfies(triggers -> {
assertThat(triggers.get("54e3deadbeefdeadbeefaff4")).hasSize(1);
assertThat(triggers.get("54e3deadbeefdeadbeefaff4").get(0)).satisfies(trigger -> {
assertThat(triggers.get("54e3deadbeefdeadbeefaff4").getFirst()).satisfies(trigger -> {
assertThat(trigger.id()).isEqualTo("54e3deadbeefdeadbeef0002");
assertThat(trigger.jobDefinitionId()).isEqualTo("54e3deadbeefdeadbeefaff4");
});
assertThat(triggers.get("54e3deadbeefdeadbeefaff5")).hasSize(1);
assertThat(triggers.get("54e3deadbeefdeadbeefaff5").get(0)).satisfies(trigger -> {
assertThat(triggers.get("54e3deadbeefdeadbeefaff5").getFirst()).satisfies(trigger -> {
assertThat(trigger.id()).isEqualTo("54e3deadbeefdeadbeef0003");
assertThat(trigger.jobDefinitionId()).isEqualTo("54e3deadbeefdeadbeefaff5");
});
@@ -282,7 +282,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void getOrCreateTrigger() {
void getOrCreateTrigger() {
final String id = new ObjectId().toHexString();
final JobTriggerDto trigger = dbJobTriggerService.getOrCreate(JobTriggerDto.Builder.create(clock)
.id(id)
@@ -304,7 +304,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void createTrigger() {
void createTrigger() {
final JobTriggerDto trigger = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -324,7 +324,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void createTriggerWithID() {
void createTriggerWithID() {
final JobTriggerDto trigger = JobTriggerDto.Builder.create(clock)
.id("5b983c77d06b3f114bf130e2")
.jobDefinitionId("abc-123")
@@ -341,7 +341,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void updateTrigger() {
void updateTrigger() {
final JobTriggerDto originalTrigger = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -412,7 +412,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void nextRunnableTriggerWithPausedCompletedAndErrorStatus() {
void nextRunnableTriggerWithPausedCompletedAndErrorStatus() {
// No triggers yet
assertThat(dbJobTriggerService.nextRunnableTrigger()).isEmpty();
@@ -474,7 +474,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void nextRunnableTrigger() {
void nextRunnableTrigger() {
// No triggers yet
assertThat(dbJobTriggerService.nextRunnableTrigger()).isEmpty();
@@ -549,16 +549,16 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void nextRunnableTriggerWithEndTime() {
void nextRunnableTriggerWithEndTime() {
// Set clock to base date used in the fixture file
final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T00:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(clock);
final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T00:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(currentClock);
// No triggers yet because 54e3deadbeefdeadbeef0002 is already locked and RUNNING
assertThat(service.nextRunnableTrigger()).isEmpty();
// Advancing the clock a bit
clock.plus(2, TimeUnit.HOURS);
currentClock.plus(2, TimeUnit.HOURS);
// Now we should get trigger 54e3deadbeefdeadbeef0000
assertThat(service.nextRunnableTrigger())
@@ -570,7 +570,7 @@ public class DBJobTriggerServiceTest {
assertThat(service.nextRunnableTrigger()).isEmpty();
// Advancing clock far into the future, past the endTime of trigger 54e3deadbeefdeadbeef0001
clock.plus(40, TimeUnit.DAYS);
currentClock.plus(40, TimeUnit.DAYS);
// We shouldn't get trigger 54e3deadbeefdeadbeef0001 because of its endTime
assertThat(service.nextRunnableTrigger()).isEmpty();
@@ -594,7 +594,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseTrigger() {
void releaseTrigger() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -628,7 +628,7 @@ public class DBJobTriggerServiceTest {
assertThat(trigger.status()).isEqualTo(JobTriggerStatus.RUNNABLE);
assertThat(trigger.nextTime()).isEqualTo(update.nextTime().orElse(null));
assertThat(trigger.executionDurationMs()).isPresent().get().isEqualTo(15_000L);
assertThat(trigger.concurrencyRescheduleCount()).isEqualTo(0);
assertThat(trigger.concurrencyRescheduleCount()).isZero();
assertThat(trigger.data()).isPresent().get().satisfies(data -> {
assertThat(data).isInstanceOf(TestJobTriggerData.class);
assertThat(data).isEqualTo(TestJobTriggerData.create(Collections.singletonMap("hello", "world")));
@@ -640,7 +640,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseTriggerWithConcurrencyRescheduleCount() {
void releaseTriggerWithConcurrencyRescheduleCount() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -680,7 +680,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseTriggerWithoutNextTime() {
void releaseTriggerWithoutNextTime() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -718,7 +718,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseTriggerWithStatus() {
void releaseTriggerWithStatus() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -755,7 +755,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseTriggerWithInvalidArguments() {
void releaseTriggerWithInvalidArguments() {
assertThatCode(() -> dbJobTriggerService.releaseTrigger(null, null))
.isInstanceOf(NullPointerException.class)
@@ -771,7 +771,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void releaseCancelledTrigger() {
void releaseCancelledTrigger() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -815,7 +815,7 @@ public class DBJobTriggerServiceTest {
}
@Test
public void setTriggerError() {
void setTriggerError() {
final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -849,7 +849,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void delete() {
void delete() {
assertThat(dbJobTriggerService.delete("54e3deadbeefdeadbeef0000")).isTrue();
assertThat(dbJobTriggerService.delete("54e3deadbeefdeadbeef9999")).isFalse();
@@ -864,23 +864,33 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void deleteCompleted() {
void deleteCompleted() {
assertThat(dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS)).isEqualTo(1);
assertThat(dbJobTriggerService.get("54e3deadbeefdeadbeef0003")).isNotPresent();
}
@Test
@MongoDBFixtures("job-triggers.json")
public void deleteCompletedTooNew() {
void deleteCompletedTooNew() {
final JobTriggerDto trigger = dbJobTriggerService.get("54e3deadbeefdeadbeef0003").orElseThrow(AssertionError::new);
// sets updated_at to recent timestamp
dbJobTriggerService.update(trigger);
assertThat(dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS)).isZero();
}
@Test
@MongoDBFixtures("failed-job-trigger.json")
void deleteInStateErrorCompleted() {
    // The fixture contains one unlocked trigger with a "once" schedule whose status is "error".
    final String failedTriggerId = "54e3deadbeefdeadbeef0005";

    // Sanity check: the failed trigger exists before the cleanup runs.
    assertThat(dbJobTriggerService.get(failedTriggerId)).isPresent();

    // The cleanup must report exactly one removed document ...
    final var removedCount = dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS);
    assertThat(removedCount).isEqualTo(1);

    // ... and that document is the trigger in state "error".
    assertThat(dbJobTriggerService.get(failedTriggerId)).isNotPresent();
}
@Test
@MongoDBFixtures("stale-job-triggers.json")
public void forceReleaseOwnedTriggers() {
void forceReleaseOwnedTriggers() {
final Set<String> triggerIds;
try (Stream<JobTriggerDto> triggerStream = dbJobTriggerService.streamAll()) {
triggerIds = triggerStream
@@ -907,7 +917,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("stale-job-triggers.json")
public void forceReleaseOwnedCancelledTriggers() {
void forceReleaseOwnedCancelledTriggers() {
final Set<String> cancelledTriggerIds;
try (Stream<JobTriggerDto> triggerStream = dbJobTriggerService.streamAll()) {
cancelledTriggerIds = triggerStream
@@ -934,9 +944,9 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("stale-job-triggers-with-expired-lock.json")
public void nextStaleTrigger() {
final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T02:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(clock);
void nextStaleTrigger() {
final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T02:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(currentClock);
assertThat(service.nextRunnableTrigger())
.isNotEmpty()
@@ -946,10 +956,10 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("locked-job-triggers.json")
public void updateLockedJobTriggers() {
void updateLockedJobTriggers() {
DateTime newLockTime = DateTime.parse("2019-01-01T02:00:00.000Z");
final JobSchedulerTestClock clock = new JobSchedulerTestClock(newLockTime);
final DBJobTriggerService service = serviceWithClock(clock);
final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(newLockTime);
final DBJobTriggerService service = serviceWithClock(currentClock);
service.updateLockedJobTriggers();
@@ -958,13 +968,13 @@ public class DBJobTriggerServiceTest {
updatedJobTriggerIds = triggerStream
.filter(jobTriggerDto -> newLockTime.equals(jobTriggerDto.lock().lastLockTime()))
.map(JobTriggerDto::id)
.collect(Collectors.toList());
.toList();
}
assertThat(updatedJobTriggerIds).containsOnly("54e3deadbeefdeadbeef0001", "54e3deadbeefdeadbeef0002");
}
@Test
public void triggerWithConstraints() {
void triggerWithConstraints() {
final JobTriggerDto.Builder triggerBuilder = JobTriggerDto.Builder.create(clock)
.jobDefinitionId("abc-123")
.jobDefinitionType("event-processor-execution-v1")
@@ -1014,10 +1024,10 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers.json")
public void updateProgress() {
void updateProgress() {
final JobTriggerDto trigger = dbJobTriggerService.get("54e3deadbeefdeadbeef0003").orElseThrow(AssertionError::new);
assertThat(trigger.lock().progress()).isEqualTo(0);
assertThat(trigger.lock().progress()).isZero();
assertThat(dbJobTriggerService.updateProgress(trigger, 42)).isEqualTo(1);
@@ -1028,7 +1038,7 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("locked-job-triggers.json")
public void cancelTriggerByQuery() {
void cancelTriggerByQuery() {
// Must return an empty Optional if the query didn't match any trigger
assertThat(dbJobTriggerService.cancelTriggerByQuery(Filters.eq("foo", "bar"))).isEmpty();
@@ -1045,9 +1055,9 @@ public class DBJobTriggerServiceTest {
@Test
@MongoDBFixtures("job-triggers-for-overdue-count.json")
public void numberOfOverdueTriggers() {
final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T04:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(clock);
void numberOfOverdueTriggers() {
final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T04:00:00.000Z"));
final DBJobTriggerService service = serviceWithClock(currentClock);
final Map<String, Long> result = service.numberOfOverdueTriggers();

View File

@@ -0,0 +1,37 @@
{
"scheduler_triggers": [
{
"_id": {
"$oid": "54e3deadbeefdeadbeef0005"
},
"job_definition_id": "54e3deadbeefdeadbeefaff6",
"job_definition_type": "event-processor-execution-v1",
"start_time": {
"$date": "2019-01-01T00:00:00.000Z"
},
"next_time": {
"$date": "2019-01-01T00:00:00.000Z"
},
"created_at": {
"$date": "2019-01-01T00:00:00.000Z"
},
"updated_at": {
"$date": "2019-01-01T00:00:00.000Z"
},
"triggered_at": {
"$date": "2019-01-01T01:00:00.000Z"
},
"status": "error",
"lock": {
"owner": null,
"last_lock_time": null,
"clock": 0,
"progress": 0
},
"schedule": {
"type": "once"
},
"data": null
}
]
}