Sleep Scheduler implementation for the Manager

The Manager now has a sleep scheduler for Workers. The API and background
service work, but there is no web interface yet.

Manifest Task: T99397
This commit is contained in:
Sybren A. Stüvel 2022-07-17 17:27:32 +02:00
parent 3133bd2487
commit d7b164133a
17 changed files with 1520 additions and 115 deletions

View File

@ -40,6 +40,7 @@ import (
"git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/local_storage" "git.blender.org/flamenco/internal/manager/local_storage"
"git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/sleep_scheduler"
"git.blender.org/flamenco/internal/manager/swagger_ui" "git.blender.org/flamenco/internal/manager/swagger_ui"
"git.blender.org/flamenco/internal/manager/task_logs" "git.blender.org/flamenco/internal/manager/task_logs"
"git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/task_state_machine"
@ -159,11 +160,12 @@ func runFlamencoManager() bool {
logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater) logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater)
taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage)
sleepScheduler := sleep_scheduler.New(timeService, persist, webUpdater)
lastRender := last_rendered.New(localStorage) lastRender := last_rendered.New(localStorage)
shamanServer := buildShamanServer(configService, isFirstRun) shamanServer := buildShamanServer(configService, isFirstRun)
flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine,
shamanServer, logStorage, webUpdater, lastRender, localStorage) shamanServer, logStorage, webUpdater, lastRender, localStorage, sleepScheduler)
e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage) e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage)
timeoutChecker := timeout_checker.New( timeoutChecker := timeout_checker.New(
@ -223,6 +225,13 @@ func runFlamencoManager() bool {
timeoutChecker.Run(mainCtx) timeoutChecker.Run(mainCtx)
}() }()
// Run the Worker sleep scheduler.
wg.Add(1)
go func() {
defer wg.Done()
sleepScheduler.Run(mainCtx)
}()
// Open a webbrowser, but give the web service some time to start first. // Open a webbrowser, but give the web service some time to start first.
if isFirstRun { if isFirstRun {
go openWebbrowser(mainCtx, urls[0]) go openWebbrowser(mainCtx, urls[0])
@ -250,6 +259,7 @@ func buildFlamencoAPI(
webUpdater *webupdates.BiDirComms, webUpdater *webupdates.BiDirComms,
lastRender *last_rendered.LastRenderedProcessor, lastRender *last_rendered.LastRenderedProcessor,
localStorage local_storage.StorageInfo, localStorage local_storage.StorageInfo,
sleepScheduler *sleep_scheduler.SleepScheduler,
) *api_impl.Flamenco { ) *api_impl.Flamenco {
compiler, err := job_compilers.Load(timeService) compiler, err := job_compilers.Load(timeService)
if err != nil { if err != nil {
@ -258,7 +268,7 @@ func buildFlamencoAPI(
flamenco := api_impl.NewFlamenco( flamenco := api_impl.NewFlamenco(
compiler, persist, webUpdater, logStorage, configService, compiler, persist, webUpdater, logStorage, configService,
taskStateMachine, shamanServer, timeService, lastRender, taskStateMachine, shamanServer, timeService, lastRender,
localStorage) localStorage, sleepScheduler)
return flamenco return flamenco
} }
@ -272,6 +282,7 @@ func buildWebService(
) *echo.Echo { ) *echo.Echo {
e := echo.New() e := echo.New()
e.HideBanner = true e.HideBanner = true
e.HidePort = true
// The request should come in fairly quickly, given that Flamenco is intended // The request should come in fairly quickly, given that Flamenco is intended
// to run on a local network. // to run on a local network.

View File

@ -16,16 +16,17 @@ import (
) )
type Flamenco struct { type Flamenco struct {
jobCompiler JobCompiler jobCompiler JobCompiler
persist PersistenceService persist PersistenceService
broadcaster ChangeBroadcaster broadcaster ChangeBroadcaster
logStorage LogStorage logStorage LogStorage
config ConfigService config ConfigService
stateMachine TaskStateMachine stateMachine TaskStateMachine
shaman Shaman shaman Shaman
clock TimeService clock TimeService
lastRender LastRendered lastRender LastRendered
localStorage LocalStorage localStorage LocalStorage
sleepScheduler WorkerSleepScheduler
// The task scheduler can be locked to prevent multiple Workers from getting // The task scheduler can be locked to prevent multiple Workers from getting
// the same task. It is also used for certain other queries, like // the same task. It is also used for certain other queries, like
@ -51,18 +52,20 @@ func NewFlamenco(
ts TimeService, ts TimeService,
lr LastRendered, lr LastRendered,
localStorage LocalStorage, localStorage LocalStorage,
wss WorkerSleepScheduler,
) *Flamenco { ) *Flamenco {
return &Flamenco{ return &Flamenco{
jobCompiler: jc, jobCompiler: jc,
persist: jps, persist: jps,
broadcaster: b, broadcaster: b,
logStorage: logStorage, logStorage: logStorage,
config: cs, config: cs,
stateMachine: sm, stateMachine: sm,
shaman: sha, shaman: sha,
clock: ts, clock: ts,
lastRender: lr, lastRender: lr,
localStorage: localStorage, localStorage: localStorage,
sleepScheduler: wss,
done: make(chan struct{}), done: make(chan struct{}),
} }

View File

@ -17,6 +17,7 @@ import (
"git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/job_compilers"
"git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/last_rendered"
"git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/sleep_scheduler"
"git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api" "git.blender.org/flamenco/pkg/api"
@ -24,7 +25,7 @@ import (
) )
// Generate mock implementations of these interfaces. // Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage //go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler
type PersistenceService interface { type PersistenceService interface {
StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error
@ -65,9 +66,6 @@ type PersistenceService interface {
// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type.
CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error) CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error)
FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error)
SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule persistence.SleepSchedule) error
// Database queries. // Database queries.
QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)
QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error)
@ -206,3 +204,10 @@ type TimeService interface {
} }
var _ TimeService = (clock.Clock)(nil) var _ TimeService = (clock.Clock)(nil)
type WorkerSleepScheduler interface {
FetchSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error)
SetSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error
}
var _ WorkerSleepScheduler = (*sleep_scheduler.SleepScheduler)(nil)

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage) // Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler)
// Package mocks is a generated GoMock package. // Package mocks is a generated GoMock package.
package mocks package mocks
@ -216,21 +216,6 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1)
} }
// FetchWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1)
ret0, _ := ret[0].(*persistence.SleepSchedule)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1)
}
// FetchWorkers mocks base method. // FetchWorkers mocks base method.
func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -390,20 +375,6 @@ func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1)
} }
// SetWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2)
}
// StoreAuthoredJob mocks base method. // StoreAuthoredJob mocks base method.
func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error { func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -1116,3 +1087,55 @@ func (mr *MockLocalStorageMockRecorder) RelPath(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelPath", reflect.TypeOf((*MockLocalStorage)(nil).RelPath), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelPath", reflect.TypeOf((*MockLocalStorage)(nil).RelPath), arg0)
} }
// MockWorkerSleepScheduler is a mock of WorkerSleepScheduler interface.
type MockWorkerSleepScheduler struct {
ctrl *gomock.Controller
recorder *MockWorkerSleepSchedulerMockRecorder
}
// MockWorkerSleepSchedulerMockRecorder is the mock recorder for MockWorkerSleepScheduler.
type MockWorkerSleepSchedulerMockRecorder struct {
mock *MockWorkerSleepScheduler
}
// NewMockWorkerSleepScheduler creates a new mock instance.
func NewMockWorkerSleepScheduler(ctrl *gomock.Controller) *MockWorkerSleepScheduler {
mock := &MockWorkerSleepScheduler{ctrl: ctrl}
mock.recorder = &MockWorkerSleepSchedulerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockWorkerSleepScheduler) EXPECT() *MockWorkerSleepSchedulerMockRecorder {
return m.recorder
}
// FetchSchedule mocks base method.
func (m *MockWorkerSleepScheduler) FetchSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSchedule", arg0, arg1)
ret0, _ := ret[0].(*persistence.SleepSchedule)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchSchedule indicates an expected call of FetchSchedule.
func (mr *MockWorkerSleepSchedulerMockRecorder) FetchSchedule(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSchedule", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).FetchSchedule), arg0, arg1)
}
// SetSchedule mocks base method.
func (m *MockWorkerSleepScheduler) SetSchedule(arg0 context.Context, arg1 string, arg2 *persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetSchedule", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SetSchedule indicates an expected call of SetSchedule.
func (mr *MockWorkerSleepSchedulerMockRecorder) SetSchedule(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSchedule", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).SetSchedule), arg0, arg1, arg2)
}

View File

@ -23,17 +23,18 @@ import (
) )
type mockedFlamenco struct { type mockedFlamenco struct {
flamenco *Flamenco flamenco *Flamenco
jobCompiler *mocks.MockJobCompiler jobCompiler *mocks.MockJobCompiler
persistence *mocks.MockPersistenceService persistence *mocks.MockPersistenceService
broadcaster *mocks.MockChangeBroadcaster broadcaster *mocks.MockChangeBroadcaster
logStorage *mocks.MockLogStorage logStorage *mocks.MockLogStorage
config *mocks.MockConfigService config *mocks.MockConfigService
stateMachine *mocks.MockTaskStateMachine stateMachine *mocks.MockTaskStateMachine
shaman *mocks.MockShaman shaman *mocks.MockShaman
clock *clock.Mock clock *clock.Mock
lastRender *mocks.MockLastRendered lastRender *mocks.MockLastRendered
localStorage *mocks.MockLocalStorage localStorage *mocks.MockLocalStorage
sleepScheduler *mocks.MockWorkerSleepScheduler
// Place for some tests to store a temporary directory. // Place for some tests to store a temporary directory.
tempdir string tempdir string
@ -49,6 +50,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
sha := mocks.NewMockShaman(mockCtrl) sha := mocks.NewMockShaman(mockCtrl)
lr := mocks.NewMockLastRendered(mockCtrl) lr := mocks.NewMockLastRendered(mockCtrl)
localStore := mocks.NewMockLocalStorage(mockCtrl) localStore := mocks.NewMockLocalStorage(mockCtrl)
wss := mocks.NewMockWorkerSleepScheduler(mockCtrl)
clock := clock.NewMock() clock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00")
@ -57,19 +59,20 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco {
} }
clock.Set(mockedNow) clock.Set(mockedNow)
f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore) f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss)
return mockedFlamenco{ return mockedFlamenco{
flamenco: f, flamenco: f,
jobCompiler: jc, jobCompiler: jc,
persistence: ps, persistence: ps,
broadcaster: cb, broadcaster: cb,
logStorage: logStore, logStorage: logStore,
config: cs, config: cs,
stateMachine: sm, stateMachine: sm,
clock: clock, clock: clock,
lastRender: lr, lastRender: lr,
localStorage: localStore, localStorage: localStore,
sleepScheduler: wss,
} }
} }

