diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index ddb86a25..11f9378b 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -40,6 +40,7 @@ type PersistenceService interface { FetchWorkers(ctx context.Context) ([]*persistence.Worker, error) SaveWorker(ctx context.Context, w *persistence.Worker) error SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error + WorkerSeen(ctx context.Context, w *persistence.Worker) error // ScheduleTask finds a task to execute by the given worker, and assigns it to that worker. // If no task is available, (nil, nil) is returned, as this is not an error situation. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 5bf95a31..3a569cd4 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -243,6 +243,20 @@ func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskTouchedByWorker", reflect.TypeOf((*MockPersistenceService)(nil).TaskTouchedByWorker), arg0, arg1) } +// WorkerSeen mocks base method. +func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *persistence.Worker) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkerSeen", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// WorkerSeen indicates an expected call of WorkerSeen. +func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1) +} + // MockChangeBroadcaster is a mock of ChangeBroadcaster interface. type MockChangeBroadcaster struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index ed536f16..ed7c0e6f 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -109,6 +109,7 @@ func (f *Flamenco) SignOn(e echo.Context) error { func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) { logger := requestLogger(e) w := requestWorkerOrPanic(e) + ctx := e.Request().Context() // Update the worker for with the new sign-on info. prevStatus := w.Status @@ -124,7 +125,7 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON w.SupportedTaskTypes = strings.Join(update.SupportedTaskTypes, ",") // Save the new Worker info to the database. - err := f.persist.SaveWorker(e.Request().Context(), w) + err := f.persist.SaveWorker(ctx, w) if err != nil { logger.Warn().Err(err). Str("newStatus", string(w.Status)). @@ -132,6 +133,11 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON return nil, "", err } + err = f.workerSeen(ctx, logger, w) + if err != nil { + return nil, "", err + } + return w, prevStatus, nil } @@ -167,6 +173,9 @@ func (f *Flamenco) SignOff(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database") } + // Ignore database errors here; the rest of the signoff process should just happen. + _ = f.workerSeen(ctx, logger, w) + // Re-queue all tasks (should be only one) this worker is now working on. err = f.stateMachine.RequeueTasksOfWorker(ctx, w, "worker signed off") if err != nil { @@ -224,7 +233,8 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { w.StatusChangeClear() } - err = f.persist.SaveWorkerStatus(e.Request().Context(), w) + ctx := e.Request().Context() + err = f.persist.SaveWorkerStatus(ctx, w) if err != nil { logger.Warn().Err(err). Str("newStatus", string(w.Status)). @@ -232,6 +242,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") } + if err := f.workerSeen(ctx, logger, w); err != nil { + return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database") + } + update := webupdates.NewWorkerUpdate(w) update.PreviousStatus = &prevStatus f.broadcaster.BroadcastWorkerUpdate(update) @@ -288,9 +302,13 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { } // Start timeout measurement as soon as the Worker gets the task assigned. - if err := f.workerPingedTask(e.Request().Context(), logger, dbTask); err != nil { + ctx := e.Request().Context() + if err := f.workerPingedTask(ctx, logger, dbTask); err != nil { return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err) } + if err := f.workerSeen(ctx, logger, worker); err != nil { + return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database") + } // Convert database objects to API objects: apiCommands := []api.Command{} @@ -361,6 +379,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) + workerSeenErr := f.workerSeen(ctx, logger, worker) if taskUpdateErr != nil { return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) @@ -368,6 +387,9 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { if workerUpdateErr != nil { return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr) } + if workerSeenErr != nil { + return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr) + } return e.NoContent(http.StatusNoContent) } @@ -431,6 +453,20 @@ func (f *Flamenco) workerPingedTask( return nil } +// workerSeen marks the worker as 'seen' and logs any database error that may occur. +func (f *Flamenco) workerSeen( + ctx context.Context, + logger zerolog.Logger, + w *persistence.Worker, +) error { + err := f.persist.WorkerSeen(ctx, w) + if err != nil { + logger.Error().Err(err).Msg("error marking Worker as 'seen' in the database") + return err + } + return nil +} + func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) @@ -462,11 +498,12 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { mkr := mayWorkerRun(worker, dbTask) + // Errors saving the "worker pinged task" and "worker seen" fields in the + // database are just logged. It's not something to bother the worker with. if mkr.MayKeepRunning { - // Errors saving the "worker pinged task" field in the database are just - // logged. It's not something to bother the worker with. _ = f.workerPingedTask(ctx, logger, dbTask) } + _ = f.workerSeen(ctx, logger, worker) return e.JSON(http.StatusOK, mkr) } diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index dcdcc9f1..eae3bb76 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -33,8 +33,11 @@ func TestTaskScheduleHappy(t *testing.T) { UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503", Job: &job, } - mf.persistence.EXPECT().ScheduleTask(echo.Request().Context(), &worker).Return(&task, nil) - mf.persistence.EXPECT().TaskTouchedByWorker(echo.Request().Context(), &task) + + ctx := echo.Request().Context() + mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil) + mf.persistence.EXPECT().TaskTouchedByWorker(ctx, &task) + mf.persistence.EXPECT().WorkerSeen(ctx, &worker) mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID, "Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)") @@ -112,6 +115,7 @@ func TestWorkerSignOn(t *testing.T) { }) mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) echo := mf.prepareMockedJSONRequest(api.WorkerSignOn{ Nickname: "Lazy Boi", @@ -142,6 +146,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { // Expect worker's tasks to be re-queued. mf.stateMachine.EXPECT().RequeueTasksOfWorker(expectCtx, &worker, "worker signed off").Return(nil) + mf.persistence.EXPECT().WorkerSeen(expectCtx, &worker) // Expect worker to be saved as 'offline'. mf.persistence.EXPECT(). @@ -193,6 +198,7 @@ func TestWorkerSignoffStatusChangeRequest(t *testing.T) { mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) mf.stateMachine.EXPECT().RequeueTasksOfWorker(gomock.Any(), &worker, "worker signed off").Return(nil) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) // Perform the request echo := mf.prepareMockedRequest(nil) @@ -225,6 +231,7 @@ func TestWorkerStateChanged(t *testing.T) { savedWorker := worker savedWorker.Status = api.WorkerStatusAwake mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) // Perform the request echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{ @@ -267,6 +274,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { savedWorker := worker savedWorker.Status = api.WorkerStatusStarting mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) // Perform the request echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{ @@ -296,6 +304,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { savedWorker.Status = api.WorkerStatusAsleep savedWorker.StatusChangeClear() mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) // Perform the request echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{ @@ -363,6 +372,7 @@ func TestTaskUpdate(t *testing.T) { touchedTask = *task return nil }) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) // Do the call. echoCtx := mf.prepareMockedJSONRequest(taskUpdate) @@ -403,6 +413,11 @@ func TestMayWorkerRun(t *testing.T) { mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil).AnyTimes() + // Expect the worker to be marked as 'seen' regardless of whether it may run + // its current task or not, so equal to the number of calls to + // `MayWorkerRun()` below. + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(4) + // Test: unhappy, task unassigned { echo := prepareRequest() diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index f816e011..b6258f83 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -102,3 +102,14 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { } return nil } + +// WorkerSeen marks the worker as 'seen' by this Manager. This is used for timeout detection. +func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error { + tx := db.gormDB.WithContext(ctx). + Model(w). + Updates(Worker{LastSeenAt: db.gormDB.NowFunc()}) + if err := tx.Error; err != nil { + return workerError(err, "saving worker 'last seen at'") + } + return nil +}