diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go index 42b9b0d9..ba4582c2 100644 --- a/internal/manager/job_deleter/interfaces.go +++ b/internal/manager/job_deleter/interfaces.go @@ -18,6 +18,7 @@ import ( type PersistenceService interface { FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error) + FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error) RequestJobDeletion(ctx context.Context, j *persistence.Job) error RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go index 8ac9e719..5da0d28e 100644 --- a/internal/manager/job_deleter/job_deleter.go +++ b/internal/manager/job_deleter/job_deleter.go @@ -197,6 +197,7 @@ queueLoop: func (s *Service) deleteJob(ctx context.Context, jobUUID string) error { logger := log.With().Str("job", jobUUID).Logger() + logger.Debug().Msg("job deleter: starting job deletion") err := s.deleteShamanCheckout(ctx, logger, jobUUID) if err != nil { return err @@ -258,12 +259,10 @@ func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logge } // To erase the Shaman checkout we need more info than just its UUID. - dbJob, err := s.persist.FetchJob(ctx, jobUUID) + checkoutID, err := s.persist.FetchJobShamanCheckoutID(ctx, jobUUID) if err != nil { return fmt.Errorf("unable to fetch job from database: %w", err) } - - checkoutID := dbJob.Storage.ShamanCheckoutID if checkoutID == "" { logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion") return nil @@ -272,10 +271,10 @@ func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logge err = s.shaman.EraseCheckout(checkoutID) switch { case errors.Is(err, shaman.ErrDoesNotExist): - logger.Info().Msg("job deleter: Shaman checkout directory does not exist, ignoring") + logger.Debug().Msg("job deleter: Shaman checkout directory does not exist, ignoring") return nil case err != nil: - logger.Info().Err(err).Msg("job deleter: Shaman checkout directory could not be erased") + logger.Warn().Err(err).Msg("job deleter: Shaman checkout directory could not be erased") return err } diff --git a/internal/manager/job_deleter/job_deleter_test.go b/internal/manager/job_deleter/job_deleter_test.go index c20d2c2d..6b347196 100644 --- a/internal/manager/job_deleter/job_deleter_test.go +++ b/internal/manager/job_deleter/job_deleter_test.go @@ -128,14 +128,7 @@ func TestDeleteJobWithShaman(t *testing.T) { AnyTimes() shamanCheckoutID := "010_0431_lighting" - dbJob := persistence.Job{ - UUID: jobUUID, - Name: "сцена/shot/010_0431_lighting", - Storage: persistence.JobStorageInfo{ - ShamanCheckoutID: shamanCheckoutID, - }, - } - mocks.persist.EXPECT().FetchJob(mocks.ctx, jobUUID).Return(&dbJob, nil).AnyTimes() + mocks.persist.EXPECT().FetchJobShamanCheckoutID(mocks.ctx, jobUUID).Return(shamanCheckoutID, nil).AnyTimes() // Mock that Shaman deletion failed. The rest of the deletion should be // blocked by this. diff --git a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go index 7e623c75..c927c5b8 100644 --- a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go +++ b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go @@ -66,6 +66,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1) } +// FetchJobShamanCheckoutID mocks base method. +func (m *MockPersistenceService) FetchJobShamanCheckoutID(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchJobShamanCheckoutID", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchJobShamanCheckoutID indicates an expected call of FetchJobShamanCheckoutID. +func (mr *MockPersistenceServiceMockRecorder) FetchJobShamanCheckoutID(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobShamanCheckoutID", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobShamanCheckoutID), arg0, arg1) +} + // FetchJobsDeletionRequested mocks base method. func (m *MockPersistenceService) FetchJobsDeletionRequested(arg0 context.Context) ([]string, error) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 58bc1859..fc88d8fe 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -270,6 +270,23 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { return convertSqlcJob(sqlcJob) } +// FetchJobShamanCheckoutID fetches the job's Shaman Checkout ID. +func (db *DB) FetchJobShamanCheckoutID(ctx context.Context, jobUUID string) (string, error) { + queries, err := db.queries() + if err != nil { + return "", err + } + + checkoutID, err := queries.FetchJobShamanCheckoutID(ctx, jobUUID) + switch { + case errors.Is(err, sql.ErrNoRows): + return "", ErrJobNotFound + case err != nil: + return "", jobError(err, "fetching job") + } + return checkoutID, nil +} + // DeleteJob deletes a job from the database. // The deletion cascades to its tasks and other job-related tables. func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error { diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index af338f25..15843769 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -192,6 +192,29 @@ func TestDeleteJob(t *testing.T) { "all remaining tasks should belong to the other job") } +func TestFetchJobShamanCheckoutID(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) + defer cancel() + + authJob := createTestAuthoredJobWithTasks() + authJob.JobID = "e1a034cc-b709-45f5-b80f-9cf16511c678" + authJob.Name = "Job to delete" + authJob.Storage.ShamanCheckoutID = "some-✓out-id-string" + persistAuthoredJob(t, ctx, db, authJob) + + { // Test fetching a non-existing job. + checkoutID, err := db.FetchJobShamanCheckoutID(ctx, "4cb20f0d-f1f6-4d56-8277-9b208a99fed0") + assert.ErrorIs(t, err, ErrJobNotFound) + assert.Equal(t, "", checkoutID) + } + + { // Test existing job. + checkoutID, err := db.FetchJobShamanCheckoutID(ctx, authJob.JobID) + require.NoError(t, err) + assert.Equal(t, authJob.Storage.ShamanCheckoutID, checkoutID) + } +} + func TestDeleteJobWithoutFK(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) defer cancel() diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index b8d79770..be98e611 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -27,6 +27,9 @@ WHERE uuid = ? LIMIT 1; SELECT * FROM jobs WHERE id = ? LIMIT 1; +-- name: FetchJobShamanCheckoutID :one +SELECT storage_shaman_checkout_id FROM jobs WHERE uuid=@uuid; + -- name: DeleteJob :exec DELETE FROM jobs WHERE uuid = ?; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index a5cef021..f8c2e08f 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -94,7 +94,6 @@ type CreateJobParams struct { } // Jobs / Tasks queries -// func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error { _, err := q.db.ExecContext(ctx, createJob, arg.CreatedAt, @@ -176,6 +175,17 @@ func (q *Queries) FetchJobByID(ctx context.Context, id int64) (Job, error) { return i, err } +const fetchJobShamanCheckoutID = `-- name: FetchJobShamanCheckoutID :one +SELECT storage_shaman_checkout_id FROM jobs WHERE uuid=?1 +` + +func (q *Queries) FetchJobShamanCheckoutID(ctx context.Context, uuid string) (string, error) { + row := q.db.QueryRowContext(ctx, fetchJobShamanCheckoutID, uuid) + var storage_shaman_checkout_id string + err := row.Scan(&storage_shaman_checkout_id) + return storage_shaman_checkout_id, err +} + const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many SELECT uuid FROM jobs WHERE updated_at <= ?1 ` @@ -323,7 +333,7 @@ func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, err const fetchTaskFailureList = `-- name: FetchTaskFailureList :many SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers -INNER JOIN task_failures TF on TF.worker_id = workers.id +INNER JOIN task_failures TF on TF.worker_id=workers.id WHERE TF.task_id=?1 `