View File

@ -18,7 +18,7 @@ func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) e
ctx := e.Request().Context() ctx := e.Request().Context()
logger := requestLogger(e) logger := requestLogger(e)
logger = logger.With().Str("worker", workerUUID).Logger() logger = logger.With().Str("worker", workerUUID).Logger()
schedule, err := f.persist.FetchWorkerSleepSchedule(ctx, workerUUID) schedule, err := f.sleepScheduler.FetchSchedule(ctx, workerUUID)
switch { switch {
case errors.Is(err, persistence.ErrWorkerNotFound): case errors.Is(err, persistence.ErrWorkerNotFound):
@ -33,9 +33,9 @@ func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) e
apiSchedule := api.WorkerSleepSchedule{ apiSchedule := api.WorkerSleepSchedule{
DaysOfWeek: schedule.DaysOfWeek, DaysOfWeek: schedule.DaysOfWeek,
EndTime: schedule.EndTime, EndTime: schedule.EndTime.String(),
IsActive: schedule.IsActive, IsActive: schedule.IsActive,
StartTime: schedule.StartTime, StartTime: schedule.StartTime.String(),
} }
return e.JSON(http.StatusOK, apiSchedule) return e.JSON(http.StatusOK, apiSchedule)
} }
@ -57,14 +57,22 @@ func (f *Flamenco) SetWorkerSleepSchedule(e echo.Context, workerUUID string) err
} }
schedule := api.WorkerSleepSchedule(req) schedule := api.WorkerSleepSchedule(req)
// Create a sleep schedule that can be persisted.
dbSchedule := persistence.SleepSchedule{ dbSchedule := persistence.SleepSchedule{
IsActive: schedule.IsActive, IsActive: schedule.IsActive,
DaysOfWeek: schedule.DaysOfWeek, DaysOfWeek: schedule.DaysOfWeek,
StartTime: schedule.StartTime, }
EndTime: schedule.EndTime, if err := dbSchedule.StartTime.Scan(schedule.StartTime); err != nil {
logger.Warn().Err(err).Msg("bad request received, cannot parse schedule start time")
return sendAPIError(e, http.StatusBadRequest, "invalid format for schedule start time")
}
if err := dbSchedule.EndTime.Scan(schedule.EndTime); err != nil {
logger.Warn().Err(err).Msg("bad request received, cannot parse schedule end time")
return sendAPIError(e, http.StatusBadRequest, "invalid format for schedule end time")
} }
err = f.persist.SetWorkerSleepSchedule(ctx, workerUUID, dbSchedule) // Send the sleep schedule to the scheduler.
err = f.sleepScheduler.SetSchedule(ctx, workerUUID, &dbSchedule)
switch { switch {
case errors.Is(err, persistence.ErrWorkerNotFound): case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Warn().Msg("SetWorkerSleepSchedule: worker does not exist") logger.Warn().Msg("SetWorkerSleepSchedule: worker does not exist")

View File

@ -0,0 +1,109 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"database/sql/driver"
"fmt"
"time"
)
const (
timeOfDayStringFormat = "%02d:%02d"
// Assigned to the Hour and Minute fields to indicate "no value".
timeOfDayNoValue = -1
)
// TimeOfDay represents a time of day, and can be converted to/from a string.
// Its date and timezone components are ignored, and the time is supposed to be
// interpreted as local time on any date (f.e. a scheduled sleep time of some
// Worker on a certain day-of-week & local timezone).
//
// TimeOfDay structs can also represent "no value", which will be marshaled as
// an empty string.
type TimeOfDay struct {
Hour int
Minute int
}
// MakeTimeOfDay converts a time.Time into a TimeOfDay.
func MakeTimeOfDay(someTime time.Time) TimeOfDay {
return TimeOfDay{someTime.Hour(), someTime.Minute()}
}
// EmptyTimeOfDay returns a TimeOfDay struct with no value.
// See `TimeOfDay.HasValue()`.
func EmptyTimeOfDay() TimeOfDay {
return TimeOfDay{Hour: timeOfDayNoValue, Minute: timeOfDayNoValue}
}
// Value converts a TimeOfDay to a value usable by SQL databases.
func (ot TimeOfDay) Value() (driver.Value, error) {
var asString = ot.String()
return asString, nil
}
// Scan updates this TimeOfDay from the value stored in a database.
func (ot *TimeOfDay) Scan(value interface{}) error {
b, ok := value.(string)
if !ok {
return fmt.Errorf("expected string, received %T", value)
}
return ot.setString(string(b))
}
// Equals returns True iff both times represent the same time of day.
func (ot TimeOfDay) Equals(other TimeOfDay) bool {
return ot.Hour == other.Hour && ot.Minute == other.Minute
}
// IsBefore returns True iff ot is before other.
// Ignores everything except hour and minute fields.
func (ot TimeOfDay) IsBefore(other TimeOfDay) bool {
if ot.Hour != other.Hour {
return ot.Hour < other.Hour
}
return ot.Minute < other.Minute
}
// IsAfter returns True iff ot is after other.
// Ignores everything except hour and minute fields.
func (ot TimeOfDay) IsAfter(other TimeOfDay) bool {
if ot.Hour != other.Hour {
return ot.Hour > other.Hour
}
return ot.Minute > other.Minute
}
// OnDate returns the time of day in the local timezone on the given date.
func (ot TimeOfDay) OnDate(date time.Time) time.Time {
year, month, day := date.Date()
return time.Date(year, month, day, ot.Hour, ot.Minute, 0, 0, time.Local)
}
func (ot TimeOfDay) String() string {
if !ot.HasValue() {
return ""
}
return fmt.Sprintf(timeOfDayStringFormat, ot.Hour, ot.Minute)
}
func (ot TimeOfDay) HasValue() bool {
return ot.Hour != timeOfDayNoValue && ot.Minute != timeOfDayNoValue
}
func (ot *TimeOfDay) setString(value string) error {
scanned := TimeOfDay{}
if value == "" {
*ot = TimeOfDay{timeOfDayNoValue, timeOfDayNoValue}
return nil
}
_, err := fmt.Sscanf(value, timeOfDayStringFormat, &scanned.Hour, &scanned.Minute)
if err != nil {
return err
}
*ot = scanned
return nil
}

View File

@ -0,0 +1,129 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var emptyToD = TimeOfDay{timeOfDayNoValue, timeOfDayNoValue}
func TestIsBefore(t *testing.T) {
test := func(expect bool, hour1, min1, hour2, min2 int) {
time1 := TimeOfDay{hour1, min1}
time2 := TimeOfDay{hour2, min2}
assert.Equal(t, expect, time1.IsBefore(time2))
}
test(false, 0, 0, 0, 0)
test(true, 0, 0, 0, 1)
test(true, 1, 59, 2, 0)
test(true, 1, 2, 1, 3)
test(true, 1, 2, 15, 1)
test(false, 17, 0, 8, 0)
}
func TestIsAfter(t *testing.T) {
test := func(expect bool, hour1, min1, hour2, min2 int) {
time1 := TimeOfDay{hour1, min1}
time2 := TimeOfDay{hour2, min2}
assert.Equal(t, expect, time1.IsAfter(time2))
}
test(false, 0, 0, 0, 0)
test(true, 0, 1, 0, 0)
test(true, 2, 1, 1, 59)
test(true, 1, 3, 1, 2)
test(true, 15, 1, 1, 2)
test(false, 8, 0, 17, 0)
}
func TestOnDate(t *testing.T) {
theDate := time.Date(2018, 12, 13, 7, 59, 43, 123, time.Local)
tod := TimeOfDay{16, 47}
expect := time.Date(2018, 12, 13, 16, 47, 0, 0, time.Local)
assert.Equal(t, expect, tod.OnDate(theDate))
// Midnight on the same day.
tod = TimeOfDay{0, 0}
expect = time.Date(2018, 12, 13, 0, 0, 0, 0, time.Local)
assert.Equal(t, expect, tod.OnDate(theDate))
// Midnight a day later.
tod = TimeOfDay{24, 0}
expect = time.Date(2018, 12, 14, 0, 0, 0, 0, time.Local)
assert.Equal(t, expect, tod.OnDate(theDate))
}
func TestValue(t *testing.T) {
// Test zero -> "00:00"
tod := TimeOfDay{}
if value, err := tod.Value(); assert.NoError(t, err) {
assert.Equal(t, "00:00", value)
}
// Test 22:47 -> "22:47"
tod = TimeOfDay{22, 47}
if value, err := tod.Value(); assert.NoError(t, err) {
assert.Equal(t, "22:47", value)
}
// Test empty -> ""
tod = emptyToD
if value, err := tod.Value(); assert.NoError(t, err) {
assert.Equal(t, "", value)
}
}
func TestScan(t *testing.T) {
// Test zero -> empty
tod := TimeOfDay{}
if assert.NoError(t, tod.Scan("")) {
assert.Equal(t, emptyToD, tod)
}
// Test 22:47 -> empty
tod = TimeOfDay{22, 47}
if assert.NoError(t, tod.Scan("")) {
assert.Equal(t, emptyToD, tod)
}
// Test 22:47 -> 12:34
tod = TimeOfDay{22, 47}
if assert.NoError(t, tod.Scan("12:34")) {
assert.Equal(t, TimeOfDay{12, 34}, tod)
}
// Test empty -> empty
tod = emptyToD
if assert.NoError(t, tod.Scan("")) {
assert.Equal(t, emptyToD, tod)
}
// Test empty -> 12:34
tod = emptyToD
if assert.NoError(t, tod.Scan("12:34")) {
assert.Equal(t, TimeOfDay{12, 34}, tod)
}
}
func TestHasValue(t *testing.T) {
zeroTod := TimeOfDay{}
assert.True(t, zeroTod.HasValue(), "zero value should be midnight, and thus be a valid value")
fullToD := TimeOfDay{22, 47}
assert.True(t, fullToD.HasValue())
noValueToD := TimeOfDay{timeOfDayNoValue, timeOfDayNoValue}
assert.False(t, noValueToD.HasValue())
onlyMinuteValue := TimeOfDay{timeOfDayNoValue, 47}
assert.False(t, onlyMinuteValue.HasValue())
onlyHourValue := TimeOfDay{22, timeOfDayNoValue}
assert.False(t, onlyHourValue.HasValue())
}

View File

@ -23,13 +23,16 @@ type SleepSchedule struct {
// Space-separated two-letter strings indicating days of week the schedule is // Space-separated two-letter strings indicating days of week the schedule is
// active ("mo", "tu", etc.). Empty means "every day". // active ("mo", "tu", etc.). Empty means "every day".
DaysOfWeek string `gorm:"default:''"` DaysOfWeek string `gorm:"default:''"`
StartTime string `gorm:"default:''"` StartTime TimeOfDay `gorm:"default:''"`
EndTime string `gorm:"default:''"` EndTime TimeOfDay `gorm:"default:''"`
NextCheck *time.Time NextCheck time.Time
} }
// FetchWorkerSleepSchedule fetches the worker's sleep schedule.
// It does not fetch the worker itself. If you need that, call
// `FetchSleepScheduleWorker()` afterwards.
func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*SleepSchedule, error) { func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*SleepSchedule, error) {
logger := log.With().Str("worker", workerUUID).Logger() logger := log.With().Str("worker", workerUUID).Logger()
logger.Trace().Msg("fetching worker sleep schedule") logger.Trace().Msg("fetching worker sleep schedule")
@ -49,7 +52,7 @@ func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (
return &sched, nil return &sched, nil
} }
func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule SleepSchedule) error { func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule *SleepSchedule) error {
logger := log.With().Str("worker", workerUUID).Logger() logger := log.With().Str("worker", workerUUID).Logger()
logger.Trace().Msg("setting worker sleep schedule") logger.Trace().Msg("setting worker sleep schedule")
@ -68,3 +71,39 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch
Create(&schedule) Create(&schedule)
return tx.Error return tx.Error
} }
func (db *DB) SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *SleepSchedule) error {
tx := db.gormDB.WithContext(ctx).
Select("next_check").
Updates(schedule)
return tx.Error
}
// FetchSleepScheduleWorker sets the given schedule's `Worker` pointer.
func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSchedule) error {
var worker Worker
tx := db.gormDB.WithContext(ctx).First(&worker, schedule.WorkerID)
if tx.Error != nil {
return workerError(tx.Error, "finding worker by their sleep schedule")
}
schedule.Worker = &worker
return nil
}
// FetchSleepSchedulesToCheck returns the sleep schedules that are due for a check.
func (db *DB) FetchSleepSchedulesToCheck(ctx context.Context) ([]*SleepSchedule, error) {
log.Trace().Msg("fetching sleep schedules that need checking")
now := db.gormDB.NowFunc()
schedules := []*SleepSchedule{}
tx := db.gormDB.WithContext(ctx).
Model(&SleepSchedule{}).
Where("is_active = ?", true).
Where("next_check <= ? or next_check is NULL or next_check = ''", now).
Scan(&schedules)
if tx.Error != nil {
return nil, tx.Error
}
return schedules, nil
}

