// timestampRoundUp returns stamp rounded up to the next whole second.
//
// A timestamp that already lies exactly on a second boundary is returned
// unchanged; any other timestamp is moved forward to the start of the
// following second.
func timestampRoundUp(stamp time.Time) time.Time {
	truncated := stamp.Truncate(time.Second)

	// Compare the instants with Equal() instead of ==. Truncate() strips the
	// monotonic clock reading, so == reports a difference for a whole-second
	// timestamp that still carries such a reading, which would then be pushed
	// a full second into the future.
	if truncated.Equal(stamp) {
		return stamp
	}
	return truncated.Add(time.Second)
}
This makes it possible to take an + // 'updated at' timestamp from an existing job, and delete that job + all + // older ones. + // + // There might be precision differences between time representation in various + // languages. When the to-be-deleted job has an 'updated at' timestamp at time + // 13:14:15.100, it could get truncated to 13:14:15, which is before the + // to-be-deleted job. + // + // Rounding the given timestamp up to entire seconds solves this, even though + // it might delete too many jobs. + lastUpdatedMax := timestampRoundUp(*settings.LastUpdatedMax) + + logger = logger.With(). + Time("lastUpdatedMax", lastUpdatedMax). + Logger() + logger.Info().Msg("mass deletion of jobs reqeuested") + + // All the required info is known, this can keep running even when the client + // disconnects. + ctx := context.Background() + err := f.jobDeleter.QueueMassJobDeletion(ctx, lastUpdatedMax.UTC()) + + switch { + case persistence.ErrIsDBBusy(err): + logger.Error().AnErr("cause", err).Msg("database too busy to queue job deletion") + return sendAPIErrorDBBusy(e, "too busy to queue job deletion, try again later") + case errors.Is(err, persistence.ErrJobNotFound): + logger.Warn().Msg("mass job deletion: cannot find jobs modified before timestamp") + return sendAPIError(e, http.StatusRequestedRangeNotSatisfiable, "no jobs modified before timestamp") + case err != nil: + logger.Error().AnErr("cause", err).Msg("error queueing job deletion") + return sendAPIError(e, http.StatusInternalServerError, "error queueing job deletion") + default: + return e.NoContent(http.StatusNoContent) + } +} + // SetJobStatus is used by the web interface to change a job's status. func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { logger := requestLogger(e).With(). 
diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index a38868ff..e08f7fff 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -930,3 +931,54 @@ func TestDeleteJob(t *testing.T) { assertResponseNoContent(t, echoCtx) } + +func TestDeleteJobMass(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + withFracionalSecs, err := time.Parse(time.RFC3339Nano, "2023-12-01T09:17:34.275+02:00") + require.NoError(t, err) + + roundedUp, err := time.Parse(time.RFC3339Nano, "2023-12-01T09:17:35+02:00") + require.NoError(t, err) + + body := api.DeleteJobMassJSONBody{ + LastUpdatedMax: &withFracionalSecs, + } + + { // Happy flow. + echoCtx := mf.prepareMockedJSONRequest(body) + mf.jobDeleter.EXPECT().QueueMassJobDeletion(gomock.Any(), roundedUp.UTC()) + + err := mf.flamenco.DeleteJobMass(echoCtx) + require.NoError(t, err) + + assertResponseNoContent(t, echoCtx) + } + + { // No jobs found. + echoCtx := mf.prepareMockedJSONRequest(body) + mf.jobDeleter.EXPECT().QueueMassJobDeletion(gomock.Any(), roundedUp.UTC()). 
+ Return(persistence.ErrJobNotFound) + + err := mf.flamenco.DeleteJobMass(echoCtx) + require.NoError(t, err) + + assertResponseAPIError(t, echoCtx, + http.StatusRequestedRangeNotSatisfiable, + "no jobs modified before timestamp") + } +} + +func TestTimestampRoundUp(t *testing.T) { + withFracionalSecs, err := time.Parse(time.RFC3339Nano, "2023-12-01T09:17:34.275+00:00") + require.NoError(t, err) + + roundedUp, err := time.Parse(time.RFC3339Nano, "2023-12-01T09:17:35+00:00") + require.NoError(t, err) + + assert.Equal(t, roundedUp, timestampRoundUp(withFracionalSecs)) + assert.Equal(t, roundedUp, timestampRoundUp(roundedUp)) +} diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 92c58264..d4b7506f 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -8,6 +8,7 @@ import ( context "context" io "io" reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" zerolog "github.com/rs/zerolog" @@ -1385,6 +1386,20 @@ func (mr *MockJobDeleterMockRecorder) QueueJobDeletion(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueJobDeletion", reflect.TypeOf((*MockJobDeleter)(nil).QueueJobDeletion), arg0, arg1) } +// QueueMassJobDeletion mocks base method. +func (m *MockJobDeleter) QueueMassJobDeletion(arg0 context.Context, arg1 time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueMassJobDeletion", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// QueueMassJobDeletion indicates an expected call of QueueMassJobDeletion. +func (mr *MockJobDeleterMockRecorder) QueueMassJobDeletion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueMassJobDeletion", reflect.TypeOf((*MockJobDeleter)(nil).QueueMassJobDeletion), arg0, arg1) +} + // WhatWouldBeDeleted mocks base method. 
func (m *MockJobDeleter) WhatWouldBeDeleted(arg0 *persistence.Job) api.JobDeletionInfo { m.ctrl.T.Helper() diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go index d1cd9006..febd35c9 100644 --- a/internal/manager/job_deleter/interfaces.go +++ b/internal/manager/job_deleter/interfaces.go @@ -4,6 +4,7 @@ package job_deleter import ( "context" + "time" "projects.blender.org/studio/flamenco/internal/manager/local_storage" "projects.blender.org/studio/flamenco/internal/manager/persistence" @@ -19,6 +20,8 @@ type PersistenceService interface { FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error) RequestJobDeletion(ctx context.Context, j *persistence.Job) error + RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) + // FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs. FetchJobsDeletionRequested(ctx context.Context) ([]string, error) DeleteJob(ctx context.Context, jobUUID string) error diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go index 6ed21427..09194b70 100644 --- a/internal/manager/job_deleter/job_deleter.go +++ b/internal/manager/job_deleter/job_deleter.go @@ -89,6 +89,50 @@ func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) er return nil } +func (s *Service) QueueMassJobDeletion(ctx context.Context, lastUpdatedMax time.Time) error { + logger := log.With().Time("lastUpdatedMax", lastUpdatedMax).Logger() + + uuids, err := s.persist.RequestJobMassDeletion(ctx, lastUpdatedMax) + if err != nil { + return fmt.Errorf("requesting mass job deletion: %w", err) + } + + logger.Info(). + Int("numjobs", len(uuids)). + Msg("job deleter: queueing multiple jobs for deletion") + + // Do the poking of the job deleter, and broadcasting of the job deletion, in + // the background. The main work is done, and the rest can be done asynchronously. 
+ bgCtx := context.Background() + go s.broadcastAndQueueMassJobDeletion(bgCtx, uuids, logger) + + return nil +} + +func (s *Service) broadcastAndQueueMassJobDeletion(ctx context.Context, jobUUIDs []string, logger zerolog.Logger) { + for _, uuid := range jobUUIDs { + // Let the Run() goroutine know this job is ready for deletion. + select { + case s.queue <- uuid: + logger.Debug().Msg("job deleter: job succesfully queued for deletion") + case <-time.After(100 * time.Millisecond): + logger.Trace().Msg("job deleter: job deletion queue is full") + } + + // Broadcast that the jobs were queued for deletion. + job, err := s.persist.FetchJob(ctx, uuid) + if err != nil { + logger.Debug(). + Str("uuid", uuid). + Err(err). + Msg("job deleter: unable to fetch job to send updates") + continue + } + jobUpdate := webupdates.NewJobUpdate(job) + s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) + } +} + func (s *Service) WhatWouldBeDeleted(job *persistence.Job) api.JobDeletionInfo { logger := log.With().Str("job", job.UUID).Logger() logger.Info().Msg("job deleter: checking what job deletion would do") diff --git a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go index c938d5c1..a4924d93 100644 --- a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go +++ b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go @@ -7,6 +7,7 @@ package mocks import ( context "context" reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" @@ -94,6 +95,21 @@ func (mr *MockPersistenceServiceMockRecorder) RequestJobDeletion(arg0, arg1 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobDeletion", reflect.TypeOf((*MockPersistenceService)(nil).RequestJobDeletion), arg0, arg1) } +// RequestJobMassDeletion mocks base method. 
+func (m *MockPersistenceService) RequestJobMassDeletion(arg0 context.Context, arg1 time.Time) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestJobMassDeletion", arg0, arg1) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RequestJobMassDeletion indicates an expected call of RequestJobMassDeletion. +func (mr *MockPersistenceServiceMockRecorder) RequestJobMassDeletion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobMassDeletion", reflect.TypeOf((*MockPersistenceService)(nil).RequestJobMassDeletion), arg0, arg1) +} + // MockStorage is a mock of Storage interface. type MockStorage struct { ctrl *gomock.Controller diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index a9b27b35..3772b280 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -286,11 +286,52 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { Model(j). Updates(Job{DeleteRequestedAt: j.DeleteRequestedAt}) if tx.Error != nil { - return jobError(tx.Error, "deleting job") + return jobError(tx.Error, "queueing job for deletion") } return nil } +// RequestJobMassDeletion sets multiple job's "DeletionRequestedAt" field to "now". +// The list of affected job UUIDs is returned. +func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) { + // In order to be able to report which jobs were affected, first fetch the + // list of jobs, then update them. + var jobs []*Job + selectResult := db.gormDB.WithContext(ctx). + Model(&Job{}). + Select("uuid"). + Where("updated_at <= ?", lastUpdatedMax). 
+ Scan(&jobs) + if selectResult.Error != nil { + return nil, jobError(selectResult.Error, "fetching jobs by last-modified timestamp") + } + + if len(jobs) == 0 { + return nil, ErrJobNotFound + } + + // Convert array of jobs to array of UUIDs. + uuids := make([]string, len(jobs)) + for index := range jobs { + uuids[index] = jobs[index].UUID + } + + // Update the selected jobs. + deleteRequestedAt := sql.NullTime{ + Time: db.gormDB.NowFunc(), + Valid: true, + } + tx := db.gormDB.WithContext(ctx). + Model(Job{}). + Where("uuid in ?", uuids). + Updates(Job{DeleteRequestedAt: deleteRequestedAt}) + if tx.Error != nil { + return nil, jobError(tx.Error, "queueing jobs for deletion") + } + + return uuids, nil +} + func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) { var jobs []*Job diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index afb8171a..2495715d 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -185,6 +185,74 @@ func TestRequestJobDeletion(t *testing.T) { assert.False(t, dbJob2.DeleteRequestedAt.Valid) } +func TestRequestJobMassDeletion(t *testing.T) { + // This is a fresh job, that shouldn't be touched by the mass deletion. + ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) + defer close() + + origGormNow := db.gormDB.NowFunc + now := db.gormDB.NowFunc() + + // Ensure different jobs get different timestamps. 
+ db.gormDB.NowFunc = func() time.Time { return now.Add(-3 * time.Second) } + authoredJob2 := duplicateJobAndTasks(authoredJob1) + job2 := persistAuthoredJob(t, ctx, db, authoredJob2) + + db.gormDB.NowFunc = func() time.Time { return now.Add(-4 * time.Second) } + authoredJob3 := duplicateJobAndTasks(authoredJob1) + job3 := persistAuthoredJob(t, ctx, db, authoredJob3) + + db.gormDB.NowFunc = func() time.Time { return now.Add(-5 * time.Second) } + authoredJob4 := duplicateJobAndTasks(authoredJob1) + job4 := persistAuthoredJob(t, ctx, db, authoredJob4) + + // Request that "job3 and older" gets deleted. + timeOfDeleteRequest := origGormNow() + db.gormDB.NowFunc = func() time.Time { return timeOfDeleteRequest } + uuids, err := db.RequestJobMassDeletion(ctx, job3.UpdatedAt) + assert.NoError(t, err) + + db.gormDB.NowFunc = origGormNow + + // Only jobs 3 and 4 should be updated. + assert.Equal(t, []string{job3.UUID, job4.UUID}, uuids) + + // All the jobs should still exist. + job1, err = db.FetchJob(ctx, job1.UUID) + require.NoError(t, err) + job2, err = db.FetchJob(ctx, job2.UUID) + require.NoError(t, err) + job3, err = db.FetchJob(ctx, job3.UUID) + require.NoError(t, err) + job4, err = db.FetchJob(ctx, job4.UUID) + require.NoError(t, err) + + // Jobs 3 and 4 should have been marked for deletion, the rest should be untouched. + assert.False(t, job1.DeleteRequested()) + assert.False(t, job2.DeleteRequested()) + assert.True(t, job3.DeleteRequested()) + assert.True(t, job4.DeleteRequested()) + + assert.Equal(t, timeOfDeleteRequest, job3.DeleteRequestedAt.Time) + assert.Equal(t, timeOfDeleteRequest, job4.DeleteRequestedAt.Time) +} + +func TestRequestJobMassDeletion_noJobsFound(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + // Request deletion with a timestamp that doesn't match any jobs. 
+ now := db.gormDB.NowFunc() + uuids, err := db.RequestJobMassDeletion(ctx, now.Add(-24*time.Hour)) + assert.ErrorIs(t, err, ErrJobNotFound) + assert.Zero(t, uuids) + + // The job shouldn't have been touched. + job, err = db.FetchJob(ctx, job.UUID) + require.NoError(t, err) + assert.False(t, job.DeleteRequested()) +} + func TestFetchJobsDeletionRequested(t *testing.T) { ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) defer close()