Manager: mark workers as 'seen' when they send updates
Update the 'last seen at' timestamp of workers when they:
- sign on
- sign off
- get a task assigned
- send a task update
- check whether they can keep running their task

Note that this commit is necessary to not have the workers time out immediately ;-)
This commit is contained in:
parent
986b647967
commit
5dac3c2dc0
@ -40,6 +40,7 @@ type PersistenceService interface {
|
|||||||
FetchWorkers(ctx context.Context) ([]*persistence.Worker, error)
|
FetchWorkers(ctx context.Context) ([]*persistence.Worker, error)
|
||||||
SaveWorker(ctx context.Context, w *persistence.Worker) error
|
SaveWorker(ctx context.Context, w *persistence.Worker) error
|
||||||
SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error
|
SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error
|
||||||
|
WorkerSeen(ctx context.Context, w *persistence.Worker) error
|
||||||
|
|
||||||
// ScheduleTask finds a task to execute by the given worker, and assigns it to that worker.
|
// ScheduleTask finds a task to execute by the given worker, and assigns it to that worker.
|
||||||
// If no task is available, (nil, nil) is returned, as this is not an error situation.
|
// If no task is available, (nil, nil) is returned, as this is not an error situation.
|
||||||
|
@ -243,6 +243,20 @@ func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 int
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskTouchedByWorker", reflect.TypeOf((*MockPersistenceService)(nil).TaskTouchedByWorker), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskTouchedByWorker", reflect.TypeOf((*MockPersistenceService)(nil).TaskTouchedByWorker), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WorkerSeen mocks base method.
|
||||||
|
func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *persistence.Worker) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "WorkerSeen", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkerSeen indicates an expected call of WorkerSeen.
|
||||||
|
func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
|
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
|
||||||
type MockChangeBroadcaster struct {
|
type MockChangeBroadcaster struct {
|
||||||
ctrl *gomock.Controller
|
ctrl *gomock.Controller
|
||||||
|
@ -109,6 +109,7 @@ func (f *Flamenco) SignOn(e echo.Context) error {
|
|||||||
func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) {
|
func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) {
|
||||||
logger := requestLogger(e)
|
logger := requestLogger(e)
|
||||||
w := requestWorkerOrPanic(e)
|
w := requestWorkerOrPanic(e)
|
||||||
|
ctx := e.Request().Context()
|
||||||
|
|
||||||
// Update the worker with the new sign-on info.
|
// Update the worker with the new sign-on info.
|
||||||
prevStatus := w.Status
|
prevStatus := w.Status
|
||||||
@ -124,7 +125,7 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
|
|||||||
w.SupportedTaskTypes = strings.Join(update.SupportedTaskTypes, ",")
|
w.SupportedTaskTypes = strings.Join(update.SupportedTaskTypes, ",")
|
||||||
|
|
||||||
// Save the new Worker info to the database.
|
// Save the new Worker info to the database.
|
||||||
err := f.persist.SaveWorker(e.Request().Context(), w)
|
err := f.persist.SaveWorker(ctx, w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn().Err(err).
|
logger.Warn().Err(err).
|
||||||
Str("newStatus", string(w.Status)).
|
Str("newStatus", string(w.Status)).
|
||||||
@ -132,6 +133,11 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
|
|||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = f.workerSeen(ctx, logger, w)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", err
|
||||||
|
}
|
||||||
|
|
||||||
return w, prevStatus, nil
|
return w, prevStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,6 +173,9 @@ func (f *Flamenco) SignOff(e echo.Context) error {
|
|||||||
return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database")
|
return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ignore database errors here; the rest of the signoff process should just happen.
|
||||||
|
_ = f.workerSeen(ctx, logger, w)
|
||||||
|
|
||||||
// Re-queue all tasks (should be only one) this worker is now working on.
|
// Re-queue all tasks (should be only one) this worker is now working on.
|
||||||
err = f.stateMachine.RequeueTasksOfWorker(ctx, w, "worker signed off")
|
err = f.stateMachine.RequeueTasksOfWorker(ctx, w, "worker signed off")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -224,7 +233,8 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
|
|||||||
w.StatusChangeClear()
|
w.StatusChangeClear()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.persist.SaveWorkerStatus(e.Request().Context(), w)
|
ctx := e.Request().Context()
|
||||||
|
err = f.persist.SaveWorkerStatus(ctx, w)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn().Err(err).
|
logger.Warn().Err(err).
|
||||||
Str("newStatus", string(w.Status)).
|
Str("newStatus", string(w.Status)).
|
||||||
@ -232,6 +242,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
|
|||||||
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
|
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := f.workerSeen(ctx, logger, w); err != nil {
|
||||||
|
return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database")
|
||||||
|
}
|
||||||
|
|
||||||
update := webupdates.NewWorkerUpdate(w)
|
update := webupdates.NewWorkerUpdate(w)
|
||||||
update.PreviousStatus = &prevStatus
|
update.PreviousStatus = &prevStatus
|
||||||
f.broadcaster.BroadcastWorkerUpdate(update)
|
f.broadcaster.BroadcastWorkerUpdate(update)
|
||||||
@ -288,9 +302,13 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start timeout measurement as soon as the Worker gets the task assigned.
|
// Start timeout measurement as soon as the Worker gets the task assigned.
|
||||||
if err := f.workerPingedTask(e.Request().Context(), logger, dbTask); err != nil {
|
ctx := e.Request().Context()
|
||||||
|
if err := f.workerPingedTask(ctx, logger, dbTask); err != nil {
|
||||||
return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err)
|
return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err)
|
||||||
}
|
}
|
||||||
|
if err := f.workerSeen(ctx, logger, worker); err != nil {
|
||||||
|
return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database")
|
||||||
|
}
|
||||||
|
|
||||||
// Convert database objects to API objects:
|
// Convert database objects to API objects:
|
||||||
apiCommands := []api.Command{}
|
apiCommands := []api.Command{}
|
||||||
@ -361,6 +379,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
|
|||||||
|
|
||||||
taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
|
taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
|
||||||
workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
|
workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
|
||||||
|
workerSeenErr := f.workerSeen(ctx, logger, worker)
|
||||||
|
|
||||||
if taskUpdateErr != nil {
|
if taskUpdateErr != nil {
|
||||||
return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
|
return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
|
||||||
@ -368,6 +387,9 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
|
|||||||
if workerUpdateErr != nil {
|
if workerUpdateErr != nil {
|
||||||
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
|
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
|
||||||
}
|
}
|
||||||
|
if workerSeenErr != nil {
|
||||||
|
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker 'last seen' update: %v", workerSeenErr)
|
||||||
|
}
|
||||||
|
|
||||||
return e.NoContent(http.StatusNoContent)
|
return e.NoContent(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
@ -431,6 +453,20 @@ func (f *Flamenco) workerPingedTask(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// workerSeen marks the worker as 'seen' and logs any database error that may occur.
|
||||||
|
func (f *Flamenco) workerSeen(
|
||||||
|
ctx context.Context,
|
||||||
|
logger zerolog.Logger,
|
||||||
|
w *persistence.Worker,
|
||||||
|
) error {
|
||||||
|
err := f.persist.WorkerSeen(ctx, w)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error().Err(err).Msg("error marking Worker as 'seen' in the database")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
|
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
|
||||||
logger := requestLogger(e)
|
logger := requestLogger(e)
|
||||||
worker := requestWorkerOrPanic(e)
|
worker := requestWorkerOrPanic(e)
|
||||||
@ -462,11 +498,12 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
|
|||||||
|
|
||||||
mkr := mayWorkerRun(worker, dbTask)
|
mkr := mayWorkerRun(worker, dbTask)
|
||||||
|
|
||||||
|
// Errors saving the "worker pinged task" and "worker seen" fields in the
|
||||||
|
// database are just logged. It's not something to bother the worker with.
|
||||||
if mkr.MayKeepRunning {
|
if mkr.MayKeepRunning {
|
||||||
// Errors saving the "worker pinged task" field in the database are just
|
|
||||||
// logged. It's not something to bother the worker with.
|
|
||||||
_ = f.workerPingedTask(ctx, logger, dbTask)
|
_ = f.workerPingedTask(ctx, logger, dbTask)
|
||||||
}
|
}
|
||||||
|
_ = f.workerSeen(ctx, logger, worker)
|
||||||
|
|
||||||
return e.JSON(http.StatusOK, mkr)
|
return e.JSON(http.StatusOK, mkr)
|
||||||
}
|
}
|
||||||
|
@ -33,8 +33,11 @@ func TestTaskScheduleHappy(t *testing.T) {
|
|||||||
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
|
UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503",
|
||||||
Job: &job,
|
Job: &job,
|
||||||
}
|
}
|
||||||
mf.persistence.EXPECT().ScheduleTask(echo.Request().Context(), &worker).Return(&task, nil)
|
|
||||||
mf.persistence.EXPECT().TaskTouchedByWorker(echo.Request().Context(), &task)
|
ctx := echo.Request().Context()
|
||||||
|
mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil)
|
||||||
|
mf.persistence.EXPECT().TaskTouchedByWorker(ctx, &task)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(ctx, &worker)
|
||||||
|
|
||||||
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID,
|
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID,
|
||||||
"Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
|
"Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
|
||||||
@ -112,6 +115,7 @@ func TestWorkerSignOn(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil)
|
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
echo := mf.prepareMockedJSONRequest(api.WorkerSignOn{
|
echo := mf.prepareMockedJSONRequest(api.WorkerSignOn{
|
||||||
Nickname: "Lazy Boi",
|
Nickname: "Lazy Boi",
|
||||||
@ -142,6 +146,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
|
|||||||
|
|
||||||
// Expect worker's tasks to be re-queued.
|
// Expect worker's tasks to be re-queued.
|
||||||
mf.stateMachine.EXPECT().RequeueTasksOfWorker(expectCtx, &worker, "worker signed off").Return(nil)
|
mf.stateMachine.EXPECT().RequeueTasksOfWorker(expectCtx, &worker, "worker signed off").Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(expectCtx, &worker)
|
||||||
|
|
||||||
// Expect worker to be saved as 'offline'.
|
// Expect worker to be saved as 'offline'.
|
||||||
mf.persistence.EXPECT().
|
mf.persistence.EXPECT().
|
||||||
@ -193,6 +198,7 @@ func TestWorkerSignoffStatusChangeRequest(t *testing.T) {
|
|||||||
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
|
||||||
mf.stateMachine.EXPECT().RequeueTasksOfWorker(gomock.Any(), &worker, "worker signed off").Return(nil)
|
mf.stateMachine.EXPECT().RequeueTasksOfWorker(gomock.Any(), &worker, "worker signed off").Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
echo := mf.prepareMockedRequest(nil)
|
echo := mf.prepareMockedRequest(nil)
|
||||||
@ -225,6 +231,7 @@ func TestWorkerStateChanged(t *testing.T) {
|
|||||||
savedWorker := worker
|
savedWorker := worker
|
||||||
savedWorker.Status = api.WorkerStatusAwake
|
savedWorker.Status = api.WorkerStatusAwake
|
||||||
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
||||||
@ -267,6 +274,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) {
|
|||||||
savedWorker := worker
|
savedWorker := worker
|
||||||
savedWorker.Status = api.WorkerStatusStarting
|
savedWorker.Status = api.WorkerStatusStarting
|
||||||
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
||||||
@ -296,6 +304,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) {
|
|||||||
savedWorker.Status = api.WorkerStatusAsleep
|
savedWorker.Status = api.WorkerStatusAsleep
|
||||||
savedWorker.StatusChangeClear()
|
savedWorker.StatusChangeClear()
|
||||||
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
|
||||||
@ -363,6 +372,7 @@ func TestTaskUpdate(t *testing.T) {
|
|||||||
touchedTask = *task
|
touchedTask = *task
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
// Do the call.
|
// Do the call.
|
||||||
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
||||||
@ -403,6 +413,11 @@ func TestMayWorkerRun(t *testing.T) {
|
|||||||
|
|
||||||
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil).AnyTimes()
|
mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil).AnyTimes()
|
||||||
|
|
||||||
|
// Expect the worker to be marked as 'seen' regardless of whether it may run
|
||||||
|
// its current task or not, so equal to the number of calls to
|
||||||
|
// `MayWorkerRun()` below.
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(4)
|
||||||
|
|
||||||
// Test: unhappy, task unassigned
|
// Test: unhappy, task unassigned
|
||||||
{
|
{
|
||||||
echo := prepareRequest()
|
echo := prepareRequest()
|
||||||
|
@ -102,3 +102,14 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WorkerSeen marks the worker as 'seen' by this Manager. This is used for timeout detection.
|
||||||
|
func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error {
|
||||||
|
tx := db.gormDB.WithContext(ctx).
|
||||||
|
Model(w).
|
||||||
|
Updates(Worker{LastSeenAt: db.gormDB.NowFunc()})
|
||||||
|
if err := tx.Error; err != nil {
|
||||||
|
return workerError(err, "saving worker 'last seen at'")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user