View File

@ -44,8 +44,8 @@ func TestFetchWorkerSleepSchedule(t *testing.T) {
IsActive: true, IsActive: true,
DaysOfWeek: "mo,tu,th,fr", DaysOfWeek: "mo,tu,th,fr",
StartTime: "18:00", StartTime: TimeOfDay{18, 0},
EndTime: "09:00", EndTime: TimeOfDay{9, 0},
} }
tx := db.gormDB.Create(&created) tx := db.gormDB.Create(&created)
if !assert.NoError(t, tx.Error) { if !assert.NoError(t, tx.Error) {
@ -57,6 +57,52 @@ func TestFetchWorkerSleepSchedule(t *testing.T) {
assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched) assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched)
} }
func TestFetchSleepScheduleWorker(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish()
linuxWorker := Worker{
UUID: uuid.New(),
Name: "дрон",
Address: "fe80::5054:ff:fede:2ad7",
Platform: "linux",
Software: "3.0",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management",
}
err := db.CreateWorker(ctx, &linuxWorker)
if !assert.NoError(t, err) {
t.FailNow()
}
// Create a sleep schedule.
created := SleepSchedule{
WorkerID: linuxWorker.ID,
Worker: &linuxWorker,
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
}
tx := db.gormDB.Create(&created)
if !assert.NoError(t, tx.Error) {
t.FailNow()
}
dbSchedule, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err)
assert.Nil(t, dbSchedule.Worker, "worker should be nil when fetching schedule")
err = db.FetchSleepScheduleWorker(ctx, dbSchedule)
assert.NoError(t, err)
if assert.NotNil(t, dbSchedule.Worker) {
// Compare a few fields. If these are good, the correct worker has been fetched.
assert.Equal(t, linuxWorker.ID, dbSchedule.Worker.ID)
assert.Equal(t, linuxWorker.UUID, dbSchedule.Worker.UUID)
}
}
func TestSetWorkerSleepSchedule(t *testing.T) { func TestSetWorkerSleepSchedule(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish() defer finish()
@ -81,31 +127,39 @@ func TestSetWorkerSleepSchedule(t *testing.T) {
IsActive: true, IsActive: true,
DaysOfWeek: "mo,tu,th,fr", DaysOfWeek: "mo,tu,th,fr",
StartTime: "18:00", StartTime: TimeOfDay{18, 0},
EndTime: "09:00", EndTime: TimeOfDay{9, 0},
} }
// Not an existing Worker. // Not an existing Worker.
err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", schedule) err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", &schedule)
assert.ErrorIs(t, err, ErrWorkerNotFound) assert.ErrorIs(t, err, ErrWorkerNotFound)
// Create the sleep schedule. // Create the sleep schedule.
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, schedule) err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &schedule)
assert.NoError(t, err) if !assert.NoError(t, err) {
t.FailNow()
}
fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err) if !assert.NoError(t, err) {
t.FailNow()
}
assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched) assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched)
// Overwrite the schedule with one that already has a database ID. // Overwrite the schedule with one that already has a database ID.
newSchedule := schedule newSchedule := schedule
newSchedule.IsActive = false newSchedule.IsActive = false
newSchedule.DaysOfWeek = "mo,tu,we,th,fr" newSchedule.DaysOfWeek = "mo,tu,we,th,fr"
newSchedule.StartTime = "02:00" newSchedule.StartTime = TimeOfDay{2, 0}
newSchedule.EndTime = "06:00" newSchedule.EndTime = TimeOfDay{6, 0}
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newSchedule) err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &newSchedule)
assert.NoError(t, err) if !assert.NoError(t, err) {
t.FailNow()
}
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err) if !assert.NoError(t, err) {
t.FailNow()
}
assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched) assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched)
// Overwrite the schedule with a freshly constructed one. // Overwrite the schedule with a freshly constructed one.
@ -115,21 +169,172 @@ func TestSetWorkerSleepSchedule(t *testing.T) {
IsActive: true, IsActive: true,
DaysOfWeek: "mo", DaysOfWeek: "mo",
StartTime: "03:27", StartTime: TimeOfDay{3, 0},
EndTime: "15:47", EndTime: TimeOfDay{15, 0},
}
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &newerSchedule)
if !assert.NoError(t, err) {
t.FailNow()
} }
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newerSchedule)
assert.NoError(t, err)
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err) if !assert.NoError(t, err) {
t.FailNow()
}
assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched) assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched)
// Clear the sleep schedule.
emptySchedule := SleepSchedule{
WorkerID: linuxWorker.ID,
Worker: &linuxWorker,
IsActive: false,
DaysOfWeek: "",
StartTime: emptyToD,
EndTime: emptyToD,
}
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &emptySchedule)
if !assert.NoError(t, err) {
t.FailNow()
}
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
if !assert.NoError(t, err) {
t.FailNow()
}
assertEqualSleepSchedule(t, linuxWorker.ID, emptySchedule, *fetched)
}
func TestSetWorkerSleepScheduleNextCheck(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish()
schedule := SleepSchedule{
Worker: &Worker{
UUID: "2b1f857a-fd64-484b-9c17-cf89bbe47be7",
Name: "дрон 1",
Status: api.WorkerStatusAwake,
},
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
}
// Use GORM to create the worker and sleep schedule in one go.
if tx := db.gormDB.Create(&schedule); tx.Error != nil {
panic(tx.Error)
}
future := db.gormDB.NowFunc().Add(5 * time.Hour)
schedule.NextCheck = future
err := db.SetWorkerSleepScheduleNextCheck(ctx, &schedule)
if !assert.NoError(t, err) {
t.FailNow()
}
fetched, err := db.FetchWorkerSleepSchedule(ctx, schedule.Worker.UUID)
if !assert.NoError(t, err) {
t.FailNow()
}
assertEqualSleepSchedule(t, schedule.Worker.ID, schedule, *fetched)
}
func TestFetchSleepSchedulesToCheck(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish()
mockedNow := mustParseTime("2022-06-07T11:14:47+02:00")
mockedPast := mockedNow.Add(-10 * time.Second)
mockedFuture := mockedNow.Add(10 * time.Second)
db.gormDB.NowFunc = func() time.Time { return mockedNow }
schedule0 := SleepSchedule{ // Next check in the past -> should be checked.
Worker: &Worker{
UUID: "2b1f857a-fd64-484b-9c17-cf89bbe47be7",
Name: "дрон 1",
Status: api.WorkerStatusAwake,
},
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
NextCheck: mockedPast,
}
schedule1 := SleepSchedule{ // Next check in future -> should not be checked.
Worker: &Worker{
UUID: "4475738e-41eb-47b2-8bca-2bbcabab69bb",
Name: "дрон 2",
Status: api.WorkerStatusAwake,
},
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
NextCheck: mockedFuture,
}
schedule2 := SleepSchedule{ // Next check is zero value -> should be checked.
Worker: &Worker{
UUID: "dc251817-6a11-4548-a36a-07b0d50b4c21",
Name: "дрон 3",
Status: api.WorkerStatusAwake,
},
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
NextCheck: time.Time{}, // zero value for time.
}
schedule3 := SleepSchedule{ // Schedule inactive -> should not be checked.
Worker: &Worker{
UUID: "874d5fc6-5784-4d43-8c20-6e7e73fc1b8d",
Name: "дрон 4",
Status: api.WorkerStatusAwake,
},
IsActive: false,
DaysOfWeek: "mo,tu,th,fr",
StartTime: TimeOfDay{18, 0},
EndTime: TimeOfDay{9, 0},
NextCheck: mockedPast, // next check in the past, so if active it would be checked.
}
// Use GORM to create the workers and sleep schedules in one go.
scheds := []*SleepSchedule{&schedule0, &schedule1, &schedule2, &schedule3}
for idx := range scheds {
if tx := db.gormDB.Create(scheds[idx]); tx.Error != nil {
panic(tx.Error)
}
}
toCheck, err := db.FetchSleepSchedulesToCheck(ctx)
if assert.NoError(t, err) && assert.Len(t, toCheck, 2) {
assertEqualSleepSchedule(t, schedule0.Worker.ID, schedule0, *toCheck[0])
assert.Nil(t, toCheck[0].Worker, "the Worker should NOT be fetched")
assertEqualSleepSchedule(t, schedule2.Worker.ID, schedule1, *toCheck[1])
assert.Nil(t, toCheck[1].Worker, "the Worker should NOT be fetched")
}
} }
func assertEqualSleepSchedule(t *testing.T, workerID uint, expect, actual SleepSchedule) { func assertEqualSleepSchedule(t *testing.T, workerID uint, expect, actual SleepSchedule) {
assert.Equal(t, workerID, actual.WorkerID) assert.Equal(t, workerID, actual.WorkerID, "sleep schedule is assigned to different worker")
assert.Nil(t, actual.Worker, "the Worker itself should not be fetched") assert.Nil(t, actual.Worker, "the Worker itself should not be fetched")
assert.Equal(t, expect.IsActive, actual.IsActive) assert.Equal(t, expect.IsActive, actual.IsActive, "IsActive does not match")
assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek) assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek, "DaysOfWeek does not match")
assert.Equal(t, expect.StartTime, actual.StartTime) assert.Equal(t, expect.StartTime, actual.StartTime, "StartTime does not match")
assert.Equal(t, expect.EndTime, actual.EndTime) assert.Equal(t, expect.EndTime, actual.EndTime, "EndTime does not match")
}
func mustParseTime(timeString string) time.Time {
parsed, err := time.Parse(time.RFC3339, timeString)
if err != nil {
panic(err)
}
return parsed
} }

