From 2a345a3d2c40c163127d40dbb64bcbae220a03ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 11 Aug 2022 16:59:53 -0700 Subject: [PATCH] API for deleting workers Workers can now be soft-deleted. Tasks assigned to the worker will remain associated with that Worker. Active tasks will be re-queued so other workers can pick them up. --- internal/manager/api_impl/interfaces.go | 1 + .../api_impl/mocks/api_impl_mock.gen.go | 14 ++++ internal/manager/api_impl/worker_mgt.go | 57 ++++++++++++++++ internal/manager/api_impl/worker_mgt_test.go | 38 +++++++++++ internal/manager/persistence/jobs.go | 3 + internal/manager/persistence/workers.go | 16 +++++ internal/manager/persistence/workers_test.go | 65 +++++++++++++++++++ 7 files changed, 194 insertions(+) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 0d7103db..103b7c93 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -46,6 +46,7 @@ type PersistenceService interface { SaveWorker(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error WorkerSeen(ctx context.Context, w *persistence.Worker) error + DeleteWorker(ctx context.Context, uuid string) error // ScheduleTask finds a task to execute by the given worker, and assigns it to that worker. // If no task is available, (nil, nil) is returned, as this is not an error situation. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 8d7ef3f5..3a5b5858 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -141,6 +141,20 @@ func (mr *MockPersistenceServiceMockRecorder) CreateWorker(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorker", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorker), arg0, arg1) } +// DeleteWorker mocks base method. +func (m *MockPersistenceService) DeleteWorker(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteWorker", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteWorker indicates an expected call of DeleteWorker. +func (mr *MockPersistenceServiceMockRecorder) DeleteWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorker", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorker), arg0, arg1) +} + // FetchJob mocks base method. func (m *MockPersistenceService) FetchJob(arg0 context.Context, arg1 string) (*persistence.Job, error) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 933c2437..cbbbce51 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -71,6 +71,63 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return e.JSON(http.StatusOK, apiWorker) } +func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error { + logger := requestLogger(e) + logger = logger.With().Str("worker", workerUUID).Logger() + + if !uuid.IsValid(workerUUID) { + return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") + } + + // All information to do the deletion is known, so even when the client + // disconnects, the deletion should be completed. + ctx, ctxCancel := bgContext() + defer ctxCancel() + + // Fetch the worker in order to re-queue its tasks. + worker, err := f.persist.FetchWorker(ctx, workerUUID) + if errors.Is(err, persistence.ErrWorkerNotFound) { + logger.Debug().Msg("deletion of non-existent worker requested") + return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID) + } + if err != nil { + logger.Error().Err(err).Msg("error fetching worker for deletion") + return sendAPIError(e, http.StatusInternalServerError, + "error fetching worker for deletion: %v", err) + } + + err = f.stateMachine.RequeueActiveTasksOfWorker(ctx, worker, "worker is being deleted") + if err != nil { + logger.Error().Err(err).Msg("error requeueing tasks before deleting worker") + return sendAPIError(e, http.StatusInternalServerError, + "error requeueing tasks before deleting worker: %v", err) + } + + // Actually delete the worker. + err = f.persist.DeleteWorker(ctx, workerUUID) + if errors.Is(err, persistence.ErrWorkerNotFound) { + logger.Debug().Msg("deletion of non-existent worker requested") + return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID) + } + if err != nil { + logger.Error().Err(err).Msg("error deleting worker") + return sendAPIError(e, http.StatusInternalServerError, "error deleting worker: %v", err) + } + logger.Info().Msg("deleted worker") + + // It would be cleaner to re-fetch the Worker from the database and get the + // exact 'deleted at' timestamp from there, but that would require more DB + // operations, and this is accurate enough for a quick broadcast via SocketIO. + now := f.clock.Now() + + // Broadcast the fact that this worker was just deleted. + update := webupdates.NewWorkerUpdate(worker) + update.DeletedAt = &now + f.broadcaster.BroadcastWorkerUpdate(update) + + return e.NoContent(http.StatusNoContent) +} + func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) error { logger := requestLogger(e) logger = logger.With().Str("worker", workerUUID).Logger() diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index 546858c7..23586b9b 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -141,6 +141,44 @@ func TestFetchWorker(t *testing.T) { }) } +func TestDeleteWorker(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + workerUUID := worker.UUID + + // Test on non-existent worker. + mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID). + Return(nil, fmt.Errorf("wrapped: %w", persistence.ErrWorkerNotFound)) + echo := mf.prepareMockedRequest(nil) + err := mf.flamenco.DeleteWorker(echo, workerUUID) + assert.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusNotFound, fmt.Sprintf("worker %q not found", workerUUID)) + + // Test with existing worker. + mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) + mf.stateMachine.EXPECT().RequeueActiveTasksOfWorker( + gomock.Any(), &worker, "worker is being deleted") + mf.persistence.EXPECT().DeleteWorker(gomock.Any(), workerUUID).Return(nil) + + mockedNow := mf.clock.Now() + mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + DeletedAt: &mockedNow, + Id: worker.UUID, + Name: worker.Name, + Status: worker.Status, + Updated: worker.UpdatedAt, + Version: worker.Software, + }) + + echo = mf.prepareMockedRequest(nil) + err = mf.flamenco.DeleteWorker(echo, workerUUID) + assert.NoError(t, err) + assertResponseNoContent(t, echo) +} + func TestRequestWorkerStatusChange(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index bfb67cbb..43a0c640 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -237,6 +237,9 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { dbTask := Task{} tx := db.gormDB.WithContext(ctx). + // Allow finding the Worker, even after it was deleted. Jobs and Tasks + // don't have soft-deletion. + Unscoped(). Joins("Job"). Joins("Worker"). First(&dbTask, "tasks.uuid = ?", taskUUID) diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index ccfad354..eb20c243 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -9,10 +9,13 @@ import ( "time" "git.blender.org/flamenco/pkg/api" + "gorm.io/gorm" ) type Worker struct { Model + DeletedAt gorm.DeletedAt `gorm:"index"` + UUID string `gorm:"type:char(36);default:'';unique;index;default:''"` Secret string `gorm:"type:varchar(255);default:''"` Name string `gorm:"type:varchar(64);default:''"` @@ -71,6 +74,19 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { return &w, nil } +func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { + tx := db.gormDB.WithContext(ctx). + Where("uuid = ?", uuid). + Delete(&Worker{}) + if tx.Error != nil { + return workerError(tx.Error, "deleting worker") + } + if tx.RowsAffected == 0 { + return ErrWorkerNotFound + } + return nil +} + func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { workers := make([]*Worker, 0) tx := db.gormDB.WithContext(ctx).Model(&Worker{}).Scan(&workers) diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index d31a840c..aae65b71 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -252,3 +252,68 @@ func TestFetchWorkers(t *testing.T) { assert.Equal(t, windowsWorker.UUID, workers[1].UUID) } } + +func TestDeleteWorker(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) + defer cancel() + + // Test deleting non-existent worker + err := db.DeleteWorker(ctx, "dabf67a1-b591-4232-bf73-0b8de2a9488e") + assert.ErrorIs(t, err, ErrWorkerNotFound) + + // Test deleting existing worker + w1 := Worker{ + UUID: "fd97a35b-a5bd-44b4-ac2b-64c193ca877d", + Name: "Worker 1", + Status: api.WorkerStatusAwake, + } + w2 := Worker{ + UUID: "82b2d176-cb8c-4bfa-8300-41c216d766df", + Name: "Worker 2", + Status: api.WorkerStatusOffline, + } + + assert.NoError(t, db.CreateWorker(ctx, &w1)) + assert.NoError(t, db.CreateWorker(ctx, &w2)) + + // Delete the 2nd worker, just to have a test with ID != 1. + assert.NoError(t, db.DeleteWorker(ctx, w2.UUID)) + + // The deleted worker should now no longer be found. + { + fetchedWorker, err := db.FetchWorker(ctx, w2.UUID) + assert.ErrorIs(t, err, ErrWorkerNotFound) + assert.Nil(t, fetchedWorker) + } + + // The other worker should still exist. + { + fetchedWorker, err := db.FetchWorker(ctx, w1.UUID) + assert.NoError(t, err) + assert.Equal(t, w1.UUID, fetchedWorker.UUID) + } + + // Assign a task to the other worker, and then delete that worker. + authJob := createTestAuthoredJobWithTasks() + persistAuthoredJob(t, ctx, db, authJob) + taskUUID := authJob.Tasks[0].UUID + { + task, err := db.FetchTask(ctx, taskUUID) + assert.NoError(t, err) + task.Worker = &w1 + assert.NoError(t, db.SaveTask(ctx, task)) + } + + // Delete the worker. + assert.NoError(t, db.DeleteWorker(ctx, w1.UUID)) + + // Check the task after deletion of the Worker. + { + fetchedTask, err := db.FetchTask(ctx, taskUUID) + assert.NoError(t, err) + assert.Equal(t, taskUUID, fetchedTask.UUID) + assert.Equal(t, w1.UUID, fetchedTask.Worker.UUID) + assert.NotZero(t, fetchedTask.Worker.DeletedAt.Time) + assert.True(t, fetchedTask.Worker.DeletedAt.Valid) + } +}