Manager: after deleting a job, perform a database consistency check
Deleting jobs from the database can still sometimes cause consistency errors, as if foreign key constraints aren't enabled. This check is there to try and get a grip on things.
This commit is contained in:
parent
7c08ec8654
commit
b39f116b0e
@ -25,6 +25,8 @@ type PersistenceService interface {
|
|||||||
// FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs.
|
// FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs.
|
||||||
FetchJobsDeletionRequested(ctx context.Context) ([]string, error)
|
FetchJobsDeletionRequested(ctx context.Context) ([]string, error)
|
||||||
DeleteJob(ctx context.Context, jobUUID string) error
|
DeleteJob(ctx context.Context, jobUUID string) error
|
||||||
|
|
||||||
|
RequestIntegrityCheck()
|
||||||
}
|
}
|
||||||
|
|
||||||
// PersistenceService should be a subset of persistence.DB
|
// PersistenceService should be a subset of persistence.DB
|
||||||
|
@ -225,6 +225,11 @@ func (s *Service) deleteJob(ctx context.Context, jobUUID string) error {
|
|||||||
s.changeBroadcaster.BroadcastJobUpdate(jobUpdate)
|
s.changeBroadcaster.BroadcastJobUpdate(jobUpdate)
|
||||||
|
|
||||||
logger.Info().Msg("job deleter: job removal complete")
|
logger.Info().Msg("job deleter: job removal complete")
|
||||||
|
|
||||||
|
// Request a consistency check on the database. In the past there have been
|
||||||
|
// some issues after deleting a job.
|
||||||
|
s.persist.RequestIntegrityCheck()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,6 +109,7 @@ func TestDeleteJobWithoutShaman(t *testing.T) {
|
|||||||
// Mock that everything went OK.
|
// Mock that everything went OK.
|
||||||
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
|
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
|
||||||
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
|
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
|
||||||
|
mocks.persist.EXPECT().RequestIntegrityCheck()
|
||||||
mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any())
|
mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any())
|
||||||
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
|
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
|
||||||
}
|
}
|
||||||
@ -160,6 +161,7 @@ func TestDeleteJobWithShaman(t *testing.T) {
|
|||||||
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID)
|
mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID)
|
||||||
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
|
mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID)
|
||||||
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
|
mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID)
|
||||||
|
mocks.persist.EXPECT().RequestIntegrityCheck()
|
||||||
mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any())
|
mocks.broadcaster.EXPECT().BroadcastJobUpdate(gomock.Any())
|
||||||
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
|
assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID))
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,18 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsDeletionRequested(arg0 in
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsDeletionRequested", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsDeletionRequested), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsDeletionRequested", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsDeletionRequested), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RequestIntegrityCheck mocks base method.
|
||||||
|
func (m *MockPersistenceService) RequestIntegrityCheck() {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "RequestIntegrityCheck")
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestIntegrityCheck indicates an expected call of RequestIntegrityCheck.
|
||||||
|
func (mr *MockPersistenceServiceMockRecorder) RequestIntegrityCheck() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestIntegrityCheck", reflect.TypeOf((*MockPersistenceService)(nil).RequestIntegrityCheck))
|
||||||
|
}
|
||||||
|
|
||||||
// RequestJobDeletion mocks base method.
|
// RequestJobDeletion mocks base method.
|
||||||
func (m *MockPersistenceService) RequestJobDeletion(arg0 context.Context, arg1 *persistence.Job) error {
|
func (m *MockPersistenceService) RequestJobDeletion(arg0 context.Context, arg1 *persistence.Job) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
@ -18,6 +18,9 @@ import (
|
|||||||
// DB provides the database interface.
|
// DB provides the database interface.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
gormDB *gorm.DB
|
gormDB *gorm.DB
|
||||||
|
|
||||||
|
// See PeriodicIntegrityCheck().
|
||||||
|
consistencyCheckRequests chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Model contains the common database fields for most model structs.
|
// Model contains the common database fields for most model structs.
|
||||||
@ -96,6 +99,11 @@ func openDBWithConfig(dsn string, config *gorm.Config) (*DB, error) {
|
|||||||
|
|
||||||
db := DB{
|
db := DB{
|
||||||
gormDB: gormDB,
|
gormDB: gormDB,
|
||||||
|
|
||||||
|
// Buffer one request, so that even when a consistency check is already
|
||||||
|
// running, another can be queued without blocking. Queueing more than one
|
||||||
|
// doesn't make sense, though.
|
||||||
|
consistencyCheckRequests: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the database connection if there was some error. This prevents
|
// Close the database connection if there was some error. This prevents
|
||||||
|
@ -49,6 +49,7 @@ func (db *DB) PeriodicIntegrityCheck(
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-time.After(period):
|
case <-time.After(period):
|
||||||
|
case <-db.consistencyCheckRequests:
|
||||||
}
|
}
|
||||||
|
|
||||||
ok := db.performIntegrityCheck(ctx)
|
ok := db.performIntegrityCheck(ctx)
|
||||||
@ -59,12 +60,24 @@ func (db *DB) PeriodicIntegrityCheck(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RequestIntegrityCheck triggers a check of the database persistency.
|
||||||
|
func (db *DB) RequestIntegrityCheck() {
|
||||||
|
select {
|
||||||
|
case db.consistencyCheckRequests <- struct{}{}:
|
||||||
|
// Don't do anything, the work is done.
|
||||||
|
default:
|
||||||
|
log.Debug().Msg("database: could not trigger integrity check, another check might already be queued.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// performIntegrityCheck uses a few 'pragma' SQL statements to do some integrity checking.
|
// performIntegrityCheck uses a few 'pragma' SQL statements to do some integrity checking.
|
||||||
// Returns true on OK, false if there was an issue. Issues are always logged.
|
// Returns true on OK, false if there was an issue. Issues are always logged.
|
||||||
func (db *DB) performIntegrityCheck(ctx context.Context) (ok bool) {
|
func (db *DB) performIntegrityCheck(ctx context.Context) (ok bool) {
|
||||||
checkCtx, cancel := context.WithTimeout(ctx, integrityCheckTimeout)
|
checkCtx, cancel := context.WithTimeout(ctx, integrityCheckTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
log.Debug().Msg("database: performing integrity check")
|
||||||
|
|
||||||
if !db.pragmaIntegrityCheck(checkCtx) {
|
if !db.pragmaIntegrityCheck(checkCtx) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user