View File

@ -83,10 +83,11 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {
err := db.gormDB.WithContext(ctx). err := db.gormDB.WithContext(ctx).
Model(w). Model(w).
Select("status", "status_requested"). Select("status", "status_requested", "lazy_status_request").
Updates(Worker{ Updates(Worker{
Status: w.Status, Status: w.Status,
StatusRequested: w.StatusRequested, StatusRequested: w.StatusRequested,
LazyStatusRequest: w.LazyStatusRequest,
}).Error }).Error
if err != nil { if err != nil {
return fmt.Errorf("saving worker: %w", err) return fmt.Errorf("saving worker: %w", err)

View File

@ -0,0 +1,93 @@
package sleep_scheduler
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"strings"
"time"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
// scheduledWorkerStatus returns the expected worker status at the given date/time.
func scheduledWorkerStatus(now time.Time, sched *persistence.SleepSchedule) api.WorkerStatus {
tod := persistence.MakeTimeOfDay(now)
if !sched.IsActive {
return api.WorkerStatusAwake
}
if sched.DaysOfWeek != "" {
weekdayName := strings.ToLower(now.Weekday().String()[:2])
if !strings.Contains(sched.DaysOfWeek, weekdayName) {
// There are days configured, and today is not a sleeping day.
return api.WorkerStatusAwake
}
}
beforeStart := sched.StartTime.HasValue() && tod.IsBefore(sched.StartTime)
afterEnd := sched.EndTime.HasValue() && !tod.IsBefore(sched.EndTime)
if beforeStart || afterEnd {
// Outside sleeping time.
return api.WorkerStatusAwake
}
return api.WorkerStatusAsleep
}
func cleanupDaysOfWeek(daysOfWeek string) string {
trimmed := strings.TrimSpace(daysOfWeek)
if trimmed == "" {
return ""
}
daynames := strings.Fields(trimmed)
for idx, name := range daynames {
daynames[idx] = strings.ToLower(strings.TrimSpace(name))[:2]
}
return strings.Join(daynames, " ")
}
// Return a timestamp when the next scheck for this schedule is due.
func calculateNextCheck(now time.Time, schedule *persistence.SleepSchedule) time.Time {
// calcNext returns the given time of day on "today" if that hasn't passed
// yet, otherwise on "tomorrow".
calcNext := func(tod persistence.TimeOfDay) time.Time {
nextCheck := tod.OnDate(now)
if nextCheck.Before(now) {
nextCheck = nextCheck.AddDate(0, 0, 1)
}
return nextCheck
}
nextChecks := []time.Time{
// Always check at the end of the day.
calcNext(persistence.TimeOfDay{Hour: 24, Minute: 0}),
}
// No start time means "start of the day", which is already covered by
// yesterday's "end of the day" check.
if schedule.StartTime.HasValue() {
nextChecks = append(nextChecks, calcNext(schedule.StartTime))
}
// No end time means "end of the day", which is already covered by today's
// "end of the day" check.
if schedule.EndTime.HasValue() {
nextChecks = append(nextChecks, calcNext(schedule.EndTime))
}
next := earliestTime(nextChecks)
return next
}
func earliestTime(timestamps []time.Time) time.Time {
earliest := timestamps[0]
for _, timestamp := range timestamps[1:] {
if timestamp.Before(earliest) {
earliest = timestamp
}
}
return earliest
}

View File

@ -0,0 +1,114 @@
package sleep_scheduler
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"testing"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
func TestCalculateNextCheck(t *testing.T) {
_, mocks, _ := testFixtures(t)
var sched persistence.SleepSchedule
empty := persistence.EmptyTimeOfDay()
// Below, S, N, and E respectively mean Start, Now, and End times.
// Their order shows their relation to "Now". Lower-case letters mean "no value".
// Note that N can never be before 's' or after 'e'.
// S N E -> E
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)}
assert.Equal(t, mocks.todayAt(18, 0), calculateNextCheck(mocks.todayAt(11, 16), &sched))
// S E N -> end of day
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)}
assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(19, 16), &sched))
// N S E -> S
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)}
assert.Equal(t, mocks.todayAt(9, 0), calculateNextCheck(mocks.todayAt(8, 47), &sched))
// s N e -> end of day
sched = persistence.SleepSchedule{StartTime: empty, EndTime: empty}
assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(7, 47), &sched))
// S N e -> end of day
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: empty}
assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(10, 47), &sched))
// s N E -> E
sched = persistence.SleepSchedule{StartTime: empty, EndTime: mkToD(18, 0)}
assert.Equal(t, mocks.todayAt(18, 0), calculateNextCheck(mocks.todayAt(7, 47), &sched))
}
func TestScheduledWorkerStatus(t *testing.T) {
_, mocks, _ := testFixtures(t)
var sched persistence.SleepSchedule
empty := persistence.EmptyTimeOfDay()
// Below, S, N, and E respectively mean Start, Now, and End times.
// Their order shows their relation to "Now". Lower-case letters mean "no value".
// Note that N can never be before 's' or after 'e'.
// Test time logic without any DaysOfWeek set, i.e. the scheduled times apply
// to each day.
// S N E -> asleep
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true}
assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched))
// S E N -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched))
// N S E -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched))
// s N e -> asleep
sched = persistence.SleepSchedule{StartTime: empty, EndTime: empty, IsActive: true}
assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(7, 47), &sched))
// S N e -> asleep
sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: empty, IsActive: true}
assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(10, 47), &sched))
// s N E -> asleep
sched = persistence.SleepSchedule{StartTime: empty, EndTime: mkToD(18, 0), IsActive: true}
assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(7, 47), &sched))
// Test DaysOfWeek logic, but only with explicit start & end times. The logic
// for missing start/end is already covered above.
// The mocked "today" is a Tuesday.
// S N E unmentioned day -> awake
sched = persistence.SleepSchedule{DaysOfWeek: "mo we", StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true}
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched))
// S E N unmentioned day -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched))
// N S E unmentioned day -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched))
// S N E mentioned day -> asleep
sched = persistence.SleepSchedule{DaysOfWeek: "tu th fr", StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true}
assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched))
// S E N mentioned day -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched))
// N S E mentioned day -> awake
assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched))
}
func TestCleanupDaysOfWeek(t *testing.T) {
assert.Equal(t, "", cleanupDaysOfWeek(""))
assert.Equal(t, "mo tu we", cleanupDaysOfWeek("mo tu we"))
assert.Equal(t, "mo tu we", cleanupDaysOfWeek(" mo tu we \n"))
assert.Equal(t, "mo tu we", cleanupDaysOfWeek("monday tuesday wed"))
}

