Manager: lightly polish job deletion

Tweak the logging a little bit so it's less noisy, properly warns when the
Shaman checkout dir cannot be removed, and optimise the database query
a bit (by just fetching the one field that's needed, instead of the entire
job).

Deletion still works the same.
This commit is contained in:
Sybren A. Stüvel 2024-05-28 12:08:54 +02:00
parent 79076be91b
commit 98cbe6a67d
8 changed files with 76 additions and 15 deletions

View File

@ -18,6 +18,7 @@ import (
type PersistenceService interface { type PersistenceService interface {
FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error) FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error)
FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error)
RequestJobDeletion(ctx context.Context, j *persistence.Job) error RequestJobDeletion(ctx context.Context, j *persistence.Job) error
RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error)

View File

@ -197,6 +197,7 @@ queueLoop:
func (s *Service) deleteJob(ctx context.Context, jobUUID string) error { func (s *Service) deleteJob(ctx context.Context, jobUUID string) error {
logger := log.With().Str("job", jobUUID).Logger() logger := log.With().Str("job", jobUUID).Logger()
logger.Debug().Msg("job deleter: starting job deletion")
err := s.deleteShamanCheckout(ctx, logger, jobUUID) err := s.deleteShamanCheckout(ctx, logger, jobUUID)
if err != nil { if err != nil {
return err return err
@ -258,12 +259,10 @@ func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logge
} }
// To erase the Shaman checkout we need more info than just its UUID. // To erase the Shaman checkout we need more info than just its UUID.
dbJob, err := s.persist.FetchJob(ctx, jobUUID) checkoutID, err := s.persist.FetchJobShamanCheckoutID(ctx, jobUUID)
if err != nil { if err != nil {
return fmt.Errorf("unable to fetch job from database: %w", err) return fmt.Errorf("unable to fetch job from database: %w", err)
} }
checkoutID := dbJob.Storage.ShamanCheckoutID
if checkoutID == "" { if checkoutID == "" {
logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion") logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion")
return nil return nil
@ -272,10 +271,10 @@ func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logge
err = s.shaman.EraseCheckout(checkoutID) err = s.shaman.EraseCheckout(checkoutID)
switch { switch {
case errors.Is(err, shaman.ErrDoesNotExist): case errors.Is(err, shaman.ErrDoesNotExist):
logger.Info().Msg("job deleter: Shaman checkout directory does not exist, ignoring") logger.Debug().Msg("job deleter: Shaman checkout directory does not exist, ignoring")
return nil return nil
case err != nil: case err != nil:
logger.Info().Err(err).Msg("job deleter: Shaman checkout directory could not be erased") logger.Warn().Err(err).Msg("job deleter: Shaman checkout directory could not be erased")
return err return err
} }

View File

@ -128,14 +128,7 @@ func TestDeleteJobWithShaman(t *testing.T) {
AnyTimes() AnyTimes()
shamanCheckoutID := "010_0431_lighting" shamanCheckoutID := "010_0431_lighting"
dbJob := persistence.Job{ mocks.persist.EXPECT().FetchJobShamanCheckoutID(mocks.ctx, jobUUID).Return(shamanCheckoutID, nil).AnyTimes()
UUID: jobUUID,
Name: "сцена/shot/010_0431_lighting",
Storage: persistence.JobStorageInfo{
ShamanCheckoutID: shamanCheckoutID,
},
}
mocks.persist.EXPECT().FetchJob(mocks.ctx, jobUUID).Return(&dbJob, nil).AnyTimes()
// Mock that Shaman deletion failed. The rest of the deletion should be // Mock that Shaman deletion failed. The rest of the deletion should be
// blocked by this. // blocked by this.

View File

@ -66,6 +66,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1)
} }
// FetchJobShamanCheckoutID mocks base method.
func (m *MockPersistenceService) FetchJobShamanCheckoutID(arg0 context.Context, arg1 string) (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchJobShamanCheckoutID", arg0, arg1)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchJobShamanCheckoutID indicates an expected call of FetchJobShamanCheckoutID.
func (mr *MockPersistenceServiceMockRecorder) FetchJobShamanCheckoutID(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobShamanCheckoutID", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobShamanCheckoutID), arg0, arg1)
}
// FetchJobsDeletionRequested mocks base method. // FetchJobsDeletionRequested mocks base method.
func (m *MockPersistenceService) FetchJobsDeletionRequested(arg0 context.Context) ([]string, error) { func (m *MockPersistenceService) FetchJobsDeletionRequested(arg0 context.Context) ([]string, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -270,6 +270,23 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
return convertSqlcJob(sqlcJob) return convertSqlcJob(sqlcJob)
} }
// FetchJobShamanCheckoutID fetches the job's Shaman Checkout ID.
func (db *DB) FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error) {
queries, err := db.queries()
if err != nil {
return "", err
}
checkoutID, err := queries.FetchJobShamanCheckoutID(ctx, jobUUID)
switch {
case errors.Is(err, sql.ErrNoRows):
return "", ErrJobNotFound
case err != nil:
return "", jobError(err, "fetching job")
}
return checkoutID, nil
}
// DeleteJob deletes a job from the database. // DeleteJob deletes a job from the database.
// The deletion cascades to its tasks and other job-related tables. // The deletion cascades to its tasks and other job-related tables.
func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error { func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error {

View File

@ -192,6 +192,29 @@ func TestDeleteJob(t *testing.T) {
"all remaining tasks should belong to the other job") "all remaining tasks should belong to the other job")
} }
func TestFetchJobShamanCheckoutID(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second)
defer cancel()
authJob := createTestAuthoredJobWithTasks()
authJob.JobID = "e1a034cc-b709-45f5-b80f-9cf16511c678"
authJob.Name = "Job to delete"
authJob.Storage.ShamanCheckoutID = "some-✓out-id-string"
persistAuthoredJob(t, ctx, db, authJob)
{ // Test fetching a non-existing job.
checkoutID, err := db.FetchJobShamanCheckoutID(ctx, "4cb20f0d-f1f6-4d56-8277-9b208a99fed0")
assert.ErrorIs(t, err, ErrJobNotFound)
assert.Equal(t, "", checkoutID)
}
{ // Test existing job.
checkoutID, err := db.FetchJobShamanCheckoutID(ctx, authJob.JobID)
require.NoError(t, err)
assert.Equal(t, authJob.Storage.ShamanCheckoutID, checkoutID)
}
}
func TestDeleteJobWithoutFK(t *testing.T) { func TestDeleteJobWithoutFK(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second)
defer cancel() defer cancel()

View File

@ -27,6 +27,9 @@ WHERE uuid = ? LIMIT 1;
SELECT * FROM jobs SELECT * FROM jobs
WHERE id = ? LIMIT 1; WHERE id = ? LIMIT 1;
-- name: FetchJobShamanCheckoutID :one
SELECT storage_shaman_checkout_id FROM jobs WHERE uuid=@uuid;
-- name: DeleteJob :exec -- name: DeleteJob :exec
DELETE FROM jobs WHERE uuid = ?; DELETE FROM jobs WHERE uuid = ?;

View File

@ -94,7 +94,6 @@ type CreateJobParams struct {
} }
// Jobs / Tasks queries // Jobs / Tasks queries
//
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error { func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
_, err := q.db.ExecContext(ctx, createJob, _, err := q.db.ExecContext(ctx, createJob,
arg.CreatedAt, arg.CreatedAt,
@ -176,6 +175,17 @@ func (q *Queries) FetchJobByID(ctx context.Context, id int64) (Job, error) {
return i, err return i, err
} }
const fetchJobShamanCheckoutID = `-- name: FetchJobShamanCheckoutID :one
SELECT storage_shaman_checkout_id FROM jobs WHERE uuid=?1
`
func (q *Queries) FetchJobShamanCheckoutID(ctx context.Context, uuid string) (string, error) {
row := q.db.QueryRowContext(ctx, fetchJobShamanCheckoutID, uuid)
var storage_shaman_checkout_id string
err := row.Scan(&storage_shaman_checkout_id)
return storage_shaman_checkout_id, err
}
const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many
SELECT uuid FROM jobs WHERE updated_at <= ?1 SELECT uuid FROM jobs WHERE updated_at <= ?1
` `
@ -323,7 +333,7 @@ func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, err
const fetchTaskFailureList = `-- name: FetchTaskFailureList :many const fetchTaskFailureList = `-- name: FetchTaskFailureList :many
SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers
INNER JOIN task_failures TF on TF.worker_id = workers.id INNER JOIN task_failures TF on TF.worker_id=workers.id
WHERE TF.task_id=?1 WHERE TF.task_id=?1
` `