From b39f116b0e5e614d2f65fbf5db6ba7669d5073c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 11 Jan 2024 19:55:43 +0100 Subject: [PATCH] Manager: after deleting a job, perform a database consistency check Deleting jobs from the database can still sometimes cause consistency errors, as if foreign key constraints aren't enabled. This check is there to try and get a grip on things. --- internal/manager/job_deleter/interfaces.go | 2 ++ internal/manager/job_deleter/job_deleter.go | 5 +++++ internal/manager/job_deleter/job_deleter_test.go | 2 ++ .../job_deleter/mocks/interfaces_mock.gen.go | 12 ++++++++++++ internal/manager/persistence/db.go | 8 ++++++++ internal/manager/persistence/integrity.go | 13 +++++++++++++ 6 files changed, 42 insertions(+) diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go index febd35c9..ab0f1747 100644 --- a/internal/manager/job_deleter/interfaces.go +++ b/internal/manager/job_deleter/interfaces.go @@ -25,6 +25,8 @@ type PersistenceService interface { // FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs. FetchJobsDeletionRequested(ctx context.Context) ([]string, error) DeleteJob(ctx context.Context, jobUUID string) error + + RequestIntegrityCheck() } // PersistenceService should be a subset of persistence.DB diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go index 09194b70..95fe7589 100644 --- a/internal/manager/job_deleter/job_deleter.go +++ b/internal/manager/job_deleter/job_deleter.go @@ -225,6 +225,11 @@ func (s *Service) deleteJob(ctx context.Context, jobUUID string) error { s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) logger.Info().Msg("job deleter: job removal complete") + + // Request a consistency check on the database. In the past there have been + // some issues after deleting a job. + s.persist.RequestIntegrityCheck() + return nil } diff --git a/internal/manager/job_deleter/job_deleter_test.go b/internal/manager/job_deleter/job_deleter_test.go index 6ff70741..770c45a8 100644 --- a/internal/manager/job_deleter/job_deleter_test.go +++ b/internal/manager/job_deleter/job_deleter_test.go @@ -109,6 +109,7 @@ func TestDeleteJobWithoutShaman(t *testing.T) { // Mock that everything went OK. mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID) + mocks.persist.EXPECT().RequestIntegrityCheck() mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any()) assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID)) } @@ -160,6 +161,7 @@ func TestDeleteJobWithShaman(t *testing.T) { mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID) mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID) + mocks.persist.EXPECT().RequestIntegrityCheck() mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any()) assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID)) } diff --git a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go index a4924d93..c385faff 100644 --- a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go +++ b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go @@ -81,6 +81,18 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsDeletionRequested(arg0 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsDeletionRequested", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsDeletionRequested), arg0) } +// RequestIntegrityCheck mocks base method. +func (m *MockPersistenceService) RequestIntegrityCheck() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RequestIntegrityCheck") +} + +// RequestIntegrityCheck indicates an expected call of RequestIntegrityCheck. +func (mr *MockPersistenceServiceMockRecorder) RequestIntegrityCheck() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestIntegrityCheck", reflect.TypeOf((*MockPersistenceService)(nil).RequestIntegrityCheck)) +} + // RequestJobDeletion mocks base method. func (m *MockPersistenceService) RequestJobDeletion(arg0 context.Context, arg1 *persistence.Job) error { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 56a49cc2..78e222a0 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -18,6 +18,9 @@ import ( // DB provides the database interface. type DB struct { gormDB *gorm.DB + + // See PeriodicIntegrityCheck(). + consistencyCheckRequests chan struct{} } // Model contains the common database fields for most model structs. @@ -96,6 +99,11 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) { db := DB{ gormDB: gormDB, + + // Buffer one request, so that even when a consistency check is already + // running, another can be queued without blocking. Queueing more than one + // doesn't make sense, though. + consistencyCheckRequests: make(chan struct{}, 1), } // Close the database connection if there was some error. This prevents diff --git a/internal/manager/persistence/integrity.go b/internal/manager/persistence/integrity.go index 2a87eb4d..dad10501 100644 --- a/internal/manager/persistence/integrity.go +++ b/internal/manager/persistence/integrity.go @@ -49,6 +49,7 @@ func (db *DB) PeriodicIntegrityCheck( case <-ctx.Done(): return case <-time.After(period): + case <-db.consistencyCheckRequests: } ok := db.performIntegrityCheck(ctx) @@ -59,12 +60,24 @@ func (db *DB) PeriodicIntegrityCheck( } } +// RequestIntegrityCheck triggers a check of the database persistency. +func (db *DB) RequestIntegrityCheck() { + select { + case db.consistencyCheckRequests <- struct{}{}: + // Don't do anything, the work is done. + default: + log.Debug().Msg("database: could not trigger integrity check, another check might already be queued.") + } +} + // performIntegrityCheck uses a few 'pragma' SQL statements to do some integrity checking. // Returns true on OK, false if there was an issue. Issues are always logged. func (db *DB) performIntegrityCheck(ctx context.Context) (ok bool) { checkCtx, cancel := context.WithTimeout(ctx, integrityCheckTimeout) defer cancel() + log.Debug().Msg("database: performing integrity check") + if !db.pragmaIntegrityCheck(checkCtx) { return false }