View File

@ -0,0 +1,37 @@
package sleep_scheduler
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api"
)
// Generate mock implementations of these interfaces.
//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/sleep_scheduler PersistenceService,ChangeBroadcaster
type PersistenceService interface {
FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error)
SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error
// FetchSleepScheduleWorker sets the given schedule's `Worker` pointer.
FetchSleepScheduleWorker(ctx context.Context, schedule *persistence.SleepSchedule) error
FetchSleepSchedulesToCheck(ctx context.Context) ([]*persistence.SleepSchedule, error)
SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *persistence.SleepSchedule) error
SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error
}
var _ PersistenceService = (*persistence.DB)(nil)
// TODO: Refactor the way worker status changes are handled, so that this
// service doens't need to broadcast its own worker updates.
type ChangeBroadcaster interface {
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil)

View File

@ -0,0 +1,158 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: git.blender.org/flamenco/internal/manager/sleep_scheduler (interfaces: PersistenceService,ChangeBroadcaster)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
persistence "git.blender.org/flamenco/internal/manager/persistence"
api "git.blender.org/flamenco/pkg/api"
gomock "github.com/golang/mock/gomock"
)
// MockPersistenceService is a mock of PersistenceService interface.
type MockPersistenceService struct {
ctrl *gomock.Controller
recorder *MockPersistenceServiceMockRecorder
}
// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService.
type MockPersistenceServiceMockRecorder struct {
mock *MockPersistenceService
}
// NewMockPersistenceService creates a new mock instance.
func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService {
mock := &MockPersistenceService{ctrl: ctrl}
mock.recorder = &MockPersistenceServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder {
return m.recorder
}
// FetchSleepScheduleWorker mocks base method.
func (m *MockPersistenceService) FetchSleepScheduleWorker(arg0 context.Context, arg1 *persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSleepScheduleWorker", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// FetchSleepScheduleWorker indicates an expected call of FetchSleepScheduleWorker.
func (mr *MockPersistenceServiceMockRecorder) FetchSleepScheduleWorker(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSleepScheduleWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchSleepScheduleWorker), arg0, arg1)
}
// FetchSleepSchedulesToCheck mocks base method.
func (m *MockPersistenceService) FetchSleepSchedulesToCheck(arg0 context.Context) ([]*persistence.SleepSchedule, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSleepSchedulesToCheck", arg0)
ret0, _ := ret[0].([]*persistence.SleepSchedule)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchSleepSchedulesToCheck indicates an expected call of FetchSleepSchedulesToCheck.
func (mr *MockPersistenceServiceMockRecorder) FetchSleepSchedulesToCheck(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSleepSchedulesToCheck", reflect.TypeOf((*MockPersistenceService)(nil).FetchSleepSchedulesToCheck), arg0)
}
// FetchWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1)
ret0, _ := ret[0].(*persistence.SleepSchedule)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1)
}
// SaveWorkerStatus mocks base method.
func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SaveWorkerStatus indicates an expected call of SaveWorkerStatus.
func (mr *MockPersistenceServiceMockRecorder) SaveWorkerStatus(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerStatus), arg0, arg1)
}
// SetWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 *persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2)
}
// SetWorkerSleepScheduleNextCheck mocks base method.
func (m *MockPersistenceService) SetWorkerSleepScheduleNextCheck(arg0 context.Context, arg1 *persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetWorkerSleepScheduleNextCheck", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SetWorkerSleepScheduleNextCheck indicates an expected call of SetWorkerSleepScheduleNextCheck.
func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepScheduleNextCheck(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepScheduleNextCheck", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepScheduleNextCheck), arg0, arg1)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller
recorder *MockChangeBroadcasterMockRecorder
}
// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster.
type MockChangeBroadcasterMockRecorder struct {
mock *MockChangeBroadcaster
}
// NewMockChangeBroadcaster creates a new mock instance.
func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster {
mock := &MockChangeBroadcaster{ctrl: ctrl}
mock.recorder = &MockChangeBroadcasterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder {
return m.recorder
}
// BroadcastWorkerUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0)
}
// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0)
}

