From ab6343dff0bf19cdc8a4360c19bd5dfe62de06d2 Mon Sep 17 00:00:00 2001 From: Florian Petersen <188503754+fpetersen-gl@users.noreply.github.com> Date: Wed, 21 Jan 2026 11:16:46 +0100 Subject: [PATCH] Clean up failed job triggers (#24755) * Fix some warnings * Also remove job triggers in state "error" when cleaning up. * Add CL * Add another index for "schedule.type" --- changelog/unreleased/pr-24755.toml | 5 + .../scheduler/DBJobTriggerService.java | 5 +- .../scheduler/DBJobTriggerServiceTest.java | 108 ++++++++++-------- .../graylog/scheduler/failed-job-trigger.json | 37 ++++++ 4 files changed, 105 insertions(+), 50 deletions(-) create mode 100644 changelog/unreleased/pr-24755.toml create mode 100644 graylog2-server/src/test/resources/org/graylog/scheduler/failed-job-trigger.json diff --git a/changelog/unreleased/pr-24755.toml b/changelog/unreleased/pr-24755.toml new file mode 100644 index 0000000000..14acb9e7c4 --- /dev/null +++ b/changelog/unreleased/pr-24755.toml @@ -0,0 +1,5 @@ +type = "fixed" +message = "Clean up documents for failed job triggers." + +issues = ["23884"] +pulls = ["24755"] diff --git a/graylog2-server/src/main/java/org/graylog/scheduler/DBJobTriggerService.java b/graylog2-server/src/main/java/org/graylog/scheduler/DBJobTriggerService.java index 44eb259086..ed2588a0d9 100644 --- a/graylog2-server/src/main/java/org/graylog/scheduler/DBJobTriggerService.java +++ b/graylog2-server/src/main/java/org/graylog/scheduler/DBJobTriggerService.java @@ -131,6 +131,8 @@ public class DBJobTriggerService { collection.createIndex(Indexes.ascending(FIELD_NEXT_TIME)); collection.createIndex(Indexes.ascending(FIELD_CONSTRAINTS)); collection.createIndex(Indexes.ascending(FIELD_JOB_DEFINITION_TYPE)); + collection.createIndex(Indexes.ascending(FIELD_UPDATED_AT)); + collection.createIndex(Indexes.ascending(FIELD_SCHEDULE + "." + JobSchedule.TYPE_FIELD)); } @SuppressWarnings("unused") @@ -327,7 +329,8 @@ public class DBJobTriggerService { eq(FIELD_LOCK_OWNER, null), or( eq(FIELD_STATUS, JobTriggerStatus.COMPLETE), - eq(FIELD_STATUS, JobTriggerStatus.CANCELLED) + eq(FIELD_STATUS, JobTriggerStatus.CANCELLED), + eq(FIELD_STATUS, JobTriggerStatus.ERROR) ), eq(FIELD_SCHEDULE + "." + JobSchedule.TYPE_FIELD, OnceJobSchedule.TYPE_NAME), lt(FIELD_UPDATED_AT, clock.nowUTC().minus(unit.toMillis(timeValue))) diff --git a/graylog2-server/src/test/java/org/graylog/scheduler/DBJobTriggerServiceTest.java b/graylog2-server/src/test/java/org/graylog/scheduler/DBJobTriggerServiceTest.java index 6b33f413ab..eec907abe1 100644 --- a/graylog2-server/src/test/java/org/graylog/scheduler/DBJobTriggerServiceTest.java +++ b/graylog2-server/src/test/java/org/graylog/scheduler/DBJobTriggerServiceTest.java @@ -68,7 +68,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @ExtendWith(MongoDBExtension.class) @MockitoSettings(strictness = Strictness.WARN) -public class DBJobTriggerServiceTest { +class DBJobTriggerServiceTest { private static final String NODE_ID = "node-1"; private static final Duration EXPIRATION_DURATION = Duration.minutes(5); @@ -82,7 +82,7 @@ public class DBJobTriggerServiceTest { private MongoCollections mongoCollections; @BeforeEach - public void setUp(MongoDBTestService dbTestService) throws Exception { + void setUp(MongoDBTestService dbTestService) { lenient().when(schedulerCapabilitiesService.getNodeCapabilities()).thenReturn(ImmutableSet.of()); ObjectMapper objectMapper = new ObjectMapperProvider().get(); @@ -101,7 +101,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void loadPersistedTriggers() { + void loadPersistedTriggers() { // Sort by ID to make sure we have a defined order final List all; try (Stream triggerStream = dbJobTriggerService.streamAll()) { @@ -123,7 +123,7 @@ public class DBJobTriggerServiceTest { assertThat(dto.triggeredAt()).isNotPresent(); assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNABLE); assertThat(dto.executionDurationMs()).isEmpty(); - assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0); + assertThat(dto.concurrencyRescheduleCount()).isZero(); assertThat(dto.constraints()).isEmpty(); assertThat(dto.isCancelled()).isFalse(); @@ -151,7 +151,7 @@ public class DBJobTriggerServiceTest { assertThat(dto.triggeredAt()).isNotPresent(); assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNABLE); assertThat(dto.executionDurationMs()).isEmpty(); - assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0); + assertThat(dto.concurrencyRescheduleCount()).isZero(); assertThat(dto.constraints()).isEmpty(); assertThat(dto.isCancelled()).isFalse(); @@ -183,7 +183,7 @@ public class DBJobTriggerServiceTest { assertThat(dto.triggeredAt()).isPresent().get().isEqualTo(DateTime.parse("2019-01-01T01:00:00.000Z")); assertThat(dto.status()).isEqualTo(JobTriggerStatus.RUNNING); assertThat(dto.executionDurationMs()).isEmpty(); - assertThat(dto.concurrencyRescheduleCount()).isEqualTo(0); + assertThat(dto.concurrencyRescheduleCount()).isZero(); assertThat(dto.constraints()).isEmpty(); assertThat(dto.isCancelled()).isFalse(); @@ -207,7 +207,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void getForJob() { + void getForJob() { assertThatCode(() -> dbJobTriggerService.getOneForJob(null)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("jobDefinitionId"); @@ -233,7 +233,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void getAllForJob() { + void getAllForJob() { // We expect a ISE when there is more than one trigger for a single job definition assertThatCode(() -> dbJobTriggerService.getOneForJob("54e3deadbeefdeadbeefaff3")) @@ -251,7 +251,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void getForJobs() { + void getForJobs() { assertThatCode(() -> dbJobTriggerService.getForJobs(null)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("jobDefinitionId"); @@ -263,13 +263,13 @@ public class DBJobTriggerServiceTest { .hasSize(2) .satisfies(triggers -> { assertThat(triggers.get("54e3deadbeefdeadbeefaff4")).hasSize(1); - assertThat(triggers.get("54e3deadbeefdeadbeefaff4").get(0)).satisfies(trigger -> { + assertThat(triggers.get("54e3deadbeefdeadbeefaff4").getFirst()).satisfies(trigger -> { assertThat(trigger.id()).isEqualTo("54e3deadbeefdeadbeef0002"); assertThat(trigger.jobDefinitionId()).isEqualTo("54e3deadbeefdeadbeefaff4"); }); assertThat(triggers.get("54e3deadbeefdeadbeefaff5")).hasSize(1); - assertThat(triggers.get("54e3deadbeefdeadbeefaff5").get(0)).satisfies(trigger -> { + assertThat(triggers.get("54e3deadbeefdeadbeefaff5").getFirst()).satisfies(trigger -> { assertThat(trigger.id()).isEqualTo("54e3deadbeefdeadbeef0003"); assertThat(trigger.jobDefinitionId()).isEqualTo("54e3deadbeefdeadbeefaff5"); }); @@ -282,7 +282,7 @@ public class DBJobTriggerServiceTest { } @Test - public void getOrCreateTrigger() { + void getOrCreateTrigger() { final String id = new ObjectId().toHexString(); final JobTriggerDto trigger = dbJobTriggerService.getOrCreate(JobTriggerDto.Builder.create(clock) .id(id) @@ -304,7 +304,7 @@ public class DBJobTriggerServiceTest { } @Test - public void createTrigger() { + void createTrigger() { final JobTriggerDto trigger = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -324,7 +324,7 @@ public class DBJobTriggerServiceTest { } @Test - public void createTriggerWithID() { + void createTriggerWithID() { final JobTriggerDto trigger = JobTriggerDto.Builder.create(clock) .id("5b983c77d06b3f114bf130e2") .jobDefinitionId("abc-123") @@ -341,7 +341,7 @@ public class DBJobTriggerServiceTest { } @Test - public void updateTrigger() { + void updateTrigger() { final JobTriggerDto originalTrigger = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -412,7 +412,7 @@ public class DBJobTriggerServiceTest { } @Test - public void nextRunnableTriggerWithPausedCompletedAndErrorStatus() { + void nextRunnableTriggerWithPausedCompletedAndErrorStatus() { // No triggers yet assertThat(dbJobTriggerService.nextRunnableTrigger()).isEmpty(); @@ -474,7 +474,7 @@ public class DBJobTriggerServiceTest { } @Test - public void nextRunnableTrigger() { + void nextRunnableTrigger() { // No triggers yet assertThat(dbJobTriggerService.nextRunnableTrigger()).isEmpty(); @@ -549,16 +549,16 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void nextRunnableTriggerWithEndTime() { + void nextRunnableTriggerWithEndTime() { // Set clock to base date used in the fixture file - final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T00:00:00.000Z")); - final DBJobTriggerService service = serviceWithClock(clock); + final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T00:00:00.000Z")); + final DBJobTriggerService service = serviceWithClock(currentClock); // No triggers yet because 54e3deadbeefdeadbeef0002 is already locked and RUNNING assertThat(service.nextRunnableTrigger()).isEmpty(); // Advancing the clock a bit - clock.plus(2, TimeUnit.HOURS); + currentClock.plus(2, TimeUnit.HOURS); // Now we should get trigger 54e3deadbeefdeadbeef0000 assertThat(service.nextRunnableTrigger()) @@ -570,7 +570,7 @@ public class DBJobTriggerServiceTest { assertThat(service.nextRunnableTrigger()).isEmpty(); // Advancing clock far into the future, past the endTime of trigger 54e3deadbeefdeadbeef0001 - clock.plus(40, TimeUnit.DAYS); + currentClock.plus(40, TimeUnit.DAYS); // We shouldn't get trigger 54e3deadbeefdeadbeef0001 because of its endTime assertThat(service.nextRunnableTrigger()).isEmpty(); @@ -594,7 +594,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseTrigger() { + void releaseTrigger() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -628,7 +628,7 @@ public class DBJobTriggerServiceTest { assertThat(trigger.status()).isEqualTo(JobTriggerStatus.RUNNABLE); assertThat(trigger.nextTime()).isEqualTo(update.nextTime().orElse(null)); assertThat(trigger.executionDurationMs()).isPresent().get().isEqualTo(15_000L); - assertThat(trigger.concurrencyRescheduleCount()).isEqualTo(0); + assertThat(trigger.concurrencyRescheduleCount()).isZero(); assertThat(trigger.data()).isPresent().get().satisfies(data -> { assertThat(data).isInstanceOf(TestJobTriggerData.class); assertThat(data).isEqualTo(TestJobTriggerData.create(Collections.singletonMap("hello", "world"))); @@ -640,7 +640,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseTriggerWithConcurrencyRescheduleCount() { + void releaseTriggerWithConcurrencyRescheduleCount() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -680,7 +680,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseTriggerWithoutNextTime() { + void releaseTriggerWithoutNextTime() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -718,7 +718,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseTriggerWithStatus() { + void releaseTriggerWithStatus() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -755,7 +755,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseTriggerWithInvalidArguments() { + void releaseTriggerWithInvalidArguments() { assertThatCode(() -> dbJobTriggerService.releaseTrigger(null, null)) .isInstanceOf(NullPointerException.class) @@ -771,7 +771,7 @@ public class DBJobTriggerServiceTest { } @Test - public void releaseCancelledTrigger() { + void releaseCancelledTrigger() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -815,7 +815,7 @@ public class DBJobTriggerServiceTest { } @Test - public void setTriggerError() { + void setTriggerError() { final JobTriggerDto trigger1 = dbJobTriggerService.create(JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -849,7 +849,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void delete() { + void delete() { assertThat(dbJobTriggerService.delete("54e3deadbeefdeadbeef0000")).isTrue(); assertThat(dbJobTriggerService.delete("54e3deadbeefdeadbeef9999")).isFalse(); @@ -864,23 +864,33 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void deleteCompleted() { + void deleteCompleted() { assertThat(dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS)).isEqualTo(1); assertThat(dbJobTriggerService.get("54e3deadbeefdeadbeef0003")).isNotPresent(); } @Test @MongoDBFixtures("job-triggers.json") - public void deleteCompletedTooNew() { + void deleteCompletedTooNew() { final JobTriggerDto trigger = dbJobTriggerService.get("54e3deadbeefdeadbeef0003").orElseThrow(AssertionError::new); // sets updated_at to recent timestamp dbJobTriggerService.update(trigger); assertThat(dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS)).isZero(); } + @Test + @MongoDBFixtures("failed-job-trigger.json") + void deleteInStateErrorCompleted() { + final String docId = "54e3deadbeefdeadbeef0005"; + assertThat(dbJobTriggerService.get(docId)).isPresent(); + assertThat(dbJobTriggerService.deleteCompletedOnceSchedulesOlderThan(1, TimeUnit.DAYS)).isEqualTo(1); + //The trigger in state "error" is also deleted: + assertThat(dbJobTriggerService.get(docId)).isNotPresent(); + } + @Test @MongoDBFixtures("stale-job-triggers.json") - public void forceReleaseOwnedTriggers() { + void forceReleaseOwnedTriggers() { final Set triggerIds; try (Stream triggerStream = dbJobTriggerService.streamAll()) { triggerIds = triggerStream @@ -907,7 +917,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("stale-job-triggers.json") - public void forceReleaseOwnedCancelledTriggers() { + void forceReleaseOwnedCancelledTriggers() { final Set cancelledTriggerIds; try (Stream triggerStream = dbJobTriggerService.streamAll()) { cancelledTriggerIds = triggerStream @@ -934,9 +944,9 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("stale-job-triggers-with-expired-lock.json") - public void nextStaleTrigger() { - final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T02:00:00.000Z")); - final DBJobTriggerService service = serviceWithClock(clock); + void nextStaleTrigger() { + final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T02:00:00.000Z")); + final DBJobTriggerService service = serviceWithClock(currentClock); assertThat(service.nextRunnableTrigger()) .isNotEmpty() @@ -946,10 +956,10 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("locked-job-triggers.json") - public void updateLockedJobTriggers() { + void updateLockedJobTriggers() { DateTime newLockTime = DateTime.parse("2019-01-01T02:00:00.000Z"); - final JobSchedulerTestClock clock = new JobSchedulerTestClock(newLockTime); - final DBJobTriggerService service = serviceWithClock(clock); + final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(newLockTime); + final DBJobTriggerService service = serviceWithClock(currentClock); service.updateLockedJobTriggers(); @@ -958,13 +968,13 @@ public class DBJobTriggerServiceTest { updatedJobTriggerIds = triggerStream .filter(jobTriggerDto -> newLockTime.equals(jobTriggerDto.lock().lastLockTime())) .map(JobTriggerDto::id) - .collect(Collectors.toList()); + .toList(); } assertThat(updatedJobTriggerIds).containsOnly("54e3deadbeefdeadbeef0001", "54e3deadbeefdeadbeef0002"); } @Test - public void triggerWithConstraints() { + void triggerWithConstraints() { final JobTriggerDto.Builder triggerBuilder = JobTriggerDto.Builder.create(clock) .jobDefinitionId("abc-123") .jobDefinitionType("event-processor-execution-v1") @@ -1014,10 +1024,10 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers.json") - public void updateProgress() { + void updateProgress() { final JobTriggerDto trigger = dbJobTriggerService.get("54e3deadbeefdeadbeef0003").orElseThrow(AssertionError::new); - assertThat(trigger.lock().progress()).isEqualTo(0); + assertThat(trigger.lock().progress()).isZero(); assertThat(dbJobTriggerService.updateProgress(trigger, 42)).isEqualTo(1); @@ -1028,7 +1038,7 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("locked-job-triggers.json") - public void cancelTriggerByQuery() { + void cancelTriggerByQuery() { // Must return an empty Optional if the query didn't match any trigger assertThat(dbJobTriggerService.cancelTriggerByQuery(Filters.eq("foo", "bar"))).isEmpty(); @@ -1045,9 +1055,9 @@ public class DBJobTriggerServiceTest { @Test @MongoDBFixtures("job-triggers-for-overdue-count.json") - public void numberOfOverdueTriggers() { - final JobSchedulerTestClock clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T04:00:00.000Z")); - final DBJobTriggerService service = serviceWithClock(clock); + void numberOfOverdueTriggers() { + final JobSchedulerTestClock currentClock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T04:00:00.000Z")); + final DBJobTriggerService service = serviceWithClock(currentClock); final Map result = service.numberOfOverdueTriggers(); diff --git a/graylog2-server/src/test/resources/org/graylog/scheduler/failed-job-trigger.json b/graylog2-server/src/test/resources/org/graylog/scheduler/failed-job-trigger.json new file mode 100644 index 0000000000..981dcdd96f --- /dev/null +++ b/graylog2-server/src/test/resources/org/graylog/scheduler/failed-job-trigger.json @@ -0,0 +1,37 @@ +{ + "scheduler_triggers": [ + { + "_id": { + "$oid": "54e3deadbeefdeadbeef0005" + }, + "job_definition_id": "54e3deadbeefdeadbeefaff6", + "job_definition_type": "event-processor-execution-v1", + "start_time": { + "$date": "2019-01-01T00:00:00.000Z" + }, + "next_time": { + "$date": "2019-01-01T00:00:00.000Z" + }, + "created_at": { + "$date": "2019-01-01T00:00:00.000Z" + }, + "updated_at": { + "$date": "2019-01-01T00:00:00.000Z" + }, + "triggered_at": { + "$date": "2019-01-01T01:00:00.000Z" + }, + "status": "error", + "lock": { + "owner": null, + "last_lock_time": null, + "clock": 0, + "progress": 0 + }, + "schedule": { + "type": "once" + }, + "data": null + } + ] +}