diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 342d536f..8c3cfb91 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -27,6 +27,7 @@ import ( "git.blender.org/flamenco/internal/manager/api_impl/dummy" "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" + "git.blender.org/flamenco/internal/manager/job_deleter" "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/local_storage" "git.blender.org/flamenco/internal/manager/persistence" @@ -144,6 +145,11 @@ func runFlamencoManager() bool { // go persist.PeriodicMaintenanceLoop(mainCtx) timeService := clock.New() + compiler, err := job_compilers.Load(timeService) + if err != nil { + log.Fatal().Err(err).Msg("error loading job compilers") + } + webUpdater := webupdates.New() localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath) @@ -154,8 +160,13 @@ func runFlamencoManager() bool { lastRender := last_rendered.New(localStorage) shamanServer := buildShamanServer(configService, isFirstRun) - flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, - shamanServer, logStorage, webUpdater, lastRender, localStorage, sleepScheduler) + jobDeleter := job_deleter.NewService(persist, localStorage, webUpdater, shamanServer) + + flamenco := api_impl.NewFlamenco( + compiler, persist, webUpdater, logStorage, configService, + taskStateMachine, shamanServer, timeService, lastRender, + localStorage, sleepScheduler, jobDeleter) + e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage) timeoutChecker := timeout_checker.New( @@ -222,6 +233,13 @@ func runFlamencoManager() bool { sleepScheduler.Run(mainCtx) }() + // Run the Job Deleter. + wg.Add(1) + go func() { + defer wg.Done() + jobDeleter.Run(mainCtx) + }() + // Log the URLs last, hopefully that makes them more visible / encouraging to go to for users. go func() { time.Sleep(100 * time.Millisecond) @@ -245,29 +263,6 @@ func runFlamencoManager() bool { return doRestart } -func buildFlamencoAPI( - timeService clock.Clock, - configService *config.Service, - persist *persistence.DB, - taskStateMachine *task_state_machine.StateMachine, - shamanServer api_impl.Shaman, - logStorage *task_logs.Storage, - webUpdater *webupdates.BiDirComms, - lastRender *last_rendered.LastRenderedProcessor, - localStorage local_storage.StorageInfo, - sleepScheduler *sleep_scheduler.SleepScheduler, -) *api_impl.Flamenco { - compiler, err := job_compilers.Load(timeService) - if err != nil { - log.Fatal().Err(err).Msg("error loading job compilers") - } - flamenco := api_impl.NewFlamenco( - compiler, persist, webUpdater, logStorage, configService, - taskStateMachine, shamanServer, timeService, lastRender, - localStorage, sleepScheduler) - return flamenco -} - func buildShamanServer(configService *config.Service, isFirstRun bool) api_impl.Shaman { if isFirstRun { log.Info().Msg("Not starting Shaman storage service, as this is the first run of Flamenco. Configure the shared storage location first.") diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index af2a16e0..f95a87c9 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -27,6 +27,7 @@ type Flamenco struct { lastRender LastRendered localStorage LocalStorage sleepScheduler WorkerSleepScheduler + jobDeleter JobDeleter // The task scheduler can be locked to prevent multiple Workers from getting // the same task. It is also used for certain other queries, like @@ -53,6 +54,7 @@ func NewFlamenco( lr LastRendered, localStorage LocalStorage, wss WorkerSleepScheduler, + jd JobDeleter, ) *Flamenco { return &Flamenco{ jobCompiler: jc, @@ -66,6 +68,7 @@ func NewFlamenco( lastRender: lr, localStorage: localStorage, sleepScheduler: wss, + jobDeleter: jd, done: make(chan struct{}), } diff --git a/internal/manager/api_impl/dummy/shaman.go b/internal/manager/api_impl/dummy/shaman.go index aa1176f8..39ae88d7 100644 --- a/internal/manager/api_impl/dummy/shaman.go +++ b/internal/manager/api_impl/dummy/shaman.go @@ -33,3 +33,6 @@ func (ds *DummyShaman) FileStoreCheck(ctx context.Context, checksum string, file func (ds *DummyShaman) FileStore(ctx context.Context, file io.ReadCloser, checksum string, filesize int64, canDefer bool, originalFilename string) error { return ErrDummyShaman } +func (ds *DummyShaman) EraseCheckout(checkoutID string) error { + return ErrDummyShaman +} diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 0b9661a3..bfa778df 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -15,6 +15,7 @@ import ( "git.blender.org/flamenco/internal/manager/config" "git.blender.org/flamenco/internal/manager/job_compilers" + "git.blender.org/flamenco/internal/manager/job_deleter" "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/sleep_scheduler" @@ -25,7 +26,7 @@ import ( ) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -197,6 +198,9 @@ type Shaman interface { // return early when another client finishes uploading the exact same file, to // prevent double uploads. FileStore(ctx context.Context, file io.ReadCloser, checksum string, filesize int64, canDefer bool, originalFilename string) error + + // EraseCheckout deletes the symlinks and the directory structure that makes up the checkout. + EraseCheckout(checkoutID string) error } var _ Shaman = (*shaman.Server)(nil) @@ -216,3 +220,9 @@ type WorkerSleepScheduler interface { } var _ WorkerSleepScheduler = (*sleep_scheduler.SleepScheduler)(nil) + +type JobDeleter interface { + QueueJobDeletion(ctx context.Context, job *persistence.Job) error +} + +var _ JobDeleter = (*job_deleter.Service)(nil) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 13192fe9..dfc94adc 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -160,6 +160,40 @@ func (f *Flamenco) compileSubmittedJob(ctx context.Context, logger zerolog.Logge return f.jobCompiler.Compile(ctx, submittedJob) } +// DeleteJob marks the job as "deletion requested" so that the job deletion +// service can actually delete it. +func (f *Flamenco) DeleteJob(e echo.Context, jobID string) error { + logger := requestLogger(e).With(). + Str("job", jobID). + Logger() + + dbJob, err := f.fetchJob(e, logger, jobID) + if dbJob == nil { + // f.fetchJob already sent a response. + return err + } + + logger = logger.With(). + Str("currentstatus", string(dbJob.Status)). + Logger() + logger.Info().Msg("job deletion requested") + + // All the required info is known, this can keep running even when the client + // disconnects. + ctx := context.Background() + err = f.jobDeleter.QueueJobDeletion(ctx, dbJob) + switch { + case persistence.ErrIsDBBusy(err): + logger.Error().AnErr("cause", err).Msg("database too busy to queue job deletion") + return sendAPIErrorDBBusy(e, "too busy to queue job deletion, try again later") + case err != nil: + logger.Error().AnErr("cause", err).Msg("error queueing job deletion") + return sendAPIError(e, http.StatusInternalServerError, "error queueing job deletion") + default: + return e.NoContent(http.StatusNoContent) + } +} + // SetJobStatus is used by the web interface to change a job's status. func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { logger := requestLogger(e).With(). diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 98d08588..125e985e 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -804,3 +804,31 @@ func TestFetchGlobalLastRenderedInfo(t *testing.T) { } } + +func TestDeleteJob(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + dbJob := persistence.Job{ + Model: persistence.Model{ID: 47}, + UUID: jobID, + Name: "test job", + Status: api.JobStatusFailed, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + + // Set up expectations. + echoCtx := mf.prepareMockedRequest(nil) + mf.persistence.EXPECT().FetchJob(moremock.ContextWithDeadline(), jobID).Return(&dbJob, nil) + mf.jobDeleter.EXPECT().QueueJobDeletion(gomock.Any(), &dbJob) + + // Do the call. + err := mf.flamenco.DeleteJob(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseNoContent(t, echoCtx) +} diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index b98f9bd5..9402a979 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler) +// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler,JobDeleter) // Package mocks is a generated GoMock package. package mocks @@ -979,6 +979,20 @@ func (mr *MockShamanMockRecorder) Checkout(arg0, arg1 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Checkout", reflect.TypeOf((*MockShaman)(nil).Checkout), arg0, arg1) } +// EraseCheckout mocks base method. +func (m *MockShaman) EraseCheckout(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EraseCheckout", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// EraseCheckout indicates an expected call of EraseCheckout. +func (mr *MockShamanMockRecorder) EraseCheckout(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EraseCheckout", reflect.TypeOf((*MockShaman)(nil).EraseCheckout), arg0) +} + // FileStore mocks base method. func (m *MockShaman) FileStore(arg0 context.Context, arg1 io.ReadCloser, arg2 string, arg3 int64, arg4 bool, arg5 string) error { m.ctrl.T.Helper() @@ -1219,3 +1233,40 @@ func (mr *MockWorkerSleepSchedulerMockRecorder) WorkerStatus(arg0, arg1 interfac mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerStatus", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).WorkerStatus), arg0, arg1) } + +// MockJobDeleter is a mock of JobDeleter interface. +type MockJobDeleter struct { + ctrl *gomock.Controller + recorder *MockJobDeleterMockRecorder +} + +// MockJobDeleterMockRecorder is the mock recorder for MockJobDeleter. +type MockJobDeleterMockRecorder struct { + mock *MockJobDeleter +} + +// NewMockJobDeleter creates a new mock instance. +func NewMockJobDeleter(ctrl *gomock.Controller) *MockJobDeleter { + mock := &MockJobDeleter{ctrl: ctrl} + mock.recorder = &MockJobDeleterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockJobDeleter) EXPECT() *MockJobDeleterMockRecorder { + return m.recorder +} + +// QueueJobDeletion mocks base method. +func (m *MockJobDeleter) QueueJobDeletion(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueJobDeletion", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// QueueJobDeletion indicates an expected call of QueueJobDeletion. +func (mr *MockJobDeleterMockRecorder) QueueJobDeletion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueJobDeletion", reflect.TypeOf((*MockJobDeleter)(nil).QueueJobDeletion), arg0, arg1) +} diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index 97e660ee..fb96dbb0 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -36,6 +36,7 @@ type mockedFlamenco struct { lastRender *mocks.MockLastRendered localStorage *mocks.MockLocalStorage sleepScheduler *mocks.MockWorkerSleepScheduler + jobDeleter *mocks.MockJobDeleter // Place for some tests to store a temporary directory. tempdir string @@ -52,6 +53,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { lr := mocks.NewMockLastRendered(mockCtrl) localStore := mocks.NewMockLocalStorage(mockCtrl) wss := mocks.NewMockWorkerSleepScheduler(mockCtrl) + jd := mocks.NewMockJobDeleter(mockCtrl) clock := clock.NewMock() mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") @@ -60,7 +62,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { } clock.Set(mockedNow) - f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss) + f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss, jd) return mockedFlamenco{ flamenco: f, @@ -75,6 +77,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { lastRender: lr, localStorage: localStore, sleepScheduler: wss, + jobDeleter: jd, } } diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go new file mode 100644 index 00000000..bdb8cab5 --- /dev/null +++ b/internal/manager/job_deleter/interfaces.go @@ -0,0 +1,53 @@ +package job_deleter + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + + "git.blender.org/flamenco/internal/manager/local_storage" + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/webupdates" + "git.blender.org/flamenco/pkg/api" + "git.blender.org/flamenco/pkg/shaman" +) + +// Generate mock implementations of these interfaces. +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/job_deleter PersistenceService,Storage,ChangeBroadcaster,Shaman + +type PersistenceService interface { + FetchJob(ctx context.Context, jobUUID string) (*persistence.Job, error) + + RequestJobDeletion(ctx context.Context, j *persistence.Job) error + // FetchJobsDeletionRequested returns the UUIDs of to-be-deleted jobs. + FetchJobsDeletionRequested(ctx context.Context) ([]string, error) + DeleteJob(ctx context.Context, jobUUID string) error +} + +// PersistenceService should be a subset of persistence.DB +var _ PersistenceService = (*persistence.DB)(nil) + +type Storage interface { + // RemoveJobStorage removes from disk the directory for storing job-related files. + RemoveJobStorage(ctx context.Context, jobUUID string) error +} + +var _ Storage = (*local_storage.StorageInfo)(nil) + +type ChangeBroadcaster interface { + // BroadcastJobUpdate sends the job update to SocketIO clients. + BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) +} + +// ChangeBroadcaster should be a subset of webupdates.BiDirComms +var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) + +type Shaman interface { + // IsEnabled returns whether this Shaman service is enabled or not. + IsEnabled() bool + + // EraseCheckout deletes the symlinks and the directory structure that makes up the checkout. + EraseCheckout(checkoutID string) error +} + +var _ Shaman = (*shaman.Server)(nil) diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go new file mode 100644 index 00000000..8953c23c --- /dev/null +++ b/internal/manager/job_deleter/job_deleter.go @@ -0,0 +1,180 @@ +// package job_deleter has functionality to delete jobs. +// +// Requesting deletion marks the job as "deletion requested" in the database. +// This is relatively fast, and persistent. After this, the job is queued for +// actual deletion by a different goroutine. +// +// At startup of the service the database is inspected and still-pending +// deletion requests are queued. +// +// SPDX-License-Identifier: GPL-3.0-or-later +package job_deleter + +import ( + "context" + "errors" + "fmt" + "time" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/shaman" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +// jobDeletionQueueSize determines how many job deletion requests can be kept in +// memory at a time. This is variable to allow unit testing with lower limits. +var jobDeletionQueueSize = defaultJobDeletionQueueSize + +const defaultJobDeletionQueueSize = 100 + +// Service can mark jobs as "deletion requested", as well as delete those jobs +// in a background goroutine. +type Service struct { + // Injected dependencies. + persist PersistenceService + storage Storage + changeBroadcaster ChangeBroadcaster + shaman Shaman + + queue chan string // Job UUIDs to process. +} + +// NewService constructs a new job deletion service. +// `shaman` can be nil if Shaman checkouts shouldn't be erased. +func NewService( + persist PersistenceService, + storage Storage, + changeBroadcaster ChangeBroadcaster, + shaman Shaman, +) *Service { + return &Service{ + persist: persist, + storage: storage, + changeBroadcaster: changeBroadcaster, + shaman: shaman, + + queue: make(chan string, jobDeletionQueueSize), + } +} + +func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) error { + logger := log.With().Str("job", job.UUID).Logger() + logger.Info().Msg("job deleter: queueing job for deletion") + + err := s.persist.RequestJobDeletion(ctx, job) + if err != nil { + return fmt.Errorf("requesting job deletion: %w", err) + } + + // Let the Run() goroutine know this job is ready for deletion. + select { + case s.queue <- job.UUID: + logger.Debug().Msg("job deleter: job succesfully queued for deletion") + case <-time.After(100 * time.Millisecond): + logger.Debug().Msg("job deleter: job deletion queue is full") + } + return nil +} + +// Run processes the queue of deletion requests. It starts by building up a +// queue of still-pending job deletions. +func (s *Service) Run(ctx context.Context) { + s.queuePendingDeletions(ctx) + + log.Debug().Msg("job deleter: running") + defer log.Debug().Msg("job deleter: shutting down") + + for { + select { + case <-ctx.Done(): + return + case jobUUID := <-s.queue: + s.deleteJob(ctx, jobUUID) + case <-time.After(1 * time.Minute): + // Inspect the database to see if there was anything marked for deletion + // without getting into our queue. This can happen when lots of jobs are + // queued in quick succession, as then the queue channel gets full. + if len(s.queue) == 0 { + s.queuePendingDeletions(ctx) + } + } + } +} + +func (s *Service) queuePendingDeletions(ctx context.Context) { + log.Debug().Msg("job deleter: finding pending deletions") + + jobUUIDs, err := s.persist.FetchJobsDeletionRequested(ctx) + if err != nil { + log.Warn().AnErr("cause", err).Msg("job deleter: could not find jobs to be deleted in database") + return + } + + for _, jobUUID := range jobUUIDs { + select { + case s.queue <- jobUUID: + log.Debug().Str("job", jobUUID).Msg("job deleter: job queued for deletion") + case <-time.After(100 * time.Millisecond): + log.Info().Msg("job deleter: job deletion queue is full") + break + } + } +} + +func (s *Service) deleteJob(ctx context.Context, jobUUID string) error { + logger := log.With().Str("job", jobUUID).Logger() + + err := s.deleteShamanCheckout(ctx, logger, jobUUID) + if err != nil { + return err + } + + logger.Info().Msg("job deleter: removing logs, last-rendered images, etc.") + if err := s.storage.RemoveJobStorage(ctx, jobUUID); err != nil { + logger.Error().Err(err).Msg("job deleter: error removing job logs, job deletion aborted") + return err + } + + logger.Info().Msg("job deleter: removing job from database") + if err := s.persist.DeleteJob(ctx, jobUUID); err != nil { + logger.Error().Err(err).Msg("job deleter: unable to remove job from database") + return err + } + + // TODO: broadcast that this job was deleted. + + logger.Info().Msg("job deleter: job removal complete") + return nil +} + +func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logger, jobUUID string) error { + if !s.shaman.IsEnabled() { + logger.Debug().Msg("job deleter: Shaman not enabled, skipping job file deletion") + return nil + } + + // To erase the Shaman checkout we need more info than just its UUID. + dbJob, err := s.persist.FetchJob(ctx, jobUUID) + if err != nil { + return fmt.Errorf("unable to fetch job from database: %w", err) + } + + checkoutID := dbJob.Storage.ShamanCheckoutID + if checkoutID == "" { + logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion") + return nil + } + + err = s.shaman.EraseCheckout(checkoutID) + switch { + case errors.Is(err, shaman.ErrDoesNotExist): + logger.Info().Msg("job deleter: Shaman checkout directory does not exist, ignoring") + return nil + case err != nil: + logger.Info().Err(err).Msg("job deleter: Shaman checkout directory could not be erased") + return err + } + + return nil +} diff --git a/internal/manager/job_deleter/job_deleter_test.go b/internal/manager/job_deleter/job_deleter_test.go new file mode 100644 index 00000000..a321dd84 --- /dev/null +++ b/internal/manager/job_deleter/job_deleter_test.go @@ -0,0 +1,193 @@ +package job_deleter + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "testing" + + "git.blender.org/flamenco/internal/manager/job_deleter/mocks" + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/shaman" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +type JobDeleterMocks struct { + persist *mocks.MockPersistenceService + storage *mocks.MockStorage + broadcaster *mocks.MockChangeBroadcaster + shaman *mocks.MockShaman + + ctx context.Context + cancel context.CancelFunc +} + +func TestQueueJobDeletion(t *testing.T) { + s, finish, mocks := jobDeleterTestFixtures(t) + defer finish() + + job1 := &persistence.Job{UUID: "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b"} + mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job1) + assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job1)) + + // Call twice more to overflow the queue. + job2 := &persistence.Job{UUID: "e8fbe41c-ed24-46df-ba63-8d4f5524071b"} + mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job2) + assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job2)) + + job3 := &persistence.Job{UUID: "deeab6ba-02cd-42c0-b7bc-2367a2f04c7d"} + mocks.persist.EXPECT().RequestJobDeletion(mocks.ctx, job3) + assert.NoError(t, s.QueueJobDeletion(mocks.ctx, job3)) + + if assert.Len(t, s.queue, 2, "the first two job UUID should be queued") { + assert.Equal(t, job1.UUID, <-s.queue) + assert.Equal(t, job2.UUID, <-s.queue) + } +} + +func TestQueuePendingDeletions(t *testing.T) { + s, finish, mocks := jobDeleterTestFixtures(t) + defer finish() + + // Queue one more job than fits. + job1 := "aa420164-926a-45d5-ae8b-510ff3d2cd4d" + job2 := "e5feadee-999e-48c2-853d-9db94e7623b0" + job3 := "8516ac60-787c-411e-80a7-026456034da4" + + mocks.persist.EXPECT(). + FetchJobsDeletionRequested(mocks.ctx). + Return([]string{job1, job2, job3}, nil) + s.queuePendingDeletions(mocks.ctx) + if assert.Len(t, s.queue, 2, "the first two job UUIDs should be queued") { + assert.Equal(t, job1, <-s.queue) + assert.Equal(t, job2, <-s.queue) + } +} + +func TestQueuePendingDeletionsUnhappy(t *testing.T) { + s, finish, mocks := jobDeleterTestFixtures(t) + defer finish() + + // Any error fetching the deletion-requested jobs should just be logged, and + // not cause any issues. + mocks.persist.EXPECT(). + FetchJobsDeletionRequested(mocks.ctx). + Return(nil, errors.New("mocked DB failure")) + + s.queuePendingDeletions(mocks.ctx) + assert.Len(t, s.queue, 0) +} + +func TestDeleteJobWithoutShaman(t *testing.T) { + s, finish, mocks := jobDeleterTestFixtures(t) + defer finish() + + jobUUID := "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b" + + mocks.shaman.EXPECT().IsEnabled().Return(false).AnyTimes() + mocks.persist.EXPECT(). + FetchJobsDeletionRequested(mocks.ctx). + Return([]string{jobUUID}, nil). + AnyTimes() + + // Mock log storage deletion failure. This should prevent the deletion from the database. + mocks.storage.EXPECT(). + RemoveJobStorage(mocks.ctx, jobUUID). + Return(errors.New("intended log file deletion failure")) + assert.Error(t, s.deleteJob(mocks.ctx, jobUUID)) + + // Mock that log storage deletion is ok, but database is not. + mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) + mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID). + Return(errors.New("mocked DB error")) + assert.Error(t, s.deleteJob(mocks.ctx, jobUUID)) + + // Mock that everything went OK. + mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) + mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID) + // TODO: mocks.broadcaster.EXPECT().BroadcastJobUpdate(...) + assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID)) +} + +func TestDeleteJobWithShaman(t *testing.T) { + s, finish, mocks := jobDeleterTestFixtures(t) + defer finish() + + jobUUID := "2f7d910f-08a6-4b0f-8ecb-b3946939ed1b" + + mocks.shaman.EXPECT().IsEnabled().Return(true).AnyTimes() + mocks.persist.EXPECT(). + FetchJobsDeletionRequested(mocks.ctx). + Return([]string{jobUUID}, nil). + AnyTimes() + + shamanCheckoutID := "010_0431_lighting" + dbJob := persistence.Job{ + UUID: jobUUID, + Name: "сцена/shot/010_0431_lighting", + Storage: persistence.JobStorageInfo{ + ShamanCheckoutID: shamanCheckoutID, + }, + } + mocks.persist.EXPECT().FetchJob(mocks.ctx, jobUUID).Return(&dbJob, nil).AnyTimes() + + // Mock that Shaman deletion failed. The rest of the deletion should be + // blocked by this. + mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID).Return(errors.New("mocked failure")) + assert.Error(t, s.deleteJob(mocks.ctx, jobUUID)) + + // Mock that Shaman deletion couldn't happen because the checkout dir doesn't + // exist. The rest of the deletion should continue. + mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID).Return(shaman.ErrDoesNotExist) + // Mock log storage deletion failure. This should prevent the deletion from the database. + mocks.storage.EXPECT(). + RemoveJobStorage(mocks.ctx, jobUUID). + Return(errors.New("intended log file deletion failure")) + assert.Error(t, s.deleteJob(mocks.ctx, jobUUID)) + + // Mock that log storage deletion is ok, but database is not. + mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID) + mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) + mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID). + Return(errors.New("mocked DB error")) + assert.Error(t, s.deleteJob(mocks.ctx, jobUUID)) + + // Mock that everything went OK. + mocks.shaman.EXPECT().EraseCheckout(shamanCheckoutID) + mocks.storage.EXPECT().RemoveJobStorage(mocks.ctx, jobUUID) + mocks.persist.EXPECT().DeleteJob(mocks.ctx, jobUUID) + // TODO: mocks.broadcaster.EXPECT().BroadcastJobUpdate(...) + assert.NoError(t, s.deleteJob(mocks.ctx, jobUUID)) +} + +func jobDeleterTestFixtures(t *testing.T) (*Service, func(), *JobDeleterMocks) { + mockCtrl := gomock.NewController(t) + + mocks := &JobDeleterMocks{ + persist: mocks.NewMockPersistenceService(mockCtrl), + storage: mocks.NewMockStorage(mockCtrl), + broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), + shaman: mocks.NewMockShaman(mockCtrl), + } + + ctx, cancel := context.WithCancel(context.Background()) + mocks.ctx = ctx + mocks.cancel = cancel + + // This should be called at the end of each unit test. + finish := func() { + mocks.cancel() + jobDeletionQueueSize = defaultJobDeletionQueueSize + } + jobDeletionQueueSize = 2 + + s := NewService( + mocks.persist, + mocks.storage, + mocks.broadcaster, + mocks.shaman, + ) + return s, finish, mocks +} diff --git a/internal/manager/job_deleter/mocks/interfaces_mock.gen.go b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go new file mode 100644 index 00000000..c1fa65ea --- /dev/null +++ b/internal/manager/job_deleter/mocks/interfaces_mock.gen.go @@ -0,0 +1,218 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: git.blender.org/flamenco/internal/manager/job_deleter (interfaces: PersistenceService,Storage,ChangeBroadcaster,Shaman) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + persistence "git.blender.org/flamenco/internal/manager/persistence" + api "git.blender.org/flamenco/pkg/api" + gomock "github.com/golang/mock/gomock" +) + +// MockPersistenceService is a mock of PersistenceService interface. +type MockPersistenceService struct { + ctrl *gomock.Controller + recorder *MockPersistenceServiceMockRecorder +} + +// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService. +type MockPersistenceServiceMockRecorder struct { + mock *MockPersistenceService +} + +// NewMockPersistenceService creates a new mock instance. +func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService { + mock := &MockPersistenceService{ctrl: ctrl} + mock.recorder = &MockPersistenceServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { + return m.recorder +} + +// DeleteJob mocks base method. +func (m *MockPersistenceService) DeleteJob(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteJob", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteJob indicates an expected call of DeleteJob. +func (mr *MockPersistenceServiceMockRecorder) DeleteJob(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteJob", reflect.TypeOf((*MockPersistenceService)(nil).DeleteJob), arg0, arg1) +} + +// FetchJob mocks base method. +func (m *MockPersistenceService) FetchJob(arg0 context.Context, arg1 string) (*persistence.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchJob", arg0, arg1) + ret0, _ := ret[0].(*persistence.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchJob indicates an expected call of FetchJob. +func (mr *MockPersistenceServiceMockRecorder) FetchJob(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJob", reflect.TypeOf((*MockPersistenceService)(nil).FetchJob), arg0, arg1) +} + +// FetchJobsDeletionRequested mocks base method. +func (m *MockPersistenceService) FetchJobsDeletionRequested(arg0 context.Context) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchJobsDeletionRequested", arg0) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchJobsDeletionRequested indicates an expected call of FetchJobsDeletionRequested. +func (mr *MockPersistenceServiceMockRecorder) FetchJobsDeletionRequested(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobsDeletionRequested", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobsDeletionRequested), arg0) +} + +// RequestJobDeletion mocks base method. +func (m *MockPersistenceService) RequestJobDeletion(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestJobDeletion", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequestJobDeletion indicates an expected call of RequestJobDeletion. +func (mr *MockPersistenceServiceMockRecorder) RequestJobDeletion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestJobDeletion", reflect.TypeOf((*MockPersistenceService)(nil).RequestJobDeletion), arg0, arg1) +} + +// MockStorage is a mock of Storage interface. +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage. +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance. +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// RemoveJobStorage mocks base method. +func (m *MockStorage) RemoveJobStorage(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveJobStorage", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveJobStorage indicates an expected call of RemoveJobStorage. +func (mr *MockStorageMockRecorder) RemoveJobStorage(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveJobStorage", reflect.TypeOf((*MockStorage)(nil).RemoveJobStorage), arg0, arg1) +} + +// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. +type MockChangeBroadcaster struct { + ctrl *gomock.Controller + recorder *MockChangeBroadcasterMockRecorder +} + +// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster. +type MockChangeBroadcasterMockRecorder struct { + mock *MockChangeBroadcaster +} + +// NewMockChangeBroadcaster creates a new mock instance. +func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster { + mock := &MockChangeBroadcaster{ctrl: ctrl} + mock.recorder = &MockChangeBroadcasterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { + return m.recorder +} + +// BroadcastJobUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastJobUpdate(arg0 api.SocketIOJobUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastJobUpdate", arg0) +} + +// BroadcastJobUpdate indicates an expected call of BroadcastJobUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0) +} + +// MockShaman is a mock of Shaman interface. +type MockShaman struct { + ctrl *gomock.Controller + recorder *MockShamanMockRecorder +} + +// MockShamanMockRecorder is the mock recorder for MockShaman. +type MockShamanMockRecorder struct { + mock *MockShaman +} + +// NewMockShaman creates a new mock instance. +func NewMockShaman(ctrl *gomock.Controller) *MockShaman { + mock := &MockShaman{ctrl: ctrl} + mock.recorder = &MockShamanMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockShaman) EXPECT() *MockShamanMockRecorder { + return m.recorder +} + +// EraseCheckout mocks base method. +func (m *MockShaman) EraseCheckout(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EraseCheckout", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// EraseCheckout indicates an expected call of EraseCheckout. +func (mr *MockShamanMockRecorder) EraseCheckout(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EraseCheckout", reflect.TypeOf((*MockShaman)(nil).EraseCheckout), arg0) +} + +// IsEnabled mocks base method. +func (m *MockShaman) IsEnabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsEnabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsEnabled indicates an expected call of IsEnabled. +func (mr *MockShamanMockRecorder) IsEnabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEnabled", reflect.TypeOf((*MockShaman)(nil).IsEnabled)) +} diff --git a/internal/manager/local_storage/local_storage.go b/internal/manager/local_storage/local_storage.go index 45c03a71..18b2d96d 100644 --- a/internal/manager/local_storage/local_storage.go +++ b/internal/manager/local_storage/local_storage.go @@ -3,6 +3,8 @@ package local_storage // SPDX-License-Identifier: GPL-3.0-or-later import ( + "context" + "errors" "fmt" "os" "path/filepath" @@ -37,21 +39,44 @@ func (si StorageInfo) ForJob(jobUUID string) string { return filepath.Join(si.rootPath, relPathForJob(jobUUID)) } -// Erase removes the entire storage directory from disk. -func (si StorageInfo) Erase() error { - // A few safety measures before erasing the planet. - if si.rootPath == "" { - return fmt.Errorf("%+v.Erase(): refusing to erase empty directory", si) - } - if crosspath.IsRoot(si.rootPath) { - return fmt.Errorf("%+v.Erase(): refusing to erase root directory", si) - } - if home, found := os.LookupEnv("HOME"); found && home == si.rootPath { - return fmt.Errorf("%+v.Erase(): refusing to erase home directory %s", si, home) +func (si StorageInfo) RemoveJobStorage(ctx context.Context, jobUUID string) error { + path := si.ForJob(jobUUID) + log.Info().Str("path", path).Msg("erasing manager-local job storage directory") + + if err := removeDirectory(path); err != nil { + return fmt.Errorf("unable to erase %q: %w", path, err) } - log.Debug().Str("path", si.rootPath).Msg("erasing storage directory") - return os.RemoveAll(si.rootPath) + // The path should be in some intermediate path + // (`root/intermediate/job-uuid`), which might need removing if it's empty. + intermediate := filepath.Dir(path) + if intermediate == si.rootPath { + // There is no intermediate dir for jobless situations. Or maybe the rest of + // the code changed since this function was written. Regardless of the + // reason, this function shouldn't remove the local storage root. + return nil + } + + if err := os.Remove(intermediate); err != nil { + // This is absolutely fine, as it'll happen when the directory is not empty + // and thus shouldn't be removed anyway. + log.Trace(). + Str("job", jobUUID). + Str("path", intermediate). + AnErr("cause", err). + Msg("RemoveJobStorage() could not remove intermediate directory, this is fine") + } + return nil +} + +// Erase removes the entire storage directory from disk. +func (si StorageInfo) Erase() error { + log.Info().Str("path", si.rootPath).Msg("erasing storage directory") + + if err := removeDirectory(si.rootPath); err != nil { + return fmt.Errorf("unable to erase %q: %w", si.rootPath, err) + } + return nil } // MustErase removes the entire storage directory from disk, and panics if it @@ -97,3 +122,20 @@ func getSuitableStorageRoot() string { // Fall back to "." if all else fails. return "." } + +// removeDirectory removes the given path, but only if it is not a root path and +// not the user's home directory. +func removeDirectory(path string) error { + if path == "" { + return fmt.Errorf("refusing to erase empty directory path (%q)", path) + } + if crosspath.IsRoot(path) { + return errors.New("refusing to erase root directory") + } + if home, found := os.LookupEnv("HOME"); found && home == path { + return errors.New("refusing to erase home directory") + } + + log.Debug().Str("path", path).Msg("erasing directory") + return os.RemoveAll(path) +} diff --git a/internal/manager/local_storage/local_storage_test.go b/internal/manager/local_storage/local_storage_test.go index 3149fb98..2d4dd36b 100644 --- a/internal/manager/local_storage/local_storage_test.go +++ b/internal/manager/local_storage/local_storage_test.go @@ -3,12 +3,14 @@ package local_storage // SPDX-License-Identifier: GPL-3.0-or-later import ( + "context" "os" "path/filepath" "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNewNextToExe(t *testing.T) { @@ -56,3 +58,51 @@ func TestErase(t *testing.T) { assert.NoError(t, si.Erase()) assert.NoDirExists(t, si.rootPath, "Erase() should erase the root path, and everything in it") } + +func TestRemoveJobStorage(t *testing.T) { + si := NewNextToExe("task-logs") + + jobUUID := "08e126ef-d773-468b-8bab-19a8213cf2ff" + jobPath := si.ForJob(jobUUID) + assert.NoDirExists(t, jobPath, "getting a path should not create it") + + assert.NoError(t, os.MkdirAll(jobPath, os.ModePerm)) + assert.DirExists(t, jobPath, "os.MkdirAll is borked") + + taskFile := filepath.Join(jobPath, "task-07c33f32-b345-4da9-8834-9c91532cd97e.txt") + assert.NoError(t, os.WriteFile(taskFile, []byte("dummy task log"), 0o777)) + + assert.NoError(t, si.RemoveJobStorage(context.Background(), jobUUID)) + assert.NoDirExists(t, jobPath, "RemoveJobStorage() should erase the entire job-specific storage dir, and everything in it") + + // See if the test assumption (that job dir is in another sub-dir of the root, + // `root/job-xxyy/xxyyzzblablabla`) still holds. + intermediate := filepath.Dir(jobPath) + require.NotEqual(t, si.rootPath, intermediate, + "Expected job path %s to be in child directory of root %s", jobPath, si.rootPath) + + assert.NoDirExists(t, intermediate, "RemoveJobStorage() should remove empty intermediate paths") + assert.DirExists(t, si.rootPath, "RemoveJobStorage() should keep the root path") +} + +func TestRemoveJobStorageWithoutJobUUID(t *testing.T) { + si := NewNextToExe("task-logs") + + jobPath := si.ForJob("") + assert.NoDirExists(t, jobPath, "getting a path should not create it") + + assert.NoError(t, os.MkdirAll(jobPath, os.ModePerm)) + assert.DirExists(t, jobPath, "os.MkdirAll is borked") + + taskFile := filepath.Join(jobPath, "task-07c33f32-b345-4da9-8834-9c91532cd97e.txt") + assert.NoError(t, os.WriteFile(taskFile, []byte("dummy task log"), 0o777)) + + assert.NoError(t, si.RemoveJobStorage(context.Background(), "")) + assert.NoDirExists(t, jobPath, "RemoveJobStorage() should erase the entire job-specific storage dir, and everything in it") + + // See if the test assumption (that a jobless dir is directly inside the root) still holds. + intermediate := filepath.Dir(jobPath) + require.Equal(t, si.rootPath, intermediate, + "Expected job path %s to be a direct child of root %s", jobPath, si.rootPath) + assert.DirExists(t, si.rootPath, "RemoveJobStorage() should keep the root path") +} diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index ab28335e..61695041 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -4,6 +4,7 @@ package persistence import ( "context" + "database/sql" "database/sql/driver" "encoding/json" "errors" @@ -31,12 +32,19 @@ type Job struct { Settings StringInterfaceMap `gorm:"type:jsonb"` Metadata StringStringMap `gorm:"type:jsonb"` + DeleteRequestedAt sql.NullTime + Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"` } type StringInterfaceMap map[string]interface{} type StringStringMap map[string]string +// DeleteRequested returns whether deletion of this job was requested. +func (j *Job) DeleteRequested() bool { + return j.DeleteRequestedAt.Valid +} + // JobStorageInfo contains info about where the job files are stored. It is // intended to be used when removing a job, which may include the removal of its // files. @@ -202,7 +210,8 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au // FetchJob fetches a single job, without fetching its tasks. func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { dbJob := Job{} - findResult := db.gormDB.WithContext(ctx).First(&dbJob, "uuid = ?", jobUUID) + findResult := db.gormDB.WithContext(ctx). + First(&dbJob, "uuid = ?", jobUUID) if findResult.Error != nil { return nil, jobError(findResult.Error, "fetching job") } @@ -222,6 +231,41 @@ func (db *DB) DeleteJob(ctx context.Context, jobUUID string) error { return nil } +// RequestJobDeletion sets the job's "DeletionRequestedAt" field to "now". +func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error { + j.DeleteRequestedAt.Time = db.gormDB.NowFunc() + j.DeleteRequestedAt.Valid = true + tx := db.gormDB.WithContext(ctx). + Model(j). + Updates(Job{DeleteRequestedAt: j.DeleteRequestedAt}) + if tx.Error != nil { + return jobError(tx.Error, "deleting job") + } + return nil +} + +func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) { + var jobs []*Job + + tx := db.gormDB.WithContext(ctx). + Model(&Job{}). + Select("UUID"). + Where("delete_requested_at is not NULL"). + Order("delete_requested_at"). + Scan(&jobs) + + if tx.Error != nil { + return nil, jobError(tx.Error, "fetching jobs marked for deletion") + } + + uuids := make([]string, len(jobs)) + for i := range jobs { + uuids[i] = jobs[i].UUID + } + + return uuids, nil +} + func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) { var jobs []*Job diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 6d78678d..99e8967f 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -101,6 +101,84 @@ func TestDeleteJob(t *testing.T) { tx = db.gormDB.Model(&Task{}).Count(&numTasks) assert.NoError(t, tx.Error) assert.Equal(t, int64(0), numTasks, "tasks should have been deleted along with their job") + + // TODO: test that blocklist entries and task dependencies are gone too. +} + +func TestRequestJobDeletion(t *testing.T) { + ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) + defer close() + + // Create another job, to see it's not touched by deleting the first one. + authoredJob2 := duplicateJobAndTasks(authoredJob1) + persistAuthoredJob(t, ctx, db, authoredJob2) + + mockNow := time.Now() + db.gormDB.NowFunc = func() time.Time { return mockNow } + + err := db.RequestJobDeletion(ctx, job1) + assert.NoError(t, err) + assert.True(t, job1.DeleteRequested()) + assert.True(t, job1.DeleteRequestedAt.Valid) + assert.Equal(t, job1.DeleteRequestedAt.Time, mockNow) + + dbJob1, err := db.FetchJob(ctx, job1.UUID) + assert.NoError(t, err) + assert.True(t, job1.DeleteRequested()) + assert.True(t, dbJob1.DeleteRequestedAt.Valid) + assert.WithinDuration(t, mockNow, dbJob1.DeleteRequestedAt.Time, time.Second) + + // Other jobs shouldn't be touched. + dbJob2, err := db.FetchJob(ctx, authoredJob2.JobID) + assert.NoError(t, err) + assert.False(t, dbJob2.DeleteRequested()) + assert.False(t, dbJob2.DeleteRequestedAt.Valid) +} + +func TestFetchJobsDeletionRequested(t *testing.T) { + ctx, close, db, job1, authoredJob1 := jobTasksTestFixtures(t) + defer close() + + now := time.Now() + db.gormDB.NowFunc = func() time.Time { return now } + + authoredJob2 := duplicateJobAndTasks(authoredJob1) + job2 := persistAuthoredJob(t, ctx, db, authoredJob2) + authoredJob3 := duplicateJobAndTasks(authoredJob1) + job3 := persistAuthoredJob(t, ctx, db, authoredJob3) + authoredJob4 := duplicateJobAndTasks(authoredJob1) + persistAuthoredJob(t, ctx, db, authoredJob4) + + // Ensure different requests get different timestamps, + // out of chronological order. + timestamps := []time.Time{ + // timestamps for 'delete requested at' and 'updated at' + now.Add(-3 * time.Second), now.Add(-3 * time.Second), + now.Add(-1 * time.Second), now.Add(-1 * time.Second), + now.Add(-5 * time.Second), now.Add(-5 * time.Second), + } + currentTimestampIndex := 0 + db.gormDB.NowFunc = func() time.Time { + now := timestamps[currentTimestampIndex] + currentTimestampIndex++ + return now + } + + err := db.RequestJobDeletion(ctx, job1) + assert.NoError(t, err) + err = db.RequestJobDeletion(ctx, job2) + assert.NoError(t, err) + err = db.RequestJobDeletion(ctx, job3) + assert.NoError(t, err) + + actualUUIDs, err := db.FetchJobsDeletionRequested(ctx) + assert.NoError(t, err) + assert.Len(t, actualUUIDs, 3, "3 out of 4 jobs were marked for deletion") + + // Expect UUIDs in chronological order of deletion requests, so that the + // oldest request is handled first. + expectUUIDs := []string{job3.UUID, job1.UUID, job2.UUID} + assert.Equal(t, expectUUIDs, actualUUIDs) } func TestJobHasTasksInStatus(t *testing.T) { diff --git a/pkg/shaman/checkout/manager.go b/pkg/shaman/checkout/manager.go index d562b68d..3490dda6 100644 --- a/pkg/shaman/checkout/manager.go +++ b/pkg/shaman/checkout/manager.go @@ -39,6 +39,11 @@ import ( "git.blender.org/flamenco/pkg/shaman/touch" ) +var ( + // ErrDoesNotExist is returned by EraseCheckout(). + ErrDoesNotExist = errors.New("checkout does not exist") +) + // Manager creates checkouts and provides info about missing files. type Manager struct { checkoutBasePath string @@ -161,11 +166,21 @@ func (m *Manager) PrepareCheckout(requestedCheckoutPath string) (ResolvedCheckou } // EraseCheckout removes the checkout directory structure identified by the ID. +// Returns ErrDoesNotExist if the checkout with this ID does not exist. func (m *Manager) EraseCheckout(checkoutID string) error { checkoutPaths, err := m.pathForCheckout(checkoutID) if err != nil { return err } + _, err = os.Stat(checkoutPaths.absolutePath) + switch { + case err == nil: + break + case errors.Is(err, os.ErrNotExist): + return ErrDoesNotExist + default: + return err + } logger := log.With(). Str("checkoutPath", checkoutPaths.absolutePath). diff --git a/pkg/shaman/checkout/manager_test.go b/pkg/shaman/checkout/manager_test.go index 7c0d71b9..aed3e30a 100644 --- a/pkg/shaman/checkout/manager_test.go +++ b/pkg/shaman/checkout/manager_test.go @@ -23,6 +23,7 @@ package checkout import ( + "context" "io/ioutil" "os" "path/filepath" @@ -30,6 +31,7 @@ import ( "testing" "time" + "git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/shaman/config" "git.blender.org/flamenco/pkg/shaman/filestore" "github.com/stretchr/testify/assert" @@ -97,3 +99,54 @@ func TestPrepareCheckout(t *testing.T) { assert.NotEqual(t, requestedCheckoutPath, resolved.RelativePath) assert.True(t, strings.HasPrefix(resolved.RelativePath, requestedCheckoutPath+"-")) } + +func TestEraseCheckout(t *testing.T) { + manager, cleanup := createTestManager() + defer cleanup() + ctx := context.Background() + + filestore.LinkTestFileStore(manager.fileStore.BasePath()) + + // Create a few checkouts to test with. + checkout1 := api.ShamanCheckout{ + CheckoutPath: "á hausinn á þér", + Files: []api.ShamanFileSpec{ + {Sha: "590c148428d5c35fab3ebad2f3365bb469ab9c531b60831f3e826c472027a0b9", Size: 3367, Path: "subdir/replacer.py"}, + {Sha: "80b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", Size: 7488, Path: "feed.py"}, + {Sha: "914853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", Size: 486, Path: "httpstuff.py"}, + {Sha: "d6fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", Size: 7217, Path: "много ликова.py"}, + }, + } + checkoutID1, err := manager.Checkout(ctx, checkout1) + require.NoError(t, err) + + checkout2 := checkout1 + checkout2.CheckoutPath = "één ander pad" + checkoutID2, err := manager.Checkout(ctx, checkout2) + require.NoError(t, err) + + // Check that removing one works, without deleting the other. + require.NoError(t, manager.EraseCheckout(checkoutID1)) + + checkoutPath1, err := manager.pathForCheckout(checkoutID1) + require.NoError(t, err) + checkoutPath2, err := manager.pathForCheckout(checkoutID2) + require.NoError(t, err) + + assert.NoDirExists(t, checkoutPath1.absolutePath, "actual checkout path should have been erased") + assert.DirExists(t, checkoutPath2.absolutePath, "the other checkout path should have been kept") + assert.DirExists(t, manager.fileStore.StoragePath(), "Shaman storage path should be kept") + + // Check that non-checkout paths should be refused. + require.Error(t, manager.EraseCheckout(manager.fileStore.BasePath())) +} + +func TestEraseCheckoutNonExisting(t *testing.T) { + manager, cleanup := createTestManager() + defer cleanup() + + filestore.LinkTestFileStore(manager.fileStore.BasePath()) + + // Erasing a non-existing checkout should return a specific error. + require.Error(t, manager.EraseCheckout("não existe")) +} diff --git a/pkg/shaman/server.go b/pkg/shaman/server.go index a6122f04..49d6b10e 100644 --- a/pkg/shaman/server.go +++ b/pkg/shaman/server.go @@ -36,6 +36,8 @@ import ( "github.com/rs/zerolog/log" ) +var ErrDoesNotExist = checkout.ErrDoesNotExist + // Server represents a Shaman Server. type Server struct { config config.Config @@ -153,3 +155,8 @@ func (s *Server) FileStore(ctx context.Context, file io.ReadCloser, checksum str // the caller without relying on types declared in the `fileserver` package? return err } + +// EraseCheckout deletes the symlinks and the directory structure that makes up the checkout. +func (s *Server) EraseCheckout(checkoutID string) error { + return s.checkoutMan.EraseCheckout(checkoutID) +}