View File

@ -0,0 +1,198 @@
package sleep_scheduler
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"time"
"github.com/benbjohnson/clock"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/pkg/api"
)
// Time period for checking the schedule of every worker.
const checkInterval = 10 * time.Second
// SleepScheduler manages wake/sleep cycles of Workers.
type SleepScheduler struct {
clock clock.Clock
persist PersistenceService
broadcaster ChangeBroadcaster
}
// New creates a new SleepScheduler.
func New(clock clock.Clock, persist PersistenceService, broadcaster ChangeBroadcaster) *SleepScheduler {
return &SleepScheduler{
clock: clock,
persist: persist,
broadcaster: broadcaster,
}
}
// Run occasionally checks the sleep schedule and updates workers.
// It stops running when the context closes.
func (ss *SleepScheduler) Run(ctx context.Context) {
log.Info().
Str("checkInterval", checkInterval.String()).
Msg("sleep scheduler starting")
defer log.Info().Msg("sleep scheduler shutting down")
for {
select {
case <-ctx.Done():
return
case <-time.After(checkInterval):
ss.CheckSchedules(ctx)
}
}
}
func (ss *SleepScheduler) FetchSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) {
return ss.persist.FetchWorkerSleepSchedule(ctx, workerUUID)
}
// SetSleepSchedule stores the given schedule as the worker's new sleep schedule.
// The new schedule is immediately applied to the Worker.
func (ss *SleepScheduler) SetSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error {
// Ensure 'start' actually preceeds 'end'.
if schedule.StartTime.HasValue() &&
schedule.EndTime.HasValue() &&
schedule.EndTime.IsBefore(schedule.StartTime) {
schedule.StartTime, schedule.EndTime = schedule.EndTime, schedule.StartTime
}
schedule.DaysOfWeek = cleanupDaysOfWeek(schedule.DaysOfWeek)
schedule.NextCheck = ss.calculateNextCheck(schedule)
if err := ss.persist.SetWorkerSleepSchedule(ctx, workerUUID, schedule); err != nil {
return fmt.Errorf("persisting sleep schedule of worker %s: %w", workerUUID, err)
}
return ss.ApplySleepSchedule(ctx, schedule)
}
// scheduledWorkerStatus returns the expected worker status for the current date/time.
func (ss *SleepScheduler) scheduledWorkerStatus(sched *persistence.SleepSchedule) api.WorkerStatus {
now := ss.clock.Now()
return scheduledWorkerStatus(now, sched)
}
// Return a timestamp when the next scheck for this schedule is due.
func (ss *SleepScheduler) calculateNextCheck(schedule *persistence.SleepSchedule) time.Time {
now := ss.clock.Now()
return calculateNextCheck(now, schedule)
}
// ApplySleepSchedule sets worker.StatusRequested if the scheduler demands a status change.
func (ss *SleepScheduler) ApplySleepSchedule(ctx context.Context, schedule *persistence.SleepSchedule) error {
// Find the Worker managed by this schedule.
worker := schedule.Worker
if worker == nil {
err := ss.persist.FetchSleepScheduleWorker(ctx, schedule)
if err != nil {
return err
}
worker = schedule.Worker
}
scheduled := ss.scheduledWorkerStatus(schedule)
if scheduled == "" ||
(worker.StatusRequested == scheduled && !worker.LazyStatusRequest) ||
(worker.Status == scheduled && worker.StatusRequested == "") {
// The worker is already in the right state, or is non-lazily requested to
// go to the right state, so nothing else has to be done.
return nil
}
logger := log.With().
Str("worker", worker.Identifier()).
Str("currentStatus", string(worker.Status)).
Str("scheduledStatus", string(scheduled)).
Logger()
if worker.StatusRequested != "" {
logger.Info().Str("oldStatusRequested", string(worker.StatusRequested)).
Msg("sleep scheduler: overruling previously requested status with scheduled status")
} else {
logger.Info().Msg("sleep scheduler: requesting worker to switch to scheduled status")
}
if err := ss.updateWorkerStatus(ctx, worker, scheduled); err != nil {
return err
}
return nil
}
func (ss *SleepScheduler) updateWorkerStatus(
ctx context.Context,
worker *persistence.Worker,
newStatus api.WorkerStatus,
) error {
// Sleep schedule should be adhered to immediately, no lazy requests.
// A render task can run for hours, so better to not wait for it.
worker.StatusChangeRequest(newStatus, false)
err := ss.persist.SaveWorkerStatus(ctx, worker)
if err != nil {
return fmt.Errorf("error saving worker %s to database: %w", worker.Identifier(), err)
}
// Broadcast worker change via SocketIO
ss.broadcaster.BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
Id: worker.UUID,
Name: worker.Name,
Status: worker.Status,
StatusChange: &api.WorkerStatusChangeRequest{
IsLazy: false,
Status: worker.StatusRequested,
},
Updated: worker.UpdatedAt,
Version: worker.Software,
})
return nil
}
// CheckSchedules updates the status of all workers for which a schedule is active.
func (ss *SleepScheduler) CheckSchedules(ctx context.Context) {
toCheck, err := ss.persist.FetchSleepSchedulesToCheck(ctx)
if err != nil {
log.Error().Err(err).Msg("sleep scheduler: unable to fetch sleep schedules")
return
}
if len(toCheck) == 0 {
log.Debug().Msg("sleep scheduler: no sleep schedules need checking")
return
}
log.Debug().Int("numWorkers", len(toCheck)).Msg("sleep scheduler: checking worker sleep schedules")
for _, schedule := range toCheck {
ss.checkSchedule(ctx, schedule)
}
}
func (ss *SleepScheduler) checkSchedule(ctx context.Context, schedule *persistence.SleepSchedule) {
// Compute the next time to check.
schedule.NextCheck = ss.calculateNextCheck(schedule)
if err := ss.persist.SetWorkerSleepScheduleNextCheck(ctx, schedule); err != nil {
log.Error().
Err(err).
Str("worker", schedule.Worker.Identifier()).
Msg("sleep scheduler: error refreshing worker's sleep schedule")
return
}
// Apply the schedule to the worker.
if err := ss.ApplySleepSchedule(ctx, schedule); err != nil {
log.Error().
Err(err).
Str("worker", schedule.Worker.Identifier()).
Msg("sleep scheduler: error applying worker's sleep schedule")
return
}
}

View File

@ -0,0 +1,269 @@
package sleep_scheduler
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/sleep_scheduler/mocks"
"git.blender.org/flamenco/pkg/api"
)
func TestFetchSchedule(t *testing.T) {
ss, mocks, ctx := testFixtures(t)
workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19"
dbSched := persistence.SleepSchedule{}
mocks.persist.EXPECT().FetchWorkerSleepSchedule(ctx, workerUUID).Return(&dbSched, nil)
sched, err := ss.FetchSchedule(ctx, workerUUID)
if assert.NoError(t, err) {
assert.Equal(t, &dbSched, sched)
}
}
func TestSetSchedule(t *testing.T) {
ss, mocks, ctx := testFixtures(t)
workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19"
sched := persistence.SleepSchedule{
IsActive: true,
DaysOfWeek: " mo tu we",
StartTime: mkToD(9, 0),
EndTime: mkToD(18, 0),
Worker: &persistence.Worker{
UUID: workerUUID,
Status: api.WorkerStatusAwake,
},
}
expectSavedSchedule := sched
expectSavedSchedule.DaysOfWeek = "mo tu we" // Expect a cleanup
expectNextCheck := mocks.todayAt(18, 0) // "now" is at 11:14:47, expect a check at the end time.
expectSavedSchedule.NextCheck = expectNextCheck
// Expect the new schedule to be saved.
mocks.persist.EXPECT().SetWorkerSleepSchedule(ctx, workerUUID, &expectSavedSchedule)
// Expect the new schedule to be immediately applied to the Worker.
// `TestApplySleepSchedule` checks those values, no need to do that here.
mocks.persist.EXPECT().SaveWorkerStatus(ctx, gomock.Any())
mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(gomock.Any())
err := ss.SetSchedule(ctx, workerUUID, &sched)
assert.NoError(t, err)
}
func TestSetScheduleSwappedStartEnd(t *testing.T) {
ss, mocks, ctx := testFixtures(t)
workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19"
sched := persistence.SleepSchedule{
IsActive: true,
DaysOfWeek: "mo tu we",
StartTime: mkToD(18, 0),
EndTime: mkToD(9, 0),
// Worker already in the right state, so no saving/broadcasting expected.
Worker: &persistence.Worker{
UUID: workerUUID,
Status: api.WorkerStatusAsleep,
},
}
expectSavedSchedule := persistence.SleepSchedule{
IsActive: true,
DaysOfWeek: "mo tu we",
StartTime: mkToD(9, 0), // Expect start and end time to be corrected.
EndTime: mkToD(18, 0),
NextCheck: mocks.todayAt(18, 0), // "now" is at 11:14:47, expect a check at the end time.
Worker: sched.Worker,
}
mocks.persist.EXPECT().SetWorkerSleepSchedule(ctx, workerUUID, &expectSavedSchedule)
err := ss.SetSchedule(ctx, workerUUID, &sched)
assert.NoError(t, err)
}
func TestApplySleepSchedule(t *testing.T) {
ss, mocks, ctx := testFixtures(t)
worker := persistence.Worker{
Model: persistence.Model{ID: 5},
UUID: "74997de4-c530-4913-b89f-c489f14f7634",
Status: api.WorkerStatusOffline,
}
sched := persistence.SleepSchedule{
IsActive: true,
DaysOfWeek: "mo tu we",
StartTime: mkToD(9, 0),
EndTime: mkToD(18, 0),
}
testForExpectedStatus := func(expectedNewStatus api.WorkerStatus) {
// Take a copy of the worker & schedule, for test isolation.
testSchedule := sched
testWorker := worker
// Expect the Worker to be fetched.
mocks.persist.EXPECT().FetchSleepScheduleWorker(ctx, &testSchedule).DoAndReturn(
func(ctx context.Context, schedule *persistence.SleepSchedule) error {
schedule.Worker = &testWorker
return nil
})
// Construct the worker as we expect it to be saved to the database.
savedWorker := testWorker
savedWorker.LazyStatusRequest = false
savedWorker.StatusRequested = expectedNewStatus
mocks.persist.EXPECT().SaveWorkerStatus(ctx, &savedWorker)
// Expect SocketIO broadcast.
var sioUpdate api.SocketIOWorkerUpdate
mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(gomock.Any()).DoAndReturn(
func(workerUpdate api.SocketIOWorkerUpdate) {
sioUpdate = workerUpdate
})
// Actually apply the sleep schedule.
err := ss.ApplySleepSchedule(ctx, &testSchedule)
if !assert.NoError(t, err) {
t.FailNow()
}
// Check the SocketIO broadcast.
if sioUpdate.Id != "" {
assert.Equal(t, testWorker.UUID, sioUpdate.Id)
assert.False(t, sioUpdate.StatusChange.IsLazy)
assert.Equal(t, expectedNewStatus, sioUpdate.StatusChange.Status)
}
}
// Move the clock to the middle of the sleep schedule, so worker should sleep.
mocks.clock.Set(mocks.todayAt(10, 47))
testForExpectedStatus(api.WorkerStatusAsleep)
// Move the clock to before the sleep schedule start.
mocks.clock.Set(mocks.todayAt(0, 3))
testForExpectedStatus(api.WorkerStatusAwake)
// Move the clock to after the sleep schedule ends.
mocks.clock.Set(mocks.todayAt(19, 59))
testForExpectedStatus(api.WorkerStatusAwake)
// Test that the worker should sleep, and has already been requested to sleep,
// but lazily. This should trigger a non-lazy status change request.
mocks.clock.Set(mocks.todayAt(10, 47))
worker.Status = api.WorkerStatusAwake
worker.StatusRequested = api.WorkerStatusAsleep
worker.LazyStatusRequest = true
testForExpectedStatus(api.WorkerStatusAsleep)
}
func TestApplySleepScheduleAlreadyCorrectStatus(t *testing.T) {
ss, mocks, ctx := testFixtures(t)
worker := persistence.Worker{
Model: persistence.Model{ID: 5},
UUID: "74997de4-c530-4913-b89f-c489f14f7634",
Status: api.WorkerStatusAsleep,
}
sched := persistence.SleepSchedule{
IsActive: true,
DaysOfWeek: "mo tu we",
StartTime: mkToD(9, 0),
EndTime: mkToD(18, 0),
}
runTest := func() {
// Take a copy of the worker & schedule, for test isolation.
testSchedule := sched
testWorker := worker
// Expect the Worker to be fetched.
mocks.persist.EXPECT().FetchSleepScheduleWorker(ctx, &testSchedule).DoAndReturn(
func(ctx context.Context, schedule *persistence.SleepSchedule) error {
schedule.Worker = &testWorker
return nil
})
// Apply the sleep schedule. This should not trigger any persistence or broadcasts.
err := ss.ApplySleepSchedule(ctx, &testSchedule)
if !assert.NoError(t, err) {
t.FailNow()
}
}
// Move the clock to the middle of the sleep schedule, so the schedule always
// wants the worker to sleep.
mocks.clock.Set(mocks.todayAt(10, 47))
// Current status is already good.
worker.Status = api.WorkerStatusAsleep
runTest()
// Current status is not the right one, but the requested status is already good.
worker.Status = api.WorkerStatusAwake
worker.StatusRequested = api.WorkerStatusAsleep
worker.LazyStatusRequest = false
runTest()
}
type TestMocks struct {
clock *clock.Mock
persist *mocks.MockPersistenceService
broadcaster *mocks.MockChangeBroadcaster
}
// todayAt returns whatever the mocked clock's "now" is set to, with the time set
// to the given time. Seconds and sub-seconds are set to zero.
func (m *TestMocks) todayAt(hour, minute int) time.Time {
now := m.clock.Now()
return time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location())
}
// endOfDay returns midnight of the day after whatever the mocked clock's "now" is set to.
func (m *TestMocks) endOfDay() time.Time {
now := m.clock.Now()
return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).AddDate(0, 0, 1)
}
func testFixtures(t *testing.T) (*SleepScheduler, TestMocks, context.Context) {
ctx := context.Background()
mockedClock := clock.NewMock()
mockedNow, err := time.Parse(time.RFC3339, "2022-06-07T11:14:47+02:00")
if err != nil {
panic(err)
}
mockedClock.Set(mockedNow)
if !assert.Equal(t, time.Tuesday.String(), mockedNow.Weekday().String()) {
t.Fatal("tests assume 'now' is a Tuesday")
}
mockCtrl := gomock.NewController(t)
mocks := TestMocks{
clock: mockedClock,
persist: mocks.NewMockPersistenceService(mockCtrl),
broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl),
}
ss := New(mocks.clock, mocks.persist, mocks.broadcaster)
return ss, mocks, ctx
}
func mkToD(hour, minute int) persistence.TimeOfDay {
return persistence.TimeOfDay{Hour: hour, Minute: minute}
}