From b4d2fc4231929e4b9c88ba93b1b506fe1153ac2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 3 Jun 2022 16:33:50 +0200 Subject: [PATCH] Manager: keep track of when a Worker last worked on a task This will be used for keeping track of stuck tasks. --- internal/manager/api_impl/interfaces.go | 2 ++ .../api_impl/mocks/api_impl_mock.gen.go | 14 ++++++++++ internal/manager/api_impl/workers.go | 27 ++++++++++++++++--- internal/manager/api_impl/workers_test.go | 12 +++++++++ internal/manager/persistence/jobs.go | 17 ++++++++++-- internal/manager/persistence/jobs_test.go | 19 +++++++++++++ .../manager/persistence/task_scheduler.go | 5 ++-- .../persistence/task_scheduler_test.go | 2 ++ 8 files changed, 90 insertions(+), 8 deletions(-) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index b8ac0978..8b2f9721 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -31,6 +31,8 @@ type PersistenceService interface { SaveTask(ctx context.Context, task *persistence.Task) error SaveTaskActivity(ctx context.Context, t *persistence.Task) error FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error) + // TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection. + TaskTouchedByWorker(context.Context, *persistence.Task) error CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index a5043bd2..4358ca2c 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -244,6 +244,20 @@ func (mr *MockPersistenceServiceMockRecorder) StoreAuthoredJob(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreAuthoredJob", reflect.TypeOf((*MockPersistenceService)(nil).StoreAuthoredJob), arg0, arg1) } +// TaskTouchedByWorker mocks base method. +func (m *MockPersistenceService) TaskTouchedByWorker(arg0 context.Context, arg1 *persistence.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskTouchedByWorker", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// TaskTouchedByWorker indicates an expected call of TaskTouchedByWorker. +func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskTouchedByWorker", reflect.TypeOf((*MockPersistenceService)(nil).TaskTouchedByWorker), arg0, arg1) +} + // MockChangeBroadcaster is a mock of ChangeBroadcaster interface. type MockChangeBroadcaster struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 539985ec..5870f0b2 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -385,8 +385,14 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { // TODO: check whether this task may undergo the requested status change. - if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil { - return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err) + taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) + workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) + + if taskUpdateErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) + } + if workerUpdateErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr) } return e.NoContent(http.StatusNoContent) @@ -437,6 +443,19 @@ func (f *Flamenco) doTaskUpdate( return dbErrActivity } +func (f *Flamenco) workerPingedTask( + ctx context.Context, + logger zerolog.Logger, + task *persistence.Task, +) error { + err := f.persist.TaskTouchedByWorker(ctx, task) + if err != nil { + logger.Error().Err(err).Msg("error marking task as 'touched' by worker") + return err + } + return nil +} + // taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO. func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) { // Errors writing the log to file should be logged in our own logging @@ -484,7 +503,9 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { mkr := mayWorkerRun(worker, dbTask) if mkr.MayKeepRunning { - // TODO: record that this worker "touched" this task, for timeout calculations. + // Errors saving the "worker pinged task" field in the database are just + // logged. It's not something to bother the worker with. + _ = f.workerPingedTask(ctx, logger, dbTask) } return e.JSON(http.StatusOK, mkr) diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index cf17940a..81c7b196 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -389,6 +389,14 @@ func TestTaskUpdate(t *testing.T) { Log: "line1\nline2\n", }) + // Expect a 'touch' of the task. + var touchedTask persistence.Task + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, task *persistence.Task) error { + touchedTask = *task + return nil + }) + // Do the call. echoCtx := mf.prepareMockedJSONRequest(taskUpdate) requestWorkerStore(echoCtx, &worker) @@ -398,6 +406,7 @@ func TestTaskUpdate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) + assert.Equal(t, mockTask.UUID, touchedTask.UUID) assert.Equal(t, "testing", statusChangedtask.Activity) assert.Equal(t, "testing", actUpdatedTask.Activity) } @@ -440,6 +449,9 @@ func TestMayWorkerRun(t *testing.T) { // Test: happy, task assigned to this worker. { + // Expect a 'touch' of the task. + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &task).Return(nil) + echo := prepareRequest() task.WorkerID = &worker.ID err := mf.flamenco.MayWorkerRun(echo, task.UUID) diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 8993ff4a..8b54601f 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -7,6 +7,7 @@ import ( "database/sql/driver" "encoding/json" "errors" + "time" "gorm.io/gorm" @@ -43,8 +44,9 @@ type Task struct { Status api.TaskStatus `gorm:"type:varchar(16);default:''"` // Which worker is/was working on this. - WorkerID *uint - Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` + WorkerID *uint + Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` + LastTouchedAt time.Time // Dependencies are tasks that need to be completed before this one can run. Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"` @@ -385,3 +387,14 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, } return nil } + +// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection. +func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error { + tx := db.gormDB.WithContext(ctx). + Model(t). + Updates(Task{LastTouchedAt: db.gormDB.NowFunc()}) + if err := tx.Error; err != nil { + return taskError(err, "saving task 'last touched at'") + } + return nil +} diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 1a328172..4e2d2c31 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -213,6 +213,25 @@ func TestFetchTasksOfWorkerInStatus(t *testing.T) { assert.Empty(t, tasks, "worker should have no task in status %q", w) } +func TestTaskTouchedByWorker(t *testing.T) { + ctx, close, db, _, authoredJob := jobTasksTestFixtures(t) + defer close() + + task, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) + assert.NoError(t, err) + assert.True(t, task.LastTouchedAt.IsZero()) + + now := db.gormDB.NowFunc() + err = db.TaskTouchedByWorker(ctx, task) + assert.NoError(t, err) + + // Test the task instance as well as the database entry. + dbTask, err := db.FetchTask(ctx, task.UUID) + assert.NoError(t, err) + assert.WithinDuration(t, now, task.LastTouchedAt, time.Second) + assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second) +} + func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { task1 := job_compilers.AuthoredTask{ Name: "render-1-3", diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index ac564c5e..5584c744 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -141,7 +141,6 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { } func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error { - // Without the Select() call, Gorm will try and also store task.Job in the - // jobs database, which is not what we want. - return tx.Model(t).Select("worker_id").Updates(Task{WorkerID: &w.ID}).Error + return tx.Model(t). + Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error } diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 7f6bdce5..a6f05451 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -51,6 +51,7 @@ func TestOneJobOneTask(t *testing.T) { assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") // Check the task in the database. + now := db.gormDB.NowFunc() dbTask, err := db.FetchTask(context.Background(), authTask.UUID) assert.NoError(t, err) if dbTask == nil { @@ -60,6 +61,7 @@ func TestOneJobOneTask(t *testing.T) { t.Fatal("no worker assigned to task") } assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker") + assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling") } func TestOneJobThreeTasksByPrio(t *testing.T) {