Manager: Task Timeout Checker

Tasks that are in state `active` but haven't been 'touched' by a Worker
for the configured task timeout (`task_timeout`, 10 minutes by default) or
longer will transition to state `failed`.

In the future, it might be better to move the decision about which state
is suitable to the Task State Machine service, so that it can be smarter
and take the history of the task into account. Going to `soft-failed`
first might be a nice touch.
This commit is contained in:
Sybren A. Stüvel 2022-06-09 10:58:12 +02:00
parent 295891a17a
commit d90a8b987d
10 changed files with 636 additions and 7 deletions

View File

@ -35,6 +35,7 @@ import (
"git.blender.org/flamenco/internal/manager/swagger_ui" "git.blender.org/flamenco/internal/manager/swagger_ui"
"git.blender.org/flamenco/internal/manager/task_logs" "git.blender.org/flamenco/internal/manager/task_logs"
"git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/timeout_checker"
"git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/internal/own_url" "git.blender.org/flamenco/internal/own_url"
"git.blender.org/flamenco/internal/upnp_ssdp" "git.blender.org/flamenco/internal/upnp_ssdp"
@ -106,11 +107,16 @@ func main() {
// //
// go persist.PeriodicMaintenanceLoop(mainCtx) // go persist.PeriodicMaintenanceLoop(mainCtx)
timeService := clock.New()
webUpdater := webupdates.New() webUpdater := webupdates.New()
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater)
flamenco := buildFlamencoAPI(configService, persist, taskStateMachine, webUpdater) logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls) e := buildWebService(flamenco, persist, ssdp, webUpdater, urls)
timeoutChecker := timeout_checker.New(
configService.Get().TaskTimeout, timeService, persist, taskStateMachine, logStorage)
installSignalHandler(mainCtxCancel) installSignalHandler(mainCtxCancel)
// Before doing anything new, clean up in case we made a mess in an earlier run. // Before doing anything new, clean up in case we made a mess in an earlier run.
@ -144,22 +150,29 @@ func main() {
}() }()
} }
// Start the timeout checker.
wg.Add(1)
go func() {
defer wg.Done()
timeoutChecker.Run(mainCtx)
}()
wg.Wait() wg.Wait()
log.Info().Msg("shutdown complete") log.Info().Msg("shutdown complete")
} }
func buildFlamencoAPI( func buildFlamencoAPI(
timeService clock.Clock,
configService *config.Service, configService *config.Service,
persist *persistence.DB, persist *persistence.DB,
taskStateMachine *task_state_machine.StateMachine, taskStateMachine *task_state_machine.StateMachine,
logStorage *task_logs.Storage,
webUpdater *webupdates.BiDirComms, webUpdater *webupdates.BiDirComms,
) api.ServerInterface { ) api.ServerInterface {
timeService := clock.New()
compiler, err := job_compilers.Load(timeService) compiler, err := job_compilers.Load(timeService)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("error loading job compilers") log.Fatal().Err(err).Msg("error loading job compilers")
} }
logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater)
shamanServer := shaman.NewServer(configService.Get().Shaman, nil) shamanServer := shaman.NewServer(configService.Get().Shaman, nil)
flamenco := api_impl.NewFlamenco( flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService, compiler, persist, webUpdater, logStorage, configService,

View File

@ -84,7 +84,7 @@ type Base struct {
// TLSCert string `yaml:"tlscert"` // TLSCert string `yaml:"tlscert"`
// ACMEDomainName string `yaml:"acme_domain_name"` // for the ACME Let's Encrypt client // ACMEDomainName string `yaml:"acme_domain_name"` // for the ACME Let's Encrypt client
// ActiveTaskTimeoutInterval time.Duration `yaml:"active_task_timeout_interval"` TaskTimeout time.Duration `yaml:"task_timeout"`
// ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"` // ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"`
// WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"` // WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"`

View File

@ -32,7 +32,7 @@ var defaultConfig = Conf{
}, },
}, },
// ActiveTaskTimeoutInterval: 10 * time.Minute, TaskTimeout: 10 * time.Minute,
// ActiveWorkerTimeoutInterval: 1 * time.Minute, // ActiveWorkerTimeoutInterval: 1 * time.Minute,
// // Days are assumed to be 24 hours long. This is not exactly accurate, but should // // Days are assumed to be 24 hours long. This is not exactly accurate, but should

View File

@ -46,7 +46,7 @@ type Task struct {
// Which worker is/was working on this. // Which worker is/was working on this.
WorkerID *uint WorkerID *uint
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
LastTouchedAt time.Time LastTouchedAt time.Time // Should contain UTC timestamps.
// Dependencies are tasks that need to be completed before this one can run. // Dependencies are tasks that need to be completed before this one can run.
Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"` Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`

View File

@ -0,0 +1,26 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"time"
"git.blender.org/flamenco/pkg/api"
)
// This file contains functions for dealing with task/worker timeouts. Not database timeouts.
// FetchTimedOutTasks returns the tasks that have status 'active' and were last
// touched at or before `untouchedSince`. The query joins the "Job" relation of
// each task.
//
// On a database error, nil is returned together with the error wrapped by
// taskError().
func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*Task, error) {
	// Build the query first; it is only executed by the Scan() call below.
	query := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Joins("Job").
		Where("tasks.status = ?", api.TaskStatusActive).
		Where("tasks.last_touched_at <= ?", untouchedSince)

	// Start out with an empty, non-nil slice for Scan() to fill.
	timedOut := []*Task{}
	if err := query.Scan(&timedOut).Error; err != nil {
		return nil, taskError(err, "finding timed out tasks (untouched since %s)", untouchedSince.String())
	}
	return timedOut, nil
}

View File

@ -0,0 +1,51 @@
package persistence
import (
"testing"
"time"
"git.blender.org/flamenco/pkg/api"
"github.com/stretchr/testify/assert"
)
// SPDX-License-Identifier: GPL-3.0-or-later
// TestFetchTimedOutTasks checks that FetchTimedOutTasks only returns tasks
// that are in 'active' status and were last touched before the deadline, and
// that the returned tasks have their job included.
func TestFetchTimedOutTasks(t *testing.T) {
	ctx, cleanup, db, job, _ := jobTasksTestFixtures(t)
	defer cleanup()

	tasks, err := db.FetchTasksOfJob(ctx, job)
	if !assert.NoError(t, err) {
		t.FailNow()
	}
	// The rest of the test indexes tasks[0]; fail cleanly instead of panicking
	// when the fixture did not produce any task.
	if !assert.NotEmpty(t, tasks) {
		t.FailNow()
	}

	now := db.gormDB.NowFunc()
	deadline := now.Add(-5 * time.Minute)

	// Mark the task as last touched before the deadline, i.e. old enough for a timeout.
	task := tasks[0]
	task.LastTouchedAt = deadline.Add(-1 * time.Minute)
	assert.NoError(t, db.SaveTask(ctx, task))

	w := createWorker(ctx, t, db)
	assert.NoError(t, db.TaskAssignToWorker(ctx, task, w))

	// The task should still not be returned, as it's not in 'active' state.
	timedout, err := db.FetchTimedOutTasks(ctx, deadline)
	assert.NoError(t, err)
	assert.Empty(t, timedout)

	// Mark as Active:
	task.Status = api.TaskStatusActive
	assert.NoError(t, db.SaveTask(ctx, task))

	// Now it should time out:
	timedout, err = db.FetchTimedOutTasks(ctx, deadline)
	assert.NoError(t, err)
	if assert.Len(t, timedout, 1) {
		// Other fields will be different, like the 'UpdatedAt' field -- this just
		// tests that the expected task is returned.
		assert.Equal(t, task.UUID, timedout[0].UUID)

		// Inspect the *fetched* task here. Checking the locally held `task`
		// would always pass, regardless of what the query returned.
		if assert.NotNil(t, timedout[0].Job, "the job should be included in the result as well") {
			assert.Equal(t, job.UUID, timedout[0].Job.UUID, "the included job should be the job of the task")
		}
	}
}

View File

@ -0,0 +1,34 @@
package timeout_checker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"time"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/pkg/api"
"github.com/rs/zerolog"
)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage
// PersistenceService is the part of the persistence layer that the timeout
// checker depends on.
type PersistenceService interface {
// FetchTimedOutTasks returns the tasks that are considered timed out relative
// to `untouchedSince`. As implemented by persistence.DB, these are the tasks in
// 'active' status that were last touched at or before that moment.
FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error)
}
// Compile-time check that *persistence.DB implements PersistenceService.
var _ PersistenceService = (*persistence.DB)(nil)
// TaskStateMachine is the part of the task state machine that the timeout
// checker depends on; it is used to move timed-out tasks to a new status.
type TaskStateMachine interface {
// TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job.
TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error
}
// Compile-time check that *task_state_machine.StateMachine implements TaskStateMachine.
var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)
// LogStorage is used to append timeout messages to task logs.
type LogStorage interface {
// WriteTimestamped writes `logText` to the log of the task identified by
// `jobID` and `taskID`. The timeout checker passes the UUIDs of the job and the
// task, and a logger that has the task (and, when known, worker) info attached.
// NOTE(review): presumably the implementation prefixes the text with a
// timestamp, as the name suggests -- confirm against the task_logs package.
WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error
}

View File

@ -0,0 +1,128 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
time "time"
persistence "git.blender.org/flamenco/internal/manager/persistence"
api "git.blender.org/flamenco/pkg/api"
gomock "github.com/golang/mock/gomock"
zerolog "github.com/rs/zerolog"
)
// MockPersistenceService is a mock of PersistenceService interface.
// It is part of a MockGen-generated file; regenerate it instead of editing by hand.
type MockPersistenceService struct {
// ctrl is the gomock controller that all calls on this mock are forwarded to.
ctrl *gomock.Controller
// recorder is what EXPECT() returns; it is used to register expected calls.
recorder *MockPersistenceServiceMockRecorder
}
// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService.
type MockPersistenceServiceMockRecorder struct {
// mock points back to the mock this recorder registers expectations for.
mock *MockPersistenceService
}
// NewMockPersistenceService creates a new mock instance that is tied to the
// given controller, with its recorder pointing back to the new mock.
func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService {
mock := &MockPersistenceService{ctrl: ctrl}
mock.recorder = &MockPersistenceServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder {
return m.recorder
}
// FetchTimedOutTasks mocks base method.
// The call is dispatched to the controller, and whatever that returns is
// converted to the return types of the mocked method.
func (m *MockPersistenceService) FetchTimedOutTasks(arg0 context.Context, arg1 time.Time) ([]*persistence.Task, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchTimedOutTasks", arg0, arg1)
// The two-value type assertions yield the zero value (nil) instead of
// panicking when a return value is nil or of another type.
ret0, _ := ret[0].([]*persistence.Task)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchTimedOutTasks indicates an expected call of FetchTimedOutTasks.
// The arguments can be concrete values or gomock matchers; the returned
// *gomock.Call can be used to configure the expectation further.
func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutTasks", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutTasks), arg0, arg1)
}
// MockTaskStateMachine is a mock of TaskStateMachine interface.
// It is part of a MockGen-generated file; regenerate it instead of editing by hand.
type MockTaskStateMachine struct {
// ctrl is the gomock controller that all calls on this mock are forwarded to.
ctrl *gomock.Controller
// recorder is what EXPECT() returns; it is used to register expected calls.
recorder *MockTaskStateMachineMockRecorder
}
// MockTaskStateMachineMockRecorder is the mock recorder for MockTaskStateMachine.
type MockTaskStateMachineMockRecorder struct {
// mock points back to the mock this recorder registers expectations for.
mock *MockTaskStateMachine
}
// NewMockTaskStateMachine creates a new mock instance that is tied to the
// given controller, with its recorder pointing back to the new mock.
func NewMockTaskStateMachine(ctrl *gomock.Controller) *MockTaskStateMachine {
mock := &MockTaskStateMachine{ctrl: ctrl}
mock.recorder = &MockTaskStateMachineMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder {
return m.recorder
}
// TaskStatusChange mocks base method.
// The call is dispatched to the controller, and whatever that returns is
// converted to the return type of the mocked method.
func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TaskStatusChange", arg0, arg1, arg2)
// The two-value type assertion yields a nil error instead of panicking when
// the return value is nil or of another type.
ret0, _ := ret[0].(error)
return ret0
}
// TaskStatusChange indicates an expected call of TaskStatusChange.
// The arguments can be concrete values or gomock matchers; the returned
// *gomock.Call can be used to configure the expectation further.
func (mr *MockTaskStateMachineMockRecorder) TaskStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).TaskStatusChange), arg0, arg1, arg2)
}
// MockLogStorage is a mock of LogStorage interface.
// It is part of a MockGen-generated file; regenerate it instead of editing by hand.
type MockLogStorage struct {
// ctrl is the gomock controller that all calls on this mock are forwarded to.
ctrl *gomock.Controller
// recorder is what EXPECT() returns; it is used to register expected calls.
recorder *MockLogStorageMockRecorder
}
// MockLogStorageMockRecorder is the mock recorder for MockLogStorage.
type MockLogStorageMockRecorder struct {
// mock points back to the mock this recorder registers expectations for.
mock *MockLogStorage
}
// NewMockLogStorage creates a new mock instance that is tied to the given
// controller, with its recorder pointing back to the new mock.
func NewMockLogStorage(ctrl *gomock.Controller) *MockLogStorage {
mock := &MockLogStorage{ctrl: ctrl}
mock.recorder = &MockLogStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLogStorage) EXPECT() *MockLogStorageMockRecorder {
return m.recorder
}
// WriteTimestamped mocks base method.
// The call is dispatched to the controller, and whatever that returns is
// converted to the return type of the mocked method.
func (m *MockLogStorage) WriteTimestamped(arg0 zerolog.Logger, arg1, arg2, arg3 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WriteTimestamped", arg0, arg1, arg2, arg3)
// The two-value type assertion yields a nil error instead of panicking when
// the return value is nil or of another type.
ret0, _ := ret[0].(error)
return ret0
}
// WriteTimestamped indicates an expected call of WriteTimestamped.
// The arguments can be concrete values or gomock matchers; the returned
// *gomock.Call can be used to configure the expectation further.
func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3)
}

View File

@ -0,0 +1,181 @@
package timeout_checker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"time"
"github.com/benbjohnson/clock"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
const (
	// timeoutCheckInterval is the time between two consecutive checks of all
	// active tasks for timeouts.
	timeoutCheckInterval = 1 * time.Minute

	// timeoutInitialSleep is the delay before the initial check. This gives
	// workers a chance to reconnect to the Manager and send updates after the
	// Manager has started.
	timeoutInitialSleep = 5 * time.Minute
)
// TimeoutChecker periodically times out tasks if the worker hasn't sent any update recently.
// Timing out workers is not implemented yet; the worker check is still commented out.
type TimeoutChecker struct {
// taskTimeout is how long an active task may remain untouched before it is
// considered timed out.
taskTimeout time.Duration
// clock provides the current time and the sleep timers; injectable so that
// tests can use a mocked clock.
clock clock.Clock
// persist is queried for the timed-out tasks.
persist PersistenceService
// taskStateMachine is used to change the status of timed-out tasks.
taskStateMachine TaskStateMachine
// logStorage receives a message in the task log for each timed-out task.
logStorage LogStorage
}
// New creates a new TimeoutChecker.
//
// `taskTimeout` is how long an active task may remain untouched before it is
// timed out. The other parameters are the services the checker uses to obtain
// the time, find the timed-out tasks, change their status, and write to their
// task log. Nothing runs until Run() is called on the returned checker.
func New(
	taskTimeout time.Duration,
	clock clock.Clock,
	persist PersistenceService,
	taskStateMachine TaskStateMachine,
	logStorage LogStorage,
) *TimeoutChecker {
	ttc := new(TimeoutChecker)
	ttc.taskTimeout = taskTimeout
	ttc.clock = clock
	ttc.persist = persist
	ttc.taskStateMachine = taskStateMachine
	ttc.logStorage = logStorage
	return ttc
}
// Run runs the timeout checker until the context closes.
//
// The first check is performed after `timeoutInitialSleep`, every subsequent
// one `timeoutCheckInterval` after the previous check finished. When no valid
// (i.e. positive) task timeout is configured, a warning is logged and Run
// returns immediately without ever checking.
//
// Run blocks, so it is meant to be called in its own goroutine.
func (ttc *TimeoutChecker) Run(ctx context.Context) {
	defer log.Info().Msg("TimeoutChecker: shutting down")

	// A zero timeout means that nothing was configured. A negative timeout is
	// rejected as well: it would place the threshold in the future, causing
	// every active task to be seen as timed out on each check.
	if ttc.taskTimeout <= 0 {
		log.Warn().
			Str("taskTimeout", ttc.taskTimeout.String()).
			Msg("TimeoutChecker: no timeout duration configured, will not check for task timeouts")
		return
	}

	log.Info().
		Str("taskTimeout", ttc.taskTimeout.String()).
		Str("initialSleep", timeoutInitialSleep.String()).
		Str("checkInterval", timeoutCheckInterval.String()).
		Msg("TimeoutChecker: starting up")

	// Start with a delay, so that workers get a chance to push their updates
	// after the manager has started up.
	waitDur := timeoutInitialSleep

	for {
		select {
		case <-ctx.Done():
			return
		case <-ttc.clock.After(waitDur):
			// After the initial sleep, switch to the regular check interval.
			waitDur = timeoutCheckInterval
		}
		ttc.checkTasks(ctx)
		// ttc.checkWorkers(ctx)
	}
}
// checkTasks fetches the active tasks that have not been touched since the
// timeout threshold (the current time minus the task timeout), and times out
// each of them.
//
// Errors are logged and not returned, so a failed check does not stop the
// periodic check loop that calls this function.
func (ttc *TimeoutChecker) checkTasks(ctx context.Context) {
	// The threshold is computed in UTC for the database query, and converted to
	// local time only for logging.
	timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout)
	logger := log.With().
		Time("threshold", timeoutThreshold.Local()).
		Logger()
	logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold")

	tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold)
	if err != nil {
		// Log via the contextual logger, so that the threshold is part of the
		// error report, just like it is part of the other messages.
		logger.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database")
		return
	}

	if len(tasks) == 0 {
		logger.Trace().Msg("TimeoutChecker: no timed-out tasks")
		return
	}

	logger.Debug().
		Int("numTasks", len(tasks)).
		Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold")

	for _, task := range tasks {
		ttc.timeoutTask(ctx, task)
	}
}
// timeoutTask marks a task as 'failed' due to a timeout.
//
// The task's activity is set to a description of the timeout, the status change
// is handed to the task state machine, and a message is appended to the task
// log. Failures of either step are logged only; the log message is written even
// when the status change failed.
func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) {
	workerIdent, logger := ttc.assignedWorker(task)

	task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent)
	if err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database")
	}

	// Compose the task log message only after the status change, so it reflects
	// the task as it is at that moment.
	logText := fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s",
		workerIdent, task.LastTouchedAt.Format(time.RFC3339))
	if err := ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, logText); err != nil {
		logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log")
	}
}
// assignedWorker returns a description of the worker assigned to this task,
// and a logger configured for it.
//
// A warning about the timeout is logged as a side effect. The description is
// "-unassigned-" when the task has no worker ID, "-unknown-" when it has a
// worker ID but no worker is attached, and the worker's identifier otherwise.
func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) {
	logCtx := log.With().Str("task", task.UUID)

	switch {
	case task.WorkerID == nil:
		logger := logCtx.Logger()
		logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker")
		return "-unassigned-", logger

	case task.Worker == nil:
		logger := logCtx.Logger()
		logger.Warn().Uint("workerDBID", *task.WorkerID).
			Msg("TimeoutChecker: task is assigned to worker that no longer exists")
		return "-unknown-", logger
	}

	// The worker is known, so include its info in every message of the logger.
	worker := task.Worker
	logger := logCtx.
		Str("worker", worker.UUID).
		Str("workerName", worker.Name).
		Logger()
	logger.Warn().Msg("TimeoutChecker: task timed out")
	return worker.Identifier(), logger
}
// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) {
// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval)
// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold)
// var timedoutWorkers []Worker
// // find all awake workers that either have never been seen, or were seen long ago.
// query := M{
// "status": workerStatusAwake,
// "$or": []M{
// M{"last_activity": M{"$lte": timeoutThreshold}},
// M{"last_activity": M{"$exists": false}},
// },
// }
// projection := M{
// "_id": 1,
// "nickname": 1,
// "address": 1,
// "status": 1,
// }
// if err := db.C("flamenco_workers").Find(query).Select(projection).All(&timedoutWorkers); err != nil {
// log.Warningf("Error finding timed-out workers: %s", err)
// }
// for _, worker := range timedoutWorkers {
// worker.Timeout(db, ttc.scheduler)
// }
// }

View File

@ -0,0 +1,196 @@
package timeout_checker
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/timeout_checker/mocks"
"git.blender.org/flamenco/pkg/api"
)
// taskTimeout is the task timeout that the test fixtures configure the
// TimeoutChecker under test with.
const taskTimeout = 20 * time.Minute
// TestTimeoutCheckerTiming checks that the first fetch of timed-out tasks
// happens after the initial sleep, that subsequent fetches happen one check
// interval apart, and that a database error does not break the check loop.
func TestTimeoutCheckerTiming(t *testing.T) {
	ttc, finish, mocks := timeoutCheckerTestFixtures(t)
	defer finish()

	mocks.run(ttc)

	// Wait for the timeout checker to actually be sleeping, otherwise it could
	// have a different sleep-start time than we expect.
	time.Sleep(1 * time.Millisecond)

	// Determine the deadlines relative to the initial clock value.
	initialTime := mocks.clock.Now().UTC()
	deadlines := []time.Time{
		initialTime.Add(timeoutInitialSleep - taskTimeout),
		initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval),
		initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval),
	}

	// Expect three fetches, one after the initial sleep time, and two a regular interval later.
	fetchTimes := make([]time.Time, len(deadlines))
	firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]).
		DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
			fetchTimes[0] = mocks.clock.Now().UTC()
			return []*persistence.Task{}, nil
		})

	secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]).
		DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
			fetchTimes[1] = mocks.clock.Now().UTC()
			// Return a database error. This shouldn't break the check loop.
			return []*persistence.Task{}, errors.New("testing what errors do")
		}).
		After(firstCall)

	mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]).
		DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
			fetchTimes[2] = mocks.clock.Now().UTC()
			return []*persistence.Task{}, nil
		}).
		After(secondCall)

	mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
	mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
	mocks.clock.Add(time.Minute)     // Should trigger the first fetch.
	mocks.clock.Add(time.Minute)     // Should trigger the second fetch.
	mocks.clock.Add(time.Minute)     // Should trigger the third fetch.

	// Wait for the timeout checker to actually run & hit the expected calls.
	time.Sleep(1 * time.Millisecond)

	// Stop the checker and wait for its goroutine to end before inspecting
	// fetchTimes. That slice is written to by the checker goroutine, and a sleep
	// alone does not synchronize those writes with the reads below.
	// The deferred finish() repeats both calls, which is harmless.
	mocks.cancel()
	mocks.wg.Wait()

	for idx, fetchTime := range fetchTimes {
		// Check for zero values first, because they can be a bit confusing in the assert.Equal() logs.
		if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) {
			continue
		}
		expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval)
		assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx)
	}
}
// TestTaskTimeout checks that every timed-out task is moved to 'failed' status
// and gets a message in its task log, for a task without a worker, a task whose
// worker is not attached, and a task with a known worker.
func TestTaskTimeout(t *testing.T) {
	ttc, finish, mocks := timeoutCheckerTestFixtures(t)
	defer finish()

	mocks.run(ttc)

	// Wait for the timeout checker to actually be sleeping, otherwise it could
	// have a different sleep-start time than we expect.
	time.Sleep(1 * time.Millisecond)

	// One hour before the mocked 'now'; this is the timestamp that the expected
	// log messages below contain.
	lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour)

	job := persistence.Job{UUID: "JOB-UUID"}
	worker := persistence.Worker{
		UUID:  "WORKER-UUID",
		Name:  "Tester",
		Model: gorm.Model{ID: 47},
	}
	taskUnassigned := persistence.Task{
		UUID:          "TASK-UUID-UNASSIGNED",
		Job:           &job,
		LastTouchedAt: lastTime,
	}
	taskUnknownWorker := persistence.Task{
		UUID:          "TASK-UUID-UNKNOWN",
		Job:           &job,
		LastTouchedAt: lastTime,
		WorkerID:      &worker.ID,
	}
	taskAssigned := persistence.Task{
		UUID:          "TASK-UUID-ASSIGNED",
		Job:           &job,
		LastTouchedAt: lastTime,
		WorkerID:      &worker.ID,
		Worker:        &worker,
	}

	// Each timed-out task, together with the message expected in its task log.
	timedOut := []struct {
		task    *persistence.Task
		logText string
	}{
		{&taskUnassigned,
			"Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z"},
		{&taskUnknownWorker,
			"Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z"},
		{&taskAssigned,
			"Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z"},
	}

	fetched := make([]*persistence.Task, 0, len(timedOut))
	for _, expected := range timedOut {
		fetched = append(fetched, expected.task)
	}
	mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).
		Return(fetched, nil)

	for _, expected := range timedOut {
		mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, expected.task, api.TaskStatusFailed)
		mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, expected.task.UUID,
			expected.logText)
	}

	// All the timeouts should be handled after the initial sleep.
	mocks.clock.Add(timeoutInitialSleep)
}
// TimeoutCheckerMocks bundles the mocked dependencies of the TimeoutChecker
// under test with what is needed to run and stop it in a goroutine.
type TimeoutCheckerMocks struct {
// clock is the mocked clock; tests advance it to trigger the checks.
clock *clock.Mock
// Mocked services that the TimeoutChecker calls.
persist *mocks.MockPersistenceService
taskStateMachine *mocks.MockTaskStateMachine
logStorage *mocks.MockLogStorage
// ctx is the context that run() passes to TimeoutChecker.Run(); cancel stops it.
ctx context.Context
cancel context.CancelFunc
// wg tracks the goroutine started by run().
wg *sync.WaitGroup
}
// run starts a goroutine to call ttc.Run(mocks.ctx).
// The goroutine is registered with mocks.wg, so it can be waited for after the
// context has been cancelled.
func (mocks *TimeoutCheckerMocks) run(ttc *TimeoutChecker) {
	runChecker := func() {
		defer mocks.wg.Done()
		ttc.Run(mocks.ctx)
	}

	mocks.wg.Add(1)
	go runChecker()
}
// timeoutCheckerTestFixtures creates a TimeoutChecker that is configured with
// `taskTimeout` and uses only mocked dependencies.
//
// It returns the checker, a function that should be called at the end of each
// unit test, and the mocks. The checker is not started; use mocks.run() for that.
func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *TimeoutCheckerMocks) {
	mockCtrl := gomock.NewController(t)
	ctx, cancel := context.WithCancel(context.Background())

	// The mocked clock is left at its initial time. Setting it to a fixed moment
	// was tried and then disabled:
	//
	// mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T16:52:04+02:00")
	// if err != nil {
	// 	panic(err)
	// }
	// fixture.clock.Set(mockedNow)
	fixture := &TimeoutCheckerMocks{
		clock:            clock.NewMock(),
		persist:          mocks.NewMockPersistenceService(mockCtrl),
		taskStateMachine: mocks.NewMockTaskStateMachine(mockCtrl),
		logStorage:       mocks.NewMockLogStorage(mockCtrl),
		ctx:              ctx,
		cancel:           cancel,
		wg:               new(sync.WaitGroup),
	}

	checker := New(
		taskTimeout,
		fixture.clock,
		fixture.persist,
		fixture.taskStateMachine,
		fixture.logStorage,
	)

	// This should be called at the end of each unit test. It stops the checker,
	// waits for its goroutine to end, and only then verifies the mocks.
	finish := func() {
		fixture.cancel()
		fixture.wg.Wait()
		mockCtrl.Finish()
	}

	return checker, finish, fixture
}