Cleanup: timeout checker, move task-specific code to tasks.go
Just a cleanup to prepare for the addition of worker timeouts.
This commit is contained in:
parent
13307c5a24
commit
fe1627dd85
86
internal/manager/timeout_checker/tasks.go
Normal file
86
internal/manager/timeout_checker/tasks.go
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package timeout_checker
|
||||||
|
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
|
"git.blender.org/flamenco/internal/manager/persistence"
|
||||||
|
"git.blender.org/flamenco/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ttc *TimeoutChecker) checkTasks(ctx context.Context) {
|
||||||
|
timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout)
|
||||||
|
logger := log.With().
|
||||||
|
Time("threshold", timeoutThreshold.Local()).
|
||||||
|
Logger()
|
||||||
|
logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold")
|
||||||
|
|
||||||
|
tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
logger.Trace().Msg("TimeoutChecker: no timed-out tasks")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Debug().
|
||||||
|
Int("numTasks", len(tasks)).
|
||||||
|
Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold")
|
||||||
|
|
||||||
|
for _, task := range tasks {
|
||||||
|
ttc.timeoutTask(ctx, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// timeoutTask marks a task as 'failed' due to a timeout.
|
||||||
|
func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) {
|
||||||
|
workerIdent, logger := ttc.assignedWorker(task)
|
||||||
|
|
||||||
|
task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent)
|
||||||
|
err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID,
|
||||||
|
fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s",
|
||||||
|
workerIdent, task.LastTouchedAt.Format(time.RFC3339)))
|
||||||
|
if err != nil {
|
||||||
|
logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// assignedWorker returns a description of the worker assigned to this task,
|
||||||
|
// and a logger configured for it.
|
||||||
|
func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) {
|
||||||
|
logCtx := log.With().Str("task", task.UUID)
|
||||||
|
|
||||||
|
if task.WorkerID == nil {
|
||||||
|
logger := logCtx.Logger()
|
||||||
|
logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker")
|
||||||
|
return "-unassigned-", logger
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Worker == nil {
|
||||||
|
logger := logCtx.Logger()
|
||||||
|
logger.Warn().Uint("workerDBID", *task.WorkerID).
|
||||||
|
Msg("TimeoutChecker: task is assigned to worker that no longer exists")
|
||||||
|
return "-unknown-", logger
|
||||||
|
}
|
||||||
|
|
||||||
|
logCtx = logCtx.
|
||||||
|
Str("worker", task.Worker.UUID).
|
||||||
|
Str("workerName", task.Worker.Name)
|
||||||
|
logger := logCtx.Logger()
|
||||||
|
logger.Warn().Msg("TimeoutChecker: task timed out")
|
||||||
|
|
||||||
|
return task.Worker.Identifier(), logger
|
||||||
|
}
|
141
internal/manager/timeout_checker/tasks_test.go
Normal file
141
internal/manager/timeout_checker/tasks_test.go
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
package timeout_checker
|
||||||
|
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/mock/gomock"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
|
||||||
|
"git.blender.org/flamenco/internal/manager/persistence"
|
||||||
|
"git.blender.org/flamenco/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
const taskTimeout = 20 * time.Minute
|
||||||
|
|
||||||
|
func TestTimeoutCheckerTiming(t *testing.T) {
|
||||||
|
ttc, finish, mocks := timeoutCheckerTestFixtures(t)
|
||||||
|
defer finish()
|
||||||
|
|
||||||
|
mocks.run(ttc)
|
||||||
|
|
||||||
|
// Wait for the timeout checker to actually be sleeping, otherwise it could
|
||||||
|
// have a different sleep-start time than we expect.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
|
||||||
|
// Determine the deadlines relative to the initial clock value.
|
||||||
|
initialTime := mocks.clock.Now().UTC()
|
||||||
|
deadlines := []time.Time{
|
||||||
|
initialTime.Add(timeoutInitialSleep - taskTimeout),
|
||||||
|
initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval),
|
||||||
|
initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect three fetches, one after the initial sleep time, and two a regular interval later.
|
||||||
|
fetchTimes := make([]time.Time, len(deadlines))
|
||||||
|
firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]).
|
||||||
|
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
||||||
|
fetchTimes[0] = mocks.clock.Now().UTC()
|
||||||
|
return []*persistence.Task{}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]).
|
||||||
|
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
||||||
|
fetchTimes[1] = mocks.clock.Now().UTC()
|
||||||
|
// Return a database error. This shouldn't break the check loop.
|
||||||
|
return []*persistence.Task{}, errors.New("testing what errors do")
|
||||||
|
}).
|
||||||
|
After(firstCall)
|
||||||
|
|
||||||
|
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]).
|
||||||
|
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
||||||
|
fetchTimes[2] = mocks.clock.Now().UTC()
|
||||||
|
return []*persistence.Task{}, nil
|
||||||
|
}).
|
||||||
|
After(secondCall)
|
||||||
|
|
||||||
|
mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
|
||||||
|
mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
|
||||||
|
mocks.clock.Add(time.Minute) // Should trigger the first fetch.
|
||||||
|
mocks.clock.Add(time.Minute) // Should trigger the second fetch.
|
||||||
|
mocks.clock.Add(time.Minute) // Should trigger the third fetch.
|
||||||
|
|
||||||
|
// Wait for the timeout checker to actually run & hit the expected calls.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
|
||||||
|
for idx, fetchTime := range fetchTimes {
|
||||||
|
// Check for zero values first, because they can be a bit confusing in the assert.Equal() logs.
|
||||||
|
if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval)
|
||||||
|
assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTaskTimeout(t *testing.T) {
|
||||||
|
// Canary test: if these constants do not have the expected value, the test
|
||||||
|
// will fail rather cryptically.
|
||||||
|
if !assert.Equal(t, 5*time.Minute, timeoutInitialSleep, "timeoutInitialSleep does not have the expected value") ||
|
||||||
|
!assert.Equal(t, 1*time.Minute, timeoutCheckInterval, "timeoutCheckInterval does not have the expected value") {
|
||||||
|
t.FailNow()
|
||||||
|
}
|
||||||
|
|
||||||
|
ttc, finish, mocks := timeoutCheckerTestFixtures(t)
|
||||||
|
defer finish()
|
||||||
|
|
||||||
|
mocks.run(ttc)
|
||||||
|
|
||||||
|
// Wait for the timeout checker to actually be sleeping, otherwise it could
|
||||||
|
// have a different sleep-start time than we expect.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
|
||||||
|
lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour)
|
||||||
|
|
||||||
|
job := persistence.Job{UUID: "JOB-UUID"}
|
||||||
|
worker := persistence.Worker{
|
||||||
|
UUID: "WORKER-UUID",
|
||||||
|
Name: "Tester",
|
||||||
|
Model: gorm.Model{ID: 47},
|
||||||
|
}
|
||||||
|
taskUnassigned := persistence.Task{
|
||||||
|
UUID: "TASK-UUID-UNASSIGNED",
|
||||||
|
Job: &job,
|
||||||
|
LastTouchedAt: lastTime,
|
||||||
|
}
|
||||||
|
taskUnknownWorker := persistence.Task{
|
||||||
|
UUID: "TASK-UUID-UNKNOWN",
|
||||||
|
Job: &job,
|
||||||
|
LastTouchedAt: lastTime,
|
||||||
|
WorkerID: &worker.ID,
|
||||||
|
}
|
||||||
|
taskAssigned := persistence.Task{
|
||||||
|
UUID: "TASK-UUID-ASSIGNED",
|
||||||
|
Job: &job,
|
||||||
|
LastTouchedAt: lastTime,
|
||||||
|
WorkerID: &worker.ID,
|
||||||
|
Worker: &worker,
|
||||||
|
}
|
||||||
|
|
||||||
|
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).
|
||||||
|
Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil)
|
||||||
|
|
||||||
|
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnassigned, api.TaskStatusFailed)
|
||||||
|
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnknownWorker, api.TaskStatusFailed)
|
||||||
|
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskAssigned, api.TaskStatusFailed)
|
||||||
|
|
||||||
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnassigned.UUID,
|
||||||
|
"Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z")
|
||||||
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnknownWorker.UUID,
|
||||||
|
"Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z")
|
||||||
|
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskAssigned.UUID,
|
||||||
|
"Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z")
|
||||||
|
|
||||||
|
// All the timeouts should be handled after the initial sleep.
|
||||||
|
mocks.clock.Add(timeoutInitialSleep)
|
||||||
|
}
|
@ -4,15 +4,10 @@ package timeout_checker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/benbjohnson/clock"
|
"github.com/benbjohnson/clock"
|
||||||
"github.com/rs/zerolog"
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"git.blender.org/flamenco/internal/manager/persistence"
|
|
||||||
"git.blender.org/flamenco/pkg/api"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Interval for checking all active tasks for timeouts.
|
// Interval for checking all active tasks for timeouts.
|
||||||
@ -81,77 +76,6 @@ func (ttc *TimeoutChecker) Run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ttc *TimeoutChecker) checkTasks(ctx context.Context) {
|
|
||||||
timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout)
|
|
||||||
logger := log.With().
|
|
||||||
Time("threshold", timeoutThreshold.Local()).
|
|
||||||
Logger()
|
|
||||||
logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold")
|
|
||||||
|
|
||||||
tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(tasks) == 0 {
|
|
||||||
logger.Trace().Msg("TimeoutChecker: no timed-out tasks")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Debug().
|
|
||||||
Int("numTasks", len(tasks)).
|
|
||||||
Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold")
|
|
||||||
|
|
||||||
for _, task := range tasks {
|
|
||||||
ttc.timeoutTask(ctx, task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// timeoutTask marks a task as 'failed' due to a timeout.
|
|
||||||
func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) {
|
|
||||||
workerIdent, logger := ttc.assignedWorker(task)
|
|
||||||
|
|
||||||
task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent)
|
|
||||||
err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID,
|
|
||||||
fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s",
|
|
||||||
workerIdent, task.LastTouchedAt.Format(time.RFC3339)))
|
|
||||||
if err != nil {
|
|
||||||
logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// assignedWorker returns a description of the worker assigned to this task,
|
|
||||||
// and a logger configured for it.
|
|
||||||
func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) {
|
|
||||||
logCtx := log.With().Str("task", task.UUID)
|
|
||||||
|
|
||||||
if task.WorkerID == nil {
|
|
||||||
logger := logCtx.Logger()
|
|
||||||
logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker")
|
|
||||||
return "-unassigned-", logger
|
|
||||||
}
|
|
||||||
|
|
||||||
if task.Worker == nil {
|
|
||||||
logger := logCtx.Logger()
|
|
||||||
logger.Warn().Uint("workerDBID", *task.WorkerID).
|
|
||||||
Msg("TimeoutChecker: task is assigned to worker that no longer exists")
|
|
||||||
return "-unknown-", logger
|
|
||||||
}
|
|
||||||
|
|
||||||
logCtx = logCtx.
|
|
||||||
Str("worker", task.Worker.UUID).
|
|
||||||
Str("workerName", task.Worker.Name)
|
|
||||||
logger := logCtx.Logger()
|
|
||||||
logger.Warn().Msg("TimeoutChecker: task timed out")
|
|
||||||
|
|
||||||
return task.Worker.Identifier(), logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) {
|
// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) {
|
||||||
// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval)
|
// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval)
|
||||||
// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold)
|
// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold)
|
||||||
|
@ -4,145 +4,15 @@ package timeout_checker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/benbjohnson/clock"
|
"github.com/benbjohnson/clock"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
|
|
||||||
"git.blender.org/flamenco/internal/manager/persistence"
|
|
||||||
"git.blender.org/flamenco/internal/manager/timeout_checker/mocks"
|
"git.blender.org/flamenco/internal/manager/timeout_checker/mocks"
|
||||||
"git.blender.org/flamenco/pkg/api"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const taskTimeout = 20 * time.Minute
|
|
||||||
|
|
||||||
func TestTimeoutCheckerTiming(t *testing.T) {
|
|
||||||
ttc, finish, mocks := timeoutCheckerTestFixtures(t)
|
|
||||||
defer finish()
|
|
||||||
|
|
||||||
mocks.run(ttc)
|
|
||||||
|
|
||||||
// Wait for the timeout checker to actually be sleeping, otherwise it could
|
|
||||||
// have a different sleep-start time than we expect.
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
|
||||||
// Determine the deadlines relative to the initial clock value.
|
|
||||||
initialTime := mocks.clock.Now().UTC()
|
|
||||||
deadlines := []time.Time{
|
|
||||||
initialTime.Add(timeoutInitialSleep - taskTimeout),
|
|
||||||
initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval),
|
|
||||||
initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expect three fetches, one after the initial sleep time, and two a regular interval later.
|
|
||||||
fetchTimes := make([]time.Time, len(deadlines))
|
|
||||||
firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]).
|
|
||||||
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
|
||||||
fetchTimes[0] = mocks.clock.Now().UTC()
|
|
||||||
return []*persistence.Task{}, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]).
|
|
||||||
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
|
||||||
fetchTimes[1] = mocks.clock.Now().UTC()
|
|
||||||
// Return a database error. This shouldn't break the check loop.
|
|
||||||
return []*persistence.Task{}, errors.New("testing what errors do")
|
|
||||||
}).
|
|
||||||
After(firstCall)
|
|
||||||
|
|
||||||
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]).
|
|
||||||
DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) {
|
|
||||||
fetchTimes[2] = mocks.clock.Now().UTC()
|
|
||||||
return []*persistence.Task{}, nil
|
|
||||||
}).
|
|
||||||
After(secondCall)
|
|
||||||
|
|
||||||
mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
|
|
||||||
mocks.clock.Add(2 * time.Minute) // Should still be sleeping.
|
|
||||||
mocks.clock.Add(time.Minute) // Should trigger the first fetch.
|
|
||||||
mocks.clock.Add(time.Minute) // Should trigger the second fetch.
|
|
||||||
mocks.clock.Add(time.Minute) // Should trigger the third fetch.
|
|
||||||
|
|
||||||
// Wait for the timeout checker to actually run & hit the expected calls.
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
|
||||||
for idx, fetchTime := range fetchTimes {
|
|
||||||
// Check for zero values first, because they can be a bit confusing in the assert.Equal() logs.
|
|
||||||
if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval)
|
|
||||||
assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTaskTimeout(t *testing.T) {
|
|
||||||
// Canary test: if these constants do not have the expected value, the test
|
|
||||||
// will fail rather cryptically.
|
|
||||||
if !assert.Equal(t, 5*time.Minute, timeoutInitialSleep, "timeoutInitialSleep does not have the expected value") ||
|
|
||||||
!assert.Equal(t, 1*time.Minute, timeoutCheckInterval, "timeoutCheckInterval does not have the expected value") {
|
|
||||||
t.FailNow()
|
|
||||||
}
|
|
||||||
|
|
||||||
ttc, finish, mocks := timeoutCheckerTestFixtures(t)
|
|
||||||
defer finish()
|
|
||||||
|
|
||||||
mocks.run(ttc)
|
|
||||||
|
|
||||||
// Wait for the timeout checker to actually be sleeping, otherwise it could
|
|
||||||
// have a different sleep-start time than we expect.
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
|
||||||
lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour)
|
|
||||||
|
|
||||||
job := persistence.Job{UUID: "JOB-UUID"}
|
|
||||||
worker := persistence.Worker{
|
|
||||||
UUID: "WORKER-UUID",
|
|
||||||
Name: "Tester",
|
|
||||||
Model: gorm.Model{ID: 47},
|
|
||||||
}
|
|
||||||
taskUnassigned := persistence.Task{
|
|
||||||
UUID: "TASK-UUID-UNASSIGNED",
|
|
||||||
Job: &job,
|
|
||||||
LastTouchedAt: lastTime,
|
|
||||||
}
|
|
||||||
taskUnknownWorker := persistence.Task{
|
|
||||||
UUID: "TASK-UUID-UNKNOWN",
|
|
||||||
Job: &job,
|
|
||||||
LastTouchedAt: lastTime,
|
|
||||||
WorkerID: &worker.ID,
|
|
||||||
}
|
|
||||||
taskAssigned := persistence.Task{
|
|
||||||
UUID: "TASK-UUID-ASSIGNED",
|
|
||||||
Job: &job,
|
|
||||||
LastTouchedAt: lastTime,
|
|
||||||
WorkerID: &worker.ID,
|
|
||||||
Worker: &worker,
|
|
||||||
}
|
|
||||||
|
|
||||||
mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()).
|
|
||||||
Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil)
|
|
||||||
|
|
||||||
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnassigned, api.TaskStatusFailed)
|
|
||||||
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnknownWorker, api.TaskStatusFailed)
|
|
||||||
mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskAssigned, api.TaskStatusFailed)
|
|
||||||
|
|
||||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnassigned.UUID,
|
|
||||||
"Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z")
|
|
||||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnknownWorker.UUID,
|
|
||||||
"Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z")
|
|
||||||
mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskAssigned.UUID,
|
|
||||||
"Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z")
|
|
||||||
|
|
||||||
// All the timeouts should be handled after the initial sleep.
|
|
||||||
mocks.clock.Add(timeoutInitialSleep)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TimeoutCheckerMocks struct {
|
type TimeoutCheckerMocks struct {
|
||||||
clock *clock.Mock
|
clock *clock.Mock
|
||||||
persist *mocks.MockPersistenceService
|
persist *mocks.MockPersistenceService
|
||||||
|
Loading…
x
Reference in New Issue
Block a user