Manager: task scheduler, check for requested worker status change first
Before checking whether the Worker is allowed to do work (i.e. is in `awake` state), check any queued-up status changes. Those should be communicated, before saying "no work for you", so that the Worker can actually respond to it.
This commit is contained in:
parent
ee53373878
commit
5f2712980e
@ -272,15 +272,6 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check that this worker is actually allowed to do work.
|
// Check that this worker is actually allowed to do work.
|
||||||
requiredStatusToGetTask := api.WorkerStatusAwake
|
|
||||||
if worker.Status != requiredStatusToGetTask {
|
|
||||||
logger.Warn().
|
|
||||||
Str("workerStatus", string(worker.Status)).
|
|
||||||
Str("requiredStatus", string(requiredStatusToGetTask)).
|
|
||||||
Msg("worker asking for task but is in wrong state")
|
|
||||||
return sendAPIError(e, http.StatusConflict,
|
|
||||||
fmt.Sprintf("worker is in state %q, requires state %q to execute tasks", worker.Status, requiredStatusToGetTask))
|
|
||||||
}
|
|
||||||
if worker.StatusRequested != "" {
|
if worker.StatusRequested != "" {
|
||||||
logger.Warn().
|
logger.Warn().
|
||||||
Str("workerStatus", string(worker.Status)).
|
Str("workerStatus", string(worker.Status)).
|
||||||
@ -291,6 +282,16 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
requiredStatusToGetTask := api.WorkerStatusAwake
|
||||||
|
if worker.Status != requiredStatusToGetTask {
|
||||||
|
logger.Warn().
|
||||||
|
Str("workerStatus", string(worker.Status)).
|
||||||
|
Str("requiredStatus", string(requiredStatusToGetTask)).
|
||||||
|
Msg("worker asking for task but is in wrong state")
|
||||||
|
return sendAPIError(e, http.StatusConflict,
|
||||||
|
fmt.Sprintf("worker is in state %q, requires state %q to execute tasks", worker.Status, requiredStatusToGetTask))
|
||||||
|
}
|
||||||
|
|
||||||
// Get a task to execute:
|
// Get a task to execute:
|
||||||
dbTask, err := f.persist.ScheduleTask(e.Request().Context(), worker)
|
dbTask, err := f.persist.ScheduleTask(e.Request().Context(), worker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -124,6 +124,32 @@ func TestTaskScheduleOtherStatusRequested(t *testing.T) {
|
|||||||
assertResponseJSON(t, echoCtx, http.StatusLocked, expectBody)
|
assertResponseJSON(t, echoCtx, http.StatusLocked, expectBody)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTaskScheduleOtherStatusRequestedAndBadState(t *testing.T) {
|
||||||
|
mockCtrl := gomock.NewController(t)
|
||||||
|
defer mockCtrl.Finish()
|
||||||
|
|
||||||
|
mf := newMockedFlamenco(mockCtrl)
|
||||||
|
worker := testWorker()
|
||||||
|
|
||||||
|
// Even when the worker is in a state that doesn't allow execution, if there
|
||||||
|
// is a status change requested, this should be communicated to the worker.
|
||||||
|
worker.Status = api.WorkerStatusError
|
||||||
|
worker.StatusChangeRequest(api.WorkerStatusAwake, false)
|
||||||
|
|
||||||
|
echoCtx := mf.prepareMockedRequest(nil)
|
||||||
|
requestWorkerStore(echoCtx, &worker)
|
||||||
|
|
||||||
|
// The worker should be marked as 'seen', even when it's in a state that
|
||||||
|
// doesn't allow task execution.
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker)
|
||||||
|
|
||||||
|
err := mf.flamenco.ScheduleTask(echoCtx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expectBody := api.WorkerStateChange{StatusRequested: api.WorkerStatusAwake}
|
||||||
|
assertResponseJSON(t, echoCtx, http.StatusLocked, expectBody)
|
||||||
|
}
|
||||||
|
|
||||||
func TestWorkerSignOn(t *testing.T) {
|
func TestWorkerSignOn(t *testing.T) {
|
||||||
mockCtrl := gomock.NewController(t)
|
mockCtrl := gomock.NewController(t)
|
||||||
defer mockCtrl.Finish()
|
defer mockCtrl.Finish()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user