From 7d5aae25b5790dba1c65d3f8b4f74e82030287f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 13 Jun 2022 12:31:21 +0200 Subject: [PATCH] Manager: add timeout checks for workers --- cmd/flamenco-manager/main.go | 4 +- internal/manager/config/config.go | 4 +- internal/manager/config/defaults.go | 4 +- internal/manager/persistence/timeout.go | 13 +++++ internal/manager/persistence/workers.go | 10 ++-- .../manager/timeout_checker/interfaces.go | 2 + .../mocks/interfaces_mock.gen.go | 29 ++++++++++ .../manager/timeout_checker/tasks_test.go | 4 ++ .../timeout_checker/timeout_checker.go | 39 +++---------- .../timeout_checker/timeout_checker_test.go | 1 + internal/manager/timeout_checker/workers.go | 58 +++++++++++++++++++ .../manager/timeout_checker/workers_test.go | 53 +++++++++++++++++ 12 files changed, 180 insertions(+), 41 deletions(-) create mode 100644 internal/manager/timeout_checker/workers.go create mode 100644 internal/manager/timeout_checker/workers_test.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index b35483b2..f746d7fc 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -115,7 +115,9 @@ func main() { e := buildWebService(flamenco, persist, ssdp, webUpdater, urls) timeoutChecker := timeout_checker.New( - configService.Get().TaskTimeout, timeService, persist, taskStateMachine, logStorage) + configService.Get().TaskTimeout, + configService.Get().WorkerTimeout, + timeService, persist, taskStateMachine, logStorage) installSignalHandler(mainCtxCancel) diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index cecdb2c0..08532f54 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -84,8 +84,8 @@ type Base struct { // TLSCert string `yaml:"tlscert"` // ACMEDomainName string `yaml:"acme_domain_name"` // for the ACME Let's Encrypt client - TaskTimeout time.Duration `yaml:"task_timeout"` - // 
ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"` + TaskTimeout time.Duration `yaml:"task_timeout"` + WorkerTimeout time.Duration `yaml:"worker_timeout"` // WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"` // WorkerCleanupStatus []string `yaml:"worker_cleanup_status"` diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index a32075f5..031444e4 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -32,8 +32,8 @@ var defaultConfig = Conf{ }, }, - TaskTimeout: 10 * time.Minute, - // ActiveWorkerTimeoutInterval: 1 * time.Minute, + TaskTimeout: 10 * time.Minute, + WorkerTimeout: 1 * time.Minute, // // Days are assumed to be 24 hours long. This is not exactly accurate, but should // // be accurate enough for this type of cleanup. diff --git a/internal/manager/persistence/timeout.go b/internal/manager/persistence/timeout.go index d1e96243..6d82481f 100644 --- a/internal/manager/persistence/timeout.go +++ b/internal/manager/persistence/timeout.go @@ -31,3 +31,16 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) } return result, nil } + +func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) { + result := []*Worker{} + tx := db.gormDB.WithContext(ctx). + Model(&Worker{}). + Where("workers.status = ?", api.WorkerStatusAwake). + Where("workers.last_seen_at <= ?", lastSeenBefore). 
+ Scan(&result) + if tx.Error != nil { + return nil, workerError(tx.Error, "finding timed out workers (last seen before %s)", lastSeenBefore.String()) + } + return result, nil +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 3de39b46..f816e011 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "strings" + "time" "gorm.io/gorm" @@ -18,10 +19,11 @@ type Worker struct { Secret string `gorm:"type:varchar(255);default:''"` Name string `gorm:"type:varchar(64);default:''"` - Address string `gorm:"type:varchar(39);default:'';index"` // 39 = max length of IPv6 address. - Platform string `gorm:"type:varchar(16);default:''"` - Software string `gorm:"type:varchar(32);default:''"` - Status api.WorkerStatus `gorm:"type:varchar(16);default:''"` + Address string `gorm:"type:varchar(39);default:'';index"` // 39 = max length of IPv6 address. + Platform string `gorm:"type:varchar(16);default:''"` + Software string `gorm:"type:varchar(32);default:''"` + Status api.WorkerStatus `gorm:"type:varchar(16);default:''"` + LastSeenAt time.Time `gorm:"index"` // Should contain UTC timestamps. 
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"` LazyStatusRequest bool `gorm:"type:smallint;default:0"` diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index 5c63e84a..4b7d4ce5 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -17,6 +17,8 @@ import ( type PersistenceService interface { FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) + FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*persistence.Worker, error) + SaveWorker(ctx context.Context, w *persistence.Worker) error } var _ PersistenceService = (*persistence.DB)(nil) diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go index ee98f27b..7152148c 100644 --- a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -53,6 +53,35 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutTasks", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutTasks), arg0, arg1) } +// FetchTimedOutWorkers mocks base method. +func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*persistence.Worker, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTimedOutWorkers", arg0, arg1) + ret0, _ := ret[0].([]*persistence.Worker) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTimedOutWorkers indicates an expected call of FetchTimedOutWorkers. 
+func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutWorkers(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutWorkers", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutWorkers), arg0, arg1) +} + +// SaveWorker mocks base method. +func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveWorker indicates an expected call of SaveWorker. +func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorker", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorker), arg0, arg1) +} + // MockTaskStateMachine is a mock of TaskStateMachine interface. type MockTaskStateMachine struct { ctrl *gomock.Controller diff --git a/internal/manager/timeout_checker/tasks_test.go b/internal/manager/timeout_checker/tasks_test.go index af754bcf..4334a77e 100644 --- a/internal/manager/timeout_checker/tasks_test.go +++ b/internal/manager/timeout_checker/tasks_test.go @@ -36,6 +36,8 @@ func TestTimeoutCheckerTiming(t *testing.T) { initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval), } + mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).AnyTimes().Return(nil, nil) + // Expect three fetches, one after the initial sleep time, and two a regular interval later. fetchTimes := make([]time.Time, len(deadlines)) firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]). @@ -122,6 +124,8 @@ func TestTaskTimeout(t *testing.T) { Worker: &worker, } + mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).AnyTimes().Return(nil, nil) + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()). 
Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil) diff --git a/internal/manager/timeout_checker/timeout_checker.go b/internal/manager/timeout_checker/timeout_checker.go index 2f8d9dc6..91b6fbba 100644 --- a/internal/manager/timeout_checker/timeout_checker.go +++ b/internal/manager/timeout_checker/timeout_checker.go @@ -19,7 +19,8 @@ const timeoutInitialSleep = 5 * time.Minute // TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently. type TimeoutChecker struct { - taskTimeout time.Duration + taskTimeout time.Duration + workerTimeout time.Duration clock clock.Clock persist PersistenceService @@ -29,14 +30,16 @@ type TimeoutChecker struct { // New creates a new TimeoutChecker. func New( - taskTimeout time.Duration, + taskTimeout, + workerTimeout time.Duration, clock clock.Clock, persist PersistenceService, taskStateMachine TaskStateMachine, logStorage LogStorage, ) *TimeoutChecker { return &TimeoutChecker{ - taskTimeout: taskTimeout, + taskTimeout: taskTimeout, + workerTimeout: workerTimeout, clock: clock, persist: persist, @@ -72,34 +75,6 @@ func (ttc *TimeoutChecker) Run(ctx context.Context) { waitDur = timeoutCheckInterval } ttc.checkTasks(ctx) - // ttc.checkWorkers(ctx) + ttc.checkWorkers(ctx) } } - -// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) { -// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval) -// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold) - -// var timedoutWorkers []Worker -// // find all awake workers that either have never been seen, or were seen long ago. 
-// query := M{ -// "status": workerStatusAwake, -// "$or": []M{ -// M{"last_activity": M{"$lte": timeoutThreshold}}, -// M{"last_activity": M{"$exists": false}}, -// }, -// } -// projection := M{ -// "_id": 1, -// "nickname": 1, -// "address": 1, -// "status": 1, -// } -// if err := db.C("flamenco_workers").Find(query).Select(projection).All(&timedoutWorkers); err != nil { -// log.Warningf("Error finding timed-out workers: %s", err) -// } - -// for _, worker := range timedoutWorkers { -// worker.Timeout(db, ttc.scheduler) -// } -// } diff --git a/internal/manager/timeout_checker/timeout_checker_test.go b/internal/manager/timeout_checker/timeout_checker_test.go index b949d499..82dbffae 100644 --- a/internal/manager/timeout_checker/timeout_checker_test.go +++ b/internal/manager/timeout_checker/timeout_checker_test.go @@ -64,6 +64,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout sm := New( taskTimeout, + workerTimeout, mocks.clock, mocks.persist, mocks.taskStateMachine, diff --git a/internal/manager/timeout_checker/workers.go b/internal/manager/timeout_checker/workers.go new file mode 100644 index 00000000..ab39513f --- /dev/null +++ b/internal/manager/timeout_checker/workers.go @@ -0,0 +1,58 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog/log" +) + +func (ttc *TimeoutChecker) checkWorkers(ctx context.Context) { + timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.workerTimeout) + logger := log.With(). + Time("threshold", timeoutThreshold.Local()). 
+ Logger() + logger.Trace().Msg("TimeoutChecker: finding all awake workers that have not been seen since threshold") + + workers, err := ttc.persist.FetchTimedOutWorkers(ctx, timeoutThreshold) + if err != nil { + log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out workers from database") + return + } + + if len(workers) == 0 { + logger.Trace().Msg("TimeoutChecker: no timed-out workers") + return + } + logger.Debug(). + Int("numWorkers", len(workers)). + Msg("TimeoutChecker: failing all awake workers that have not been seen since threshold") + + for _, worker := range workers { + ttc.timeoutWorker(ctx, worker) + } +} + +// timeoutWorker puts a timed-out worker into 'error' status, clears its pending status change, and saves it. +func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistence.Worker) { + logger := log.With(). + Str("worker", worker.UUID). + Str("name", worker.Name). + Str("lastSeenAt", worker.LastSeenAt.String()). + Logger() + logger.Warn().Msg("TimeoutChecker: worker timed out") + + worker.Status = api.WorkerStatusError + worker.StatusChangeClear() + + err := ttc.persist.SaveWorker(ctx, worker) + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out worker to database") + } + + // Re-queue all tasks assigned to this worker. 
+ +} diff --git a/internal/manager/timeout_checker/workers_test.go b/internal/manager/timeout_checker/workers_test.go new file mode 100644 index 00000000..1d95edfa --- /dev/null +++ b/internal/manager/timeout_checker/workers_test.go @@ -0,0 +1,53 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + "time" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" + "github.com/golang/mock/gomock" + "gorm.io/gorm" +) + +const workerTimeout = 20 * time.Minute + +func TestWorkerTimeout(t *testing.T) { + ttc, finish, mocks := timeoutCheckerTestFixtures(t) + defer finish() + + mocks.run(ttc) + + // Wait for the timeout checker to actually be sleeping, otherwise it could + // have a different sleep-start time than we expect. + time.Sleep(1 * time.Millisecond) + + lastSeenAt := mocks.clock.Now().UTC().Add(-1 * time.Hour) + + worker := persistence.Worker{ + UUID: "WORKER-UUID", + Name: "Tester", + Model: gorm.Model{ID: 47}, + LastSeenAt: lastSeenAt, + Status: api.WorkerStatusAsleep, + StatusRequested: api.WorkerStatusAwake, + } + + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).Return([]*persistence.Task{}, nil) + mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()). + Return([]*persistence.Worker{&worker}, nil) + + persistedWorker := worker + persistedWorker.Status = api.WorkerStatusError + // Any queued up status change should be cleared, as the Worker is not allowed + // to change into anything until this timeout has been addressed. + persistedWorker.StatusChangeClear() + mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil) + + // TODO: expect all tasks assigned to the worker to get requeued. + + // All the timeouts should be handled after the initial sleep. + mocks.clock.Add(timeoutInitialSleep) +}