Re-queue tasks of worker after changing to non-'awake' state

When a Worker changes state from `awake` to something else, it cannot
run tasks any more. This now triggers a requeue of its active task
(should be one at most, if things are sane) so that another worker can pick
it up.
This commit is contained in:
Sybren A. Stüvel 2022-07-19 15:38:36 +02:00
parent fb0c8e9317
commit 696b97c553
2 changed files with 16 additions and 9 deletions

View File

@ -241,16 +241,22 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
err = f.persist.SaveWorkerStatus(bgCtx, w)
if err != nil {
if err := f.persist.SaveWorkerStatus(bgCtx, w); err != nil {
logger.Warn().Err(err).
Str("newStatus", string(w.Status)).
Msg("error storing Worker in database")
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
}
if err := f.workerSeen(logger, w); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database")
// Any error has already been logged, and the rest of the code should also just run.
_ = f.workerSeen(logger, w)
// Re-queue all tasks (should be only one) this worker is now working on.
if prevStatus == api.WorkerStatusAwake && w.Status != api.WorkerStatusAwake {
err := f.stateMachine.RequeueActiveTasksOfWorker(bgCtx, w,
fmt.Sprintf("worker changed status to '%s'", w.Status))
if err != nil {
logger.Warn().Err(err).Msg("error re-queueing worker tasks after it changed to non-awake status")
}
}
update := webupdates.NewWorkerUpdate(w)

View File

@ -316,7 +316,7 @@ func TestWorkerStateChanged(t *testing.T) {
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
worker.Status = api.WorkerStatusStarting
worker.Status = api.WorkerStatusAwake
prevStatus := worker.Status
// Expect a broadcast of the change
@ -324,20 +324,21 @@ func TestWorkerStateChanged(t *testing.T) {
Id: worker.UUID,
Name: worker.Name,
PreviousStatus: &prevStatus,
Status: api.WorkerStatusAwake,
Status: api.WorkerStatusAsleep,
Updated: worker.UpdatedAt,
Version: worker.Software,
})
// Expect the Worker to be saved with the new status
savedWorker := worker
savedWorker.Status = api.WorkerStatusAwake
savedWorker.Status = api.WorkerStatusAsleep
mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.stateMachine.EXPECT().RequeueActiveTasksOfWorker(gomock.Any(), &worker, "worker changed status to 'asleep'")
// Perform the request
echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
Status: api.WorkerStatusAwake,
Status: api.WorkerStatusAsleep,
})
requestWorkerStore(echo, &worker)
err := mf.flamenco.WorkerStateChanged(echo)