Manager: add timeout checks for workers
This commit is contained in:
parent
e8171fc597
commit
7d5aae25b5
@ -115,7 +115,9 @@ func main() {
|
||||
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
|
||||
|
||||
timeoutChecker := timeout_checker.New(
|
||||
configService.Get().TaskTimeout, timeService, persist, taskStateMachine, logStorage)
|
||||
configService.Get().TaskTimeout,
|
||||
configService.Get().WorkerTimeout,
|
||||
timeService, persist, taskStateMachine, logStorage)
|
||||
|
||||
installSignalHandler(mainCtxCancel)
|
||||
|
||||
|
@ -85,7 +85,7 @@ type Base struct {
|
||||
// ACMEDomainName string `yaml:"acme_domain_name"` // for the ACME Let's Encrypt client
|
||||
|
||||
TaskTimeout time.Duration `yaml:"task_timeout"`
|
||||
// ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"`
|
||||
WorkerTimeout time.Duration `yaml:"worker_timeout"`
|
||||
|
||||
// WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"`
|
||||
// WorkerCleanupStatus []string `yaml:"worker_cleanup_status"`
|
||||
|
@ -33,7 +33,7 @@ var defaultConfig = Conf{
|
||||
},
|
||||
|
||||
TaskTimeout: 10 * time.Minute,
|
||||
// ActiveWorkerTimeoutInterval: 1 * time.Minute,
|
||||
WorkerTimeout: 1 * time.Minute,
|
||||
|
||||
// // Days are assumed to be 24 hours long. This is not exactly accurate, but should
|
||||
// // be accurate enough for this type of cleanup.
|
||||
|
@ -31,3 +31,16 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) {
|
||||
result := []*Worker{}
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&Worker{}).
|
||||
Where("workers.status = ?", api.WorkerStatusAwake).
|
||||
Where("workers.last_seen_at <= ?", lastSeenBefore).
|
||||
Scan(&result)
|
||||
if tx.Error != nil {
|
||||
return nil, workerError(tx.Error, "finding timed out workers (last seen before %s)", lastSeenBefore.String())
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
@ -22,6 +23,7 @@ type Worker struct {
|
||||
Platform string `gorm:"type:varchar(16);default:''"`
|
||||
Software string `gorm:"type:varchar(32);default:''"`
|
||||
Status api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
LastSeenAt time.Time `gorm:"index"` // Should contain UTC timestamps.
|
||||
|
||||
StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"`
|
||||
LazyStatusRequest bool `gorm:"type:smallint;default:0"`
|
||||
|
@ -17,6 +17,8 @@ import (
|
||||
|
||||
type PersistenceService interface {
|
||||
FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error)
|
||||
FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*persistence.Worker, error)
|
||||
SaveWorker(ctx context.Context, w *persistence.Worker) error
|
||||
}
|
||||
|
||||
var _ PersistenceService = (*persistence.DB)(nil)
|
||||
|
@ -53,6 +53,35 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 inte
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutTasks", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutTasks), arg0, arg1)
|
||||
}
|
||||
|
||||
// FetchTimedOutWorkers mocks base method.
|
||||
func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*persistence.Worker, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchTimedOutWorkers", arg0, arg1)
|
||||
ret0, _ := ret[0].([]*persistence.Worker)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FetchTimedOutWorkers indicates an expected call of FetchTimedOutWorkers.
|
||||
func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutWorkers(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutWorkers", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutWorkers), arg0, arg1)
|
||||
}
|
||||
|
||||
// SaveWorker mocks base method.
|
||||
func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SaveWorker indicates an expected call of SaveWorker.
|
||||
func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorker", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorker), arg0, arg1)
|
||||
}
|
||||
|
||||
// MockTaskStateMachine is a mock of TaskStateMachine interface.
|
||||
type MockTaskStateMachine struct {
|
||||
ctrl *gomock.Controller
|
||||
|
@ -36,6 +36,8 @@ func TestTimeoutCheckerTiming(t *testing.T) {
|
||||
initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval),
|
||||
}
|
||||
|
||||
mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).AnyTimes().Return(nil, nil)
|
||||
|
||||
// Expect three fetches, one after the initial sleep time, and two a regular interval later.
|
||||
fetchTimes := make([]time.Time, len(deadlines))
|
||||
firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]).
|
||||
@ -122,6 +124,8 @@ func TestTaskTimeout(t *testing.T) {
|
||||
Worker: &worker,
|
||||
}
|
||||
|
||||
mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).AnyTimes().Return(nil, nil)
|
||||
|
||||
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).
|
||||
Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil)
|
||||
|
||||
|
@ -20,6 +20,7 @@ const timeoutInitialSleep = 5 * time.Minute
|
||||
// TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently.
|
||||
type TimeoutChecker struct {
|
||||
taskTimeout time.Duration
|
||||
workerTimeout time.Duration
|
||||
|
||||
clock clock.Clock
|
||||
persist PersistenceService
|
||||
@ -29,7 +30,8 @@ type TimeoutChecker struct {
|
||||
|
||||
// New creates a new TimeoutChecker.
|
||||
func New(
|
||||
taskTimeout time.Duration,
|
||||
taskTimeout,
|
||||
workerTimeout time.Duration,
|
||||
clock clock.Clock,
|
||||
persist PersistenceService,
|
||||
taskStateMachine TaskStateMachine,
|
||||
@ -37,6 +39,7 @@ func New(
|
||||
) *TimeoutChecker {
|
||||
return &TimeoutChecker{
|
||||
taskTimeout: taskTimeout,
|
||||
workerTimeout: workerTimeout,
|
||||
|
||||
clock: clock,
|
||||
persist: persist,
|
||||
@ -72,34 +75,6 @@ func (ttc *TimeoutChecker) Run(ctx context.Context) {
|
||||
waitDur = timeoutCheckInterval
|
||||
}
|
||||
ttc.checkTasks(ctx)
|
||||
// ttc.checkWorkers(ctx)
|
||||
ttc.checkWorkers(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) {
|
||||
// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval)
|
||||
// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold)
|
||||
|
||||
// var timedoutWorkers []Worker
|
||||
// // find all awake workers that either have never been seen, or were seen long ago.
|
||||
// query := M{
|
||||
// "status": workerStatusAwake,
|
||||
// "$or": []M{
|
||||
// M{"last_activity": M{"$lte": timeoutThreshold}},
|
||||
// M{"last_activity": M{"$exists": false}},
|
||||
// },
|
||||
// }
|
||||
// projection := M{
|
||||
// "_id": 1,
|
||||
// "nickname": 1,
|
||||
// "address": 1,
|
||||
// "status": 1,
|
||||
// }
|
||||
// if err := db.C("flamenco_workers").Find(query).Select(projection).All(&timedoutWorkers); err != nil {
|
||||
// log.Warningf("Error finding timed-out workers: %s", err)
|
||||
// }
|
||||
|
||||
// for _, worker := range timedoutWorkers {
|
||||
// worker.Timeout(db, ttc.scheduler)
|
||||
// }
|
||||
// }
|
||||
|
@ -64,6 +64,7 @@ func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *Timeout
|
||||
|
||||
sm := New(
|
||||
taskTimeout,
|
||||
workerTimeout,
|
||||
mocks.clock,
|
||||
mocks.persist,
|
||||
mocks.taskStateMachine,
|
||||
|
58
internal/manager/timeout_checker/workers.go
Normal file
58
internal/manager/timeout_checker/workers.go
Normal file
@ -0,0 +1,58 @@
|
||||
package timeout_checker
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func (ttc *TimeoutChecker) checkWorkers(ctx context.Context) {
|
||||
timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.workerTimeout)
|
||||
logger := log.With().
|
||||
Time("threshold", timeoutThreshold.Local()).
|
||||
Logger()
|
||||
logger.Trace().Msg("TimeoutChecker: finding all awake workers that have not been seen since threshold")
|
||||
|
||||
workers, err := ttc.persist.FetchTimedOutWorkers(ctx, timeoutThreshold)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out workers from database")
|
||||
return
|
||||
}
|
||||
|
||||
if len(workers) == 0 {
|
||||
logger.Trace().Msg("TimeoutChecker: no timed-out workers")
|
||||
return
|
||||
}
|
||||
logger.Debug().
|
||||
Int("numWorkers", len(workers)).
|
||||
Msg("TimeoutChecker: failing all awake workers that have not been seen since threshold")
|
||||
|
||||
for _, worker := range workers {
|
||||
ttc.timeoutWorker(ctx, worker)
|
||||
}
|
||||
}
|
||||
|
||||
// timeoutTask marks a task as 'failed' due to a timeout.
|
||||
func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistence.Worker) {
|
||||
logger := log.With().
|
||||
Str("worker", worker.UUID).
|
||||
Str("name", worker.Name).
|
||||
Str("lastSeenAt", worker.LastSeenAt.String()).
|
||||
Logger()
|
||||
logger.Warn().Msg("TimeoutChecker: worker timed out")
|
||||
|
||||
worker.Status = api.WorkerStatusError
|
||||
worker.StatusChangeClear()
|
||||
|
||||
err := ttc.persist.SaveWorker(ctx, worker)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out worker to database")
|
||||
}
|
||||
|
||||
// Re-queue all tasks assigned to this worker.
|
||||
|
||||
}
|
53
internal/manager/timeout_checker/workers_test.go
Normal file
53
internal/manager/timeout_checker/workers_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
package timeout_checker
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.blender.org/flamenco/internal/manager/persistence"
|
||||
"git.blender.org/flamenco/pkg/api"
|
||||
"github.com/golang/mock/gomock"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const workerTimeout = 20 * time.Minute
|
||||
|
||||
func TestWorkerTimeout(t *testing.T) {
|
||||
ttc, finish, mocks := timeoutCheckerTestFixtures(t)
|
||||
defer finish()
|
||||
|
||||
mocks.run(ttc)
|
||||
|
||||
// Wait for the timeout checker to actually be sleeping, otherwise it could
|
||||
// have a different sleep-start time than we expect.
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
lastSeenAt := mocks.clock.Now().UTC().Add(-1 * time.Hour)
|
||||
|
||||
worker := persistence.Worker{
|
||||
UUID: "WORKER-UUID",
|
||||
Name: "Tester",
|
||||
Model: gorm.Model{ID: 47},
|
||||
LastSeenAt: lastSeenAt,
|
||||
Status: api.WorkerStatusAsleep,
|
||||
StatusRequested: api.WorkerStatusAwake,
|
||||
}
|
||||
|
||||
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).Return([]*persistence.Task{}, nil)
|
||||
mocks.persist.EXPECT().FetchTimedOutWorkers(mocks.ctx, gomock.Any()).
|
||||
Return([]*persistence.Worker{&worker}, nil)
|
||||
|
||||
persistedWorker := worker
|
||||
persistedWorker.Status = api.WorkerStatusError
|
||||
// Any queued up status change should be cleared, as the Worker is not allowed
|
||||
// to change into anything until this timeout has been addressed.
|
||||
persistedWorker.StatusChangeClear()
|
||||
mocks.persist.EXPECT().SaveWorker(mocks.ctx, &persistedWorker).Return(nil)
|
||||
|
||||
// TODO: expect all tasks assigned to the worker to get requeued.
|
||||
|
||||
// All the timeouts should be handled after the initial sleep.
|
||||
mocks.clock.Add(timeoutInitialSleep)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user