Manager: cleanup, refactor Worker state change request persistence code
Move the setting & clearing of worker state change requests into separate functions. No functional changes.
This commit is contained in:
parent
132ce8f2ec
commit
6cf82e5d43
@ -91,11 +91,9 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
|
|||||||
if dbWorker.Status == change.Status {
|
if dbWorker.Status == change.Status {
|
||||||
// Requesting that the worker should go to its current status basically
|
// Requesting that the worker should go to its current status basically
|
||||||
// means cancelling any previous status change request.
|
// means cancelling any previous status change request.
|
||||||
dbWorker.StatusRequested = ""
|
dbWorker.StatusChangeClear()
|
||||||
dbWorker.LazyStatusRequest = false
|
|
||||||
} else {
|
} else {
|
||||||
dbWorker.StatusRequested = change.Status
|
dbWorker.StatusChangeRequest(change.Status, change.IsLazy)
|
||||||
dbWorker.LazyStatusRequest = change.IsLazy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the status change.
|
// Store the status change.
|
||||||
|
@ -25,7 +25,7 @@ func TestFetchWorkers(t *testing.T) {
|
|||||||
worker2.ID = 4
|
worker2.ID = 4
|
||||||
worker2.UUID = "f07b6d53-16ec-40a8-a7b4-a9cc8547f790"
|
worker2.UUID = "f07b6d53-16ec-40a8-a7b4-a9cc8547f790"
|
||||||
worker2.Status = api.WorkerStatusAwake
|
worker2.Status = api.WorkerStatusAwake
|
||||||
worker2.StatusRequested = api.WorkerStatusAsleep
|
worker2.StatusChangeRequest(api.WorkerStatusAsleep, false)
|
||||||
|
|
||||||
mf.persistence.EXPECT().FetchWorkers(gomock.Any()).
|
mf.persistence.EXPECT().FetchWorkers(gomock.Any()).
|
||||||
Return([]*persistence.Worker{&worker1, &worker2}, nil)
|
Return([]*persistence.Worker{&worker1, &worker2}, nil)
|
||||||
@ -104,7 +104,7 @@ func TestFetchWorker(t *testing.T) {
|
|||||||
|
|
||||||
// Test with worker that does have a status change requested.
|
// Test with worker that does have a status change requested.
|
||||||
requestedStatus := api.WorkerStatusAsleep
|
requestedStatus := api.WorkerStatusAsleep
|
||||||
worker.StatusRequested = requestedStatus
|
worker.StatusChangeRequest(requestedStatus, false)
|
||||||
mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil)
|
mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil)
|
||||||
|
|
||||||
echo = mf.prepareMockedRequest(nil)
|
echo = mf.prepareMockedRequest(nil)
|
||||||
@ -137,8 +137,7 @@ func TestRequestWorkerStatusChange(t *testing.T) {
|
|||||||
|
|
||||||
requestStatus := api.WorkerStatusAsleep
|
requestStatus := api.WorkerStatusAsleep
|
||||||
savedWorker := worker
|
savedWorker := worker
|
||||||
savedWorker.StatusRequested = requestStatus
|
savedWorker.StatusChangeRequest(requestStatus, true)
|
||||||
savedWorker.LazyStatusRequest = true
|
|
||||||
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
|
||||||
// Expect a broadcast of the change
|
// Expect a broadcast of the change
|
||||||
@ -171,8 +170,7 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) {
|
|||||||
worker := testWorker()
|
worker := testWorker()
|
||||||
|
|
||||||
// Mimick that a status change request to 'asleep' was already performed.
|
// Mimick that a status change request to 'asleep' was already performed.
|
||||||
worker.StatusRequested = api.WorkerStatusAsleep
|
worker.StatusChangeRequest(api.WorkerStatusAsleep, true)
|
||||||
worker.LazyStatusRequest = true
|
|
||||||
|
|
||||||
workerUUID := worker.UUID
|
workerUUID := worker.UUID
|
||||||
currentStatus := worker.Status
|
currentStatus := worker.Status
|
||||||
@ -183,8 +181,7 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) {
|
|||||||
// the previous status change request.
|
// the previous status change request.
|
||||||
requestStatus := currentStatus
|
requestStatus := currentStatus
|
||||||
savedWorker := worker
|
savedWorker := worker
|
||||||
savedWorker.StatusRequested = ""
|
savedWorker.StatusChangeClear()
|
||||||
savedWorker.LazyStatusRequest = false
|
|
||||||
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &savedWorker).Return(nil)
|
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &savedWorker).Return(nil)
|
||||||
|
|
||||||
// Expect a broadcast of the change
|
// Expect a broadcast of the change
|
||||||
|
@ -150,7 +150,7 @@ func (f *Flamenco) SignOff(e echo.Context) error {
|
|||||||
prevStatus := w.Status
|
prevStatus := w.Status
|
||||||
w.Status = api.WorkerStatusOffline
|
w.Status = api.WorkerStatusOffline
|
||||||
if w.StatusRequested == api.WorkerStatusOffline {
|
if w.StatusRequested == api.WorkerStatusOffline {
|
||||||
w.StatusRequested = ""
|
w.StatusChangeClear()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass a generic background context, as these changes should be stored even
|
// Pass a generic background context, as these changes should be stored even
|
||||||
@ -258,7 +258,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
|
|||||||
logger.Info().Msg("worker changed status")
|
logger.Info().Msg("worker changed status")
|
||||||
// Either there was no status change request (and this is a no-op) or the
|
// Either there was no status change request (and this is a no-op) or the
|
||||||
// status change was actually acknowledging the request.
|
// status change was actually acknowledging the request.
|
||||||
w.StatusRequested = ""
|
w.StatusChangeClear()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = f.persist.SaveWorkerStatus(e.Request().Context(), w)
|
err = f.persist.SaveWorkerStatus(e.Request().Context(), w)
|
||||||
|
@ -75,7 +75,7 @@ func TestTaskScheduleOtherStatusRequested(t *testing.T) {
|
|||||||
|
|
||||||
mf := newMockedFlamenco(mockCtrl)
|
mf := newMockedFlamenco(mockCtrl)
|
||||||
worker := testWorker()
|
worker := testWorker()
|
||||||
worker.StatusRequested = api.WorkerStatusAsleep
|
worker.StatusChangeRequest(api.WorkerStatusAsleep, false)
|
||||||
|
|
||||||
// Explicitly NO expected calls to the persistence layer. Since the worker is
|
// Explicitly NO expected calls to the persistence layer. Since the worker is
|
||||||
// not in a state that allows task execution, there should be no DB queries.
|
// not in a state that allows task execution, there should be no DB queries.
|
||||||
@ -355,7 +355,7 @@ func TestMayWorkerRun(t *testing.T) {
|
|||||||
|
|
||||||
// Test: unhappy, assigned and runnable but worker should go to bed.
|
// Test: unhappy, assigned and runnable but worker should go to bed.
|
||||||
{
|
{
|
||||||
worker.StatusRequested = api.WorkerStatusAsleep
|
worker.StatusChangeRequest(api.WorkerStatusAsleep, false)
|
||||||
echo := prepareRequest()
|
echo := prepareRequest()
|
||||||
task.WorkerID = &worker.ID
|
task.WorkerID = &worker.ID
|
||||||
task.Status = api.TaskStatusActive
|
task.Status = api.TaskStatusActive
|
||||||
|
@ -34,6 +34,22 @@ func (w *Worker) TaskTypes() []string {
|
|||||||
return strings.Split(w.SupportedTaskTypes, ",")
|
return strings.Split(w.SupportedTaskTypes, ",")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatusChangeRequest stores a requested status change on the Worker.
|
||||||
|
// This just updates the Worker instance, but doesn't store the change in the
|
||||||
|
// database.
|
||||||
|
func (w *Worker) StatusChangeRequest(status api.WorkerStatus, isLazyRequest bool) {
|
||||||
|
w.StatusRequested = status
|
||||||
|
w.LazyStatusRequest = isLazyRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatusChangeClear clears the requested status change of the Worker.
|
||||||
|
// This just updates the Worker instance, but doesn't store the change in the
|
||||||
|
// database.
|
||||||
|
func (w *Worker) StatusChangeClear() {
|
||||||
|
w.StatusRequested = ""
|
||||||
|
w.LazyStatusRequest = false
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
|
func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
|
||||||
if err := db.gormDB.WithContext(ctx).Create(w).Error; err != nil {
|
if err := db.gormDB.WithContext(ctx).Create(w).Error; err != nil {
|
||||||
return fmt.Errorf("creating new worker: %w", err)
|
return fmt.Errorf("creating new worker: %w", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user