From d7b164133aeaecd889a155b59b09bcbb35777400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Sun, 17 Jul 2022 17:27:32 +0200 Subject: [PATCH] Sleep Scheduler implementation for the Manager The Manager now has a sleep scheduler for Workers. The API and background service work, but there is no web interface yet. Manifest Task: T99397 --- cmd/flamenco-manager/main.go | 15 +- internal/manager/api_impl/api_impl.go | 43 +-- internal/manager/api_impl/interfaces.go | 13 +- .../api_impl/mocks/api_impl_mock.gen.go | 83 ++++-- internal/manager/api_impl/support_test.go | 47 +-- .../manager/api_impl/worker_sleep_schedule.go | 20 +- internal/manager/persistence/time_of_day.go | 109 +++++++ .../manager/persistence/time_of_day_test.go | 129 +++++++++ .../persistence/worker_sleep_schedule.go | 49 +++- .../persistence/worker_sleep_schedule_test.go | 251 ++++++++++++++-- internal/manager/persistence/workers.go | 7 +- .../manager/sleep_scheduler/calculations.go | 93 ++++++ .../sleep_scheduler/calculations_test.go | 114 ++++++++ .../manager/sleep_scheduler/interfaces.go | 37 +++ .../mocks/interfaces_mock.gen.go | 158 ++++++++++ .../sleep_scheduler/sleep_scheduler.go | 198 +++++++++++++ .../sleep_scheduler/sleep_scheduler_test.go | 269 ++++++++++++++++++ 17 files changed, 1520 insertions(+), 115 deletions(-) create mode 100644 internal/manager/persistence/time_of_day.go create mode 100644 internal/manager/persistence/time_of_day_test.go create mode 100644 internal/manager/sleep_scheduler/calculations.go create mode 100644 internal/manager/sleep_scheduler/calculations_test.go create mode 100644 internal/manager/sleep_scheduler/interfaces.go create mode 100644 internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go create mode 100644 internal/manager/sleep_scheduler/sleep_scheduler.go create mode 100644 internal/manager/sleep_scheduler/sleep_scheduler_test.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 5ba16303..fc23402b 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -40,6 +40,7 @@ import ( "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/local_storage" "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/sleep_scheduler" "git.blender.org/flamenco/internal/manager/swagger_ui" "git.blender.org/flamenco/internal/manager/task_logs" "git.blender.org/flamenco/internal/manager/task_state_machine" @@ -159,11 +160,12 @@ func runFlamencoManager() bool { logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater) taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) + sleepScheduler := sleep_scheduler.New(timeService, persist, webUpdater) lastRender := last_rendered.New(localStorage) shamanServer := buildShamanServer(configService, isFirstRun) flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, - shamanServer, logStorage, webUpdater, lastRender, localStorage) + shamanServer, logStorage, webUpdater, lastRender, localStorage, sleepScheduler) e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage) timeoutChecker := timeout_checker.New( @@ -223,6 +225,13 @@ func runFlamencoManager() bool { timeoutChecker.Run(mainCtx) }() + // Run the Worker sleep scheduler. + wg.Add(1) + go func() { + defer wg.Done() + sleepScheduler.Run(mainCtx) + }() + // Open a webbrowser, but give the web service some time to start first. if isFirstRun { go openWebbrowser(mainCtx, urls[0]) @@ -250,6 +259,7 @@ func buildFlamencoAPI( webUpdater *webupdates.BiDirComms, lastRender *last_rendered.LastRenderedProcessor, localStorage local_storage.StorageInfo, + sleepScheduler *sleep_scheduler.SleepScheduler, ) *api_impl.Flamenco { compiler, err := job_compilers.Load(timeService) if err != nil { @@ -258,7 +268,7 @@ func buildFlamencoAPI( flamenco := api_impl.NewFlamenco( compiler, persist, webUpdater, logStorage, configService, taskStateMachine, shamanServer, timeService, lastRender, - localStorage) + localStorage, sleepScheduler) return flamenco } @@ -272,6 +282,7 @@ func buildWebService( ) *echo.Echo { e := echo.New() e.HideBanner = true + e.HidePort = true // The request should come in fairly quickly, given that Flamenco is intended // to run on a local network. diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 82daa98f..af2a16e0 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -16,16 +16,17 @@ import ( ) type Flamenco struct { - jobCompiler JobCompiler - persist PersistenceService - broadcaster ChangeBroadcaster - logStorage LogStorage - config ConfigService - stateMachine TaskStateMachine - shaman Shaman - clock TimeService - lastRender LastRendered - localStorage LocalStorage + jobCompiler JobCompiler + persist PersistenceService + broadcaster ChangeBroadcaster + logStorage LogStorage + config ConfigService + stateMachine TaskStateMachine + shaman Shaman + clock TimeService + lastRender LastRendered + localStorage LocalStorage + sleepScheduler WorkerSleepScheduler // The task scheduler can be locked to prevent multiple Workers from getting // the same task. It is also used for certain other queries, like @@ -51,18 +52,20 @@ func NewFlamenco( ts TimeService, lr LastRendered, localStorage LocalStorage, + wss WorkerSleepScheduler, ) *Flamenco { return &Flamenco{ - jobCompiler: jc, - persist: jps, - broadcaster: b, - logStorage: logStorage, - config: cs, - stateMachine: sm, - shaman: sha, - clock: ts, - lastRender: lr, - localStorage: localStorage, + jobCompiler: jc, + persist: jps, + broadcaster: b, + logStorage: logStorage, + config: cs, + stateMachine: sm, + shaman: sha, + clock: ts, + lastRender: lr, + localStorage: localStorage, + sleepScheduler: wss, done: make(chan struct{}), } diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index c957ed74..a8573297 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -17,6 +17,7 @@ import ( "git.blender.org/flamenco/internal/manager/job_compilers" "git.blender.org/flamenco/internal/manager/last_rendered" "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/sleep_scheduler" "git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" @@ -24,7 +25,7 @@ import ( ) // Generate mock implementations of these interfaces. -//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage +//go:generate go run github.com/golang/mock/mockgen -destination mocks/api_impl_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/api_impl PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error @@ -65,9 +66,6 @@ type PersistenceService interface { // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error) - FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) - SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule persistence.SleepSchedule) error - // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) @@ -206,3 +204,10 @@ type TimeService interface { } var _ TimeService = (clock.Clock)(nil) + +type WorkerSleepScheduler interface { + FetchSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) + SetSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error +} + +var _ WorkerSleepScheduler = (*sleep_scheduler.SleepScheduler)(nil) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index f24d6b4b..23b905ec 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage) +// Source: git.blender.org/flamenco/internal/manager/api_impl (interfaces: PersistenceService,ChangeBroadcaster,JobCompiler,LogStorage,ConfigService,TaskStateMachine,Shaman,LastRendered,LocalStorage,WorkerSleepScheduler) // Package mocks is a generated GoMock package. package mocks @@ -216,21 +216,6 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } -// FetchWorkerSleepSchedule mocks base method. -func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1) - ret0, _ := ret[0].(*persistence.SleepSchedule) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule. -func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1) -} - // FetchWorkers mocks base method. func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { m.ctrl.T.Helper() @@ -390,20 +375,6 @@ func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1) } -// SetWorkerSleepSchedule mocks base method. -func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 persistence.SleepSchedule) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule. -func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2) -} - // StoreAuthoredJob mocks base method. func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error { m.ctrl.T.Helper() @@ -1116,3 +1087,55 @@ func (mr *MockLocalStorageMockRecorder) RelPath(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RelPath", reflect.TypeOf((*MockLocalStorage)(nil).RelPath), arg0) } + +// MockWorkerSleepScheduler is a mock of WorkerSleepScheduler interface. +type MockWorkerSleepScheduler struct { + ctrl *gomock.Controller + recorder *MockWorkerSleepSchedulerMockRecorder +} + +// MockWorkerSleepSchedulerMockRecorder is the mock recorder for MockWorkerSleepScheduler. +type MockWorkerSleepSchedulerMockRecorder struct { + mock *MockWorkerSleepScheduler +} + +// NewMockWorkerSleepScheduler creates a new mock instance. +func NewMockWorkerSleepScheduler(ctrl *gomock.Controller) *MockWorkerSleepScheduler { + mock := &MockWorkerSleepScheduler{ctrl: ctrl} + mock.recorder = &MockWorkerSleepSchedulerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWorkerSleepScheduler) EXPECT() *MockWorkerSleepSchedulerMockRecorder { + return m.recorder +} + +// FetchSchedule mocks base method. +func (m *MockWorkerSleepScheduler) FetchSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchSchedule", arg0, arg1) + ret0, _ := ret[0].(*persistence.SleepSchedule) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchSchedule indicates an expected call of FetchSchedule. +func (mr *MockWorkerSleepSchedulerMockRecorder) FetchSchedule(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSchedule", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).FetchSchedule), arg0, arg1) +} + +// SetSchedule mocks base method. +func (m *MockWorkerSleepScheduler) SetSchedule(arg0 context.Context, arg1 string, arg2 *persistence.SleepSchedule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetSchedule", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetSchedule indicates an expected call of SetSchedule. +func (mr *MockWorkerSleepSchedulerMockRecorder) SetSchedule(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSchedule", reflect.TypeOf((*MockWorkerSleepScheduler)(nil).SetSchedule), arg0, arg1, arg2) +} diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index 3ae8618f..bdd85ecf 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -23,17 +23,18 @@ import ( ) type mockedFlamenco struct { - flamenco *Flamenco - jobCompiler *mocks.MockJobCompiler - persistence *mocks.MockPersistenceService - broadcaster *mocks.MockChangeBroadcaster - logStorage *mocks.MockLogStorage - config *mocks.MockConfigService - stateMachine *mocks.MockTaskStateMachine - shaman *mocks.MockShaman - clock *clock.Mock - lastRender *mocks.MockLastRendered - localStorage *mocks.MockLocalStorage + flamenco *Flamenco + jobCompiler *mocks.MockJobCompiler + persistence *mocks.MockPersistenceService + broadcaster *mocks.MockChangeBroadcaster + logStorage *mocks.MockLogStorage + config *mocks.MockConfigService + stateMachine *mocks.MockTaskStateMachine + shaman *mocks.MockShaman + clock *clock.Mock + lastRender *mocks.MockLastRendered + localStorage *mocks.MockLocalStorage + sleepScheduler *mocks.MockWorkerSleepScheduler // Place for some tests to store a temporary directory. tempdir string @@ -49,6 +50,7 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { sha := mocks.NewMockShaman(mockCtrl) lr := mocks.NewMockLastRendered(mockCtrl) localStore := mocks.NewMockLocalStorage(mockCtrl) + wss := mocks.NewMockWorkerSleepScheduler(mockCtrl) clock := clock.NewMock() mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T11:14:41+02:00") @@ -57,19 +59,20 @@ func newMockedFlamenco(mockCtrl *gomock.Controller) mockedFlamenco { } clock.Set(mockedNow) - f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore) + f := NewFlamenco(jc, ps, cb, logStore, cs, sm, sha, clock, lr, localStore, wss) return mockedFlamenco{ - flamenco: f, - jobCompiler: jc, - persistence: ps, - broadcaster: cb, - logStorage: logStore, - config: cs, - stateMachine: sm, - clock: clock, - lastRender: lr, - localStorage: localStore, + flamenco: f, + jobCompiler: jc, + persistence: ps, + broadcaster: cb, + logStorage: logStore, + config: cs, + stateMachine: sm, + clock: clock, + lastRender: lr, + localStorage: localStore, + sleepScheduler: wss, } } diff --git a/internal/manager/api_impl/worker_sleep_schedule.go b/internal/manager/api_impl/worker_sleep_schedule.go index 1f34b3d4..c6adb280 100644 --- a/internal/manager/api_impl/worker_sleep_schedule.go +++ b/internal/manager/api_impl/worker_sleep_schedule.go @@ -18,7 +18,7 @@ func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) e ctx := e.Request().Context() logger := requestLogger(e) logger = logger.With().Str("worker", workerUUID).Logger() - schedule, err := f.persist.FetchWorkerSleepSchedule(ctx, workerUUID) + schedule, err := f.sleepScheduler.FetchSchedule(ctx, workerUUID) switch { case errors.Is(err, persistence.ErrWorkerNotFound): @@ -33,9 +33,9 @@ func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) e apiSchedule := api.WorkerSleepSchedule{ DaysOfWeek: schedule.DaysOfWeek, - EndTime: schedule.EndTime, + EndTime: schedule.EndTime.String(), IsActive: schedule.IsActive, - StartTime: schedule.StartTime, + StartTime: schedule.StartTime.String(), } return e.JSON(http.StatusOK, apiSchedule) } @@ -57,14 +57,22 @@ func (f *Flamenco) SetWorkerSleepSchedule(e echo.Context, workerUUID string) err } schedule := api.WorkerSleepSchedule(req) + // Create a sleep schedule that can be persisted. dbSchedule := persistence.SleepSchedule{ IsActive: schedule.IsActive, DaysOfWeek: schedule.DaysOfWeek, - StartTime: schedule.StartTime, - EndTime: schedule.EndTime, + } + if err := dbSchedule.StartTime.Scan(schedule.StartTime); err != nil { + logger.Warn().Err(err).Msg("bad request received, cannot parse schedule start time") + return sendAPIError(e, http.StatusBadRequest, "invalid format for schedule start time") + } + if err := dbSchedule.EndTime.Scan(schedule.EndTime); err != nil { + logger.Warn().Err(err).Msg("bad request received, cannot parse schedule end time") + return sendAPIError(e, http.StatusBadRequest, "invalid format for schedule end time") } - err = f.persist.SetWorkerSleepSchedule(ctx, workerUUID, dbSchedule) + // Send the sleep schedule to the scheduler. + err = f.sleepScheduler.SetSchedule(ctx, workerUUID, &dbSchedule) switch { case errors.Is(err, persistence.ErrWorkerNotFound): logger.Warn().Msg("SetWorkerSleepSchedule: worker does not exist") diff --git a/internal/manager/persistence/time_of_day.go b/internal/manager/persistence/time_of_day.go new file mode 100644 index 00000000..595bc26d --- /dev/null +++ b/internal/manager/persistence/time_of_day.go @@ -0,0 +1,109 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "database/sql/driver" + "fmt" + "time" +) + +const ( + timeOfDayStringFormat = "%02d:%02d" + + // Assigned to the Hour and Minute fields to indicate "no value". + timeOfDayNoValue = -1 +) + +// TimeOfDay represents a time of day, and can be converted to/from a string. +// Its date and timezone components are ignored, and the time is supposed to be +// interpreted as local time on any date (f.e. a scheduled sleep time of some +// Worker on a certain day-of-week & local timezone). +// +// TimeOfDay structs can also represent "no value", which will be marshaled as +// an empty string. +type TimeOfDay struct { + Hour int + Minute int +} + +// MakeTimeOfDay converts a time.Time into a TimeOfDay. +func MakeTimeOfDay(someTime time.Time) TimeOfDay { + return TimeOfDay{someTime.Hour(), someTime.Minute()} +} + +// EmptyTimeOfDay returns a TimeOfDay struct with no value. +// See `TimeOfDay.HasValue()`. +func EmptyTimeOfDay() TimeOfDay { + return TimeOfDay{Hour: timeOfDayNoValue, Minute: timeOfDayNoValue} +} + +// Value converts a TimeOfDay to a value usable by SQL databases. +func (ot TimeOfDay) Value() (driver.Value, error) { + var asString = ot.String() + return asString, nil +} + +// Scan updates this TimeOfDay from the value stored in a database. +func (ot *TimeOfDay) Scan(value interface{}) error { + b, ok := value.(string) + if !ok { + return fmt.Errorf("expected string, received %T", value) + } + return ot.setString(string(b)) +} + +// Equals returns True iff both times represent the same time of day. +func (ot TimeOfDay) Equals(other TimeOfDay) bool { + return ot.Hour == other.Hour && ot.Minute == other.Minute +} + +// IsBefore returns True iff ot is before other. +// Ignores everything except hour and minute fields. +func (ot TimeOfDay) IsBefore(other TimeOfDay) bool { + if ot.Hour != other.Hour { + return ot.Hour < other.Hour + } + return ot.Minute < other.Minute +} + +// IsAfter returns True iff ot is after other. +// Ignores everything except hour and minute fields. +func (ot TimeOfDay) IsAfter(other TimeOfDay) bool { + if ot.Hour != other.Hour { + return ot.Hour > other.Hour + } + return ot.Minute > other.Minute +} + +// OnDate returns the time of day in the local timezone on the given date. +func (ot TimeOfDay) OnDate(date time.Time) time.Time { + year, month, day := date.Date() + return time.Date(year, month, day, ot.Hour, ot.Minute, 0, 0, time.Local) +} + +func (ot TimeOfDay) String() string { + if !ot.HasValue() { + return "" + } + return fmt.Sprintf(timeOfDayStringFormat, ot.Hour, ot.Minute) +} + +func (ot TimeOfDay) HasValue() bool { + return ot.Hour != timeOfDayNoValue && ot.Minute != timeOfDayNoValue +} + +func (ot *TimeOfDay) setString(value string) error { + scanned := TimeOfDay{} + if value == "" { + *ot = TimeOfDay{timeOfDayNoValue, timeOfDayNoValue} + return nil + } + + _, err := fmt.Sscanf(value, timeOfDayStringFormat, &scanned.Hour, &scanned.Minute) + if err != nil { + return err + } + *ot = scanned + return nil +} diff --git a/internal/manager/persistence/time_of_day_test.go b/internal/manager/persistence/time_of_day_test.go new file mode 100644 index 00000000..01a46876 --- /dev/null +++ b/internal/manager/persistence/time_of_day_test.go @@ -0,0 +1,129 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var emptyToD = TimeOfDay{timeOfDayNoValue, timeOfDayNoValue} + +func TestIsBefore(t *testing.T) { + test := func(expect bool, hour1, min1, hour2, min2 int) { + time1 := TimeOfDay{hour1, min1} + time2 := TimeOfDay{hour2, min2} + + assert.Equal(t, expect, time1.IsBefore(time2)) + } + test(false, 0, 0, 0, 0) + test(true, 0, 0, 0, 1) + test(true, 1, 59, 2, 0) + test(true, 1, 2, 1, 3) + test(true, 1, 2, 15, 1) + test(false, 17, 0, 8, 0) +} + +func TestIsAfter(t *testing.T) { + test := func(expect bool, hour1, min1, hour2, min2 int) { + time1 := TimeOfDay{hour1, min1} + time2 := TimeOfDay{hour2, min2} + + assert.Equal(t, expect, time1.IsAfter(time2)) + } + test(false, 0, 0, 0, 0) + test(true, 0, 1, 0, 0) + test(true, 2, 1, 1, 59) + test(true, 1, 3, 1, 2) + test(true, 15, 1, 1, 2) + test(false, 8, 0, 17, 0) +} + +func TestOnDate(t *testing.T) { + theDate := time.Date(2018, 12, 13, 7, 59, 43, 123, time.Local) + tod := TimeOfDay{16, 47} + expect := time.Date(2018, 12, 13, 16, 47, 0, 0, time.Local) + assert.Equal(t, expect, tod.OnDate(theDate)) + + // Midnight on the same day. + tod = TimeOfDay{0, 0} + expect = time.Date(2018, 12, 13, 0, 0, 0, 0, time.Local) + assert.Equal(t, expect, tod.OnDate(theDate)) + + // Midnight a day later. + tod = TimeOfDay{24, 0} + expect = time.Date(2018, 12, 14, 0, 0, 0, 0, time.Local) + assert.Equal(t, expect, tod.OnDate(theDate)) + +} + +func TestValue(t *testing.T) { + // Test zero -> "00:00" + tod := TimeOfDay{} + if value, err := tod.Value(); assert.NoError(t, err) { + assert.Equal(t, "00:00", value) + } + + // Test 22:47 -> "22:47" + tod = TimeOfDay{22, 47} + if value, err := tod.Value(); assert.NoError(t, err) { + assert.Equal(t, "22:47", value) + } + + // Test empty -> "" + tod = emptyToD + if value, err := tod.Value(); assert.NoError(t, err) { + assert.Equal(t, "", value) + } +} + +func TestScan(t *testing.T) { + // Test zero -> empty + tod := TimeOfDay{} + if assert.NoError(t, tod.Scan("")) { + assert.Equal(t, emptyToD, tod) + } + + // Test 22:47 -> empty + tod = TimeOfDay{22, 47} + if assert.NoError(t, tod.Scan("")) { + assert.Equal(t, emptyToD, tod) + } + + // Test 22:47 -> 12:34 + tod = TimeOfDay{22, 47} + if assert.NoError(t, tod.Scan("12:34")) { + assert.Equal(t, TimeOfDay{12, 34}, tod) + } + + // Test empty -> empty + tod = emptyToD + if assert.NoError(t, tod.Scan("")) { + assert.Equal(t, emptyToD, tod) + } + + // Test empty -> 12:34 + tod = emptyToD + if assert.NoError(t, tod.Scan("12:34")) { + assert.Equal(t, TimeOfDay{12, 34}, tod) + } +} + +func TestHasValue(t *testing.T) { + zeroTod := TimeOfDay{} + assert.True(t, zeroTod.HasValue(), "zero value should be midnight, and thus be a valid value") + + fullToD := TimeOfDay{22, 47} + assert.True(t, fullToD.HasValue()) + + noValueToD := TimeOfDay{timeOfDayNoValue, timeOfDayNoValue} + assert.False(t, noValueToD.HasValue()) + + onlyMinuteValue := TimeOfDay{timeOfDayNoValue, 47} + assert.False(t, onlyMinuteValue.HasValue()) + + onlyHourValue := TimeOfDay{22, timeOfDayNoValue} + assert.False(t, onlyHourValue.HasValue()) +} diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go index d3174fa3..db497a79 100644 --- a/internal/manager/persistence/worker_sleep_schedule.go +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -23,13 +23,16 @@ type SleepSchedule struct { // Space-separated two-letter strings indicating days of week the schedule is // active ("mo", "tu", etc.). Empty means "every day". - DaysOfWeek string `gorm:"default:''"` - StartTime string `gorm:"default:''"` - EndTime string `gorm:"default:''"` + DaysOfWeek string `gorm:"default:''"` + StartTime TimeOfDay `gorm:"default:''"` + EndTime TimeOfDay `gorm:"default:''"` - NextCheck *time.Time + NextCheck time.Time } +// FetchWorkerSleepSchedule fetches the worker's sleep schedule. +// It does not fetch the worker itself. If you need that, call +// `FetchSleepScheduleWorker()` afterwards. func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*SleepSchedule, error) { logger := log.With().Str("worker", workerUUID).Logger() logger.Trace().Msg("fetching worker sleep schedule") @@ -49,7 +52,7 @@ func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) ( return &sched, nil } -func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule SleepSchedule) error { +func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule *SleepSchedule) error { logger := log.With().Str("worker", workerUUID).Logger() logger.Trace().Msg("setting worker sleep schedule") @@ -68,3 +71,39 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch Create(&schedule) return tx.Error } + +func (db *DB) SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *SleepSchedule) error { + tx := db.gormDB.WithContext(ctx). + Select("next_check"). + Updates(schedule) + return tx.Error +} + +// FetchSleepScheduleWorker sets the given schedule's `Worker` pointer. +func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSchedule) error { + var worker Worker + tx := db.gormDB.WithContext(ctx).First(&worker, schedule.WorkerID) + if tx.Error != nil { + return workerError(tx.Error, "finding worker by their sleep schedule") + } + schedule.Worker = &worker + return nil +} + +// FetchSleepSchedulesToCheck returns the sleep schedules that are due for a check. +func (db *DB) FetchSleepSchedulesToCheck(ctx context.Context) ([]*SleepSchedule, error) { + log.Trace().Msg("fetching sleep schedules that need checking") + + now := db.gormDB.NowFunc() + + schedules := []*SleepSchedule{} + tx := db.gormDB.WithContext(ctx). + Model(&SleepSchedule{}). + Where("is_active = ?", true). + Where("next_check <= ? or next_check is NULL or next_check = ''", now). + Scan(&schedules) + if tx.Error != nil { + return nil, tx.Error + } + return schedules, nil +} diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go index d4180d66..c6e85c02 100644 --- a/internal/manager/persistence/worker_sleep_schedule_test.go +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -44,8 +44,8 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { IsActive: true, DaysOfWeek: "mo,tu,th,fr", - StartTime: "18:00", - EndTime: "09:00", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, } tx := db.gormDB.Create(&created) if !assert.NoError(t, tx.Error) { @@ -57,6 +57,52 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched) } +func TestFetchSleepScheduleWorker(t *testing.T) { + ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) + defer finish() + + linuxWorker := Worker{ + UUID: uuid.New(), + Name: "дрон", + Address: "fe80::5054:ff:fede:2ad7", + Platform: "linux", + Software: "3.0", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + err := db.CreateWorker(ctx, &linuxWorker) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Create a sleep schedule. + created := SleepSchedule{ + WorkerID: linuxWorker.ID, + Worker: &linuxWorker, + + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + } + tx := db.gormDB.Create(&created) + if !assert.NoError(t, tx.Error) { + t.FailNow() + } + + dbSchedule, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err) + assert.Nil(t, dbSchedule.Worker, "worker should be nil when fetching schedule") + + err = db.FetchSleepScheduleWorker(ctx, dbSchedule) + assert.NoError(t, err) + if assert.NotNil(t, dbSchedule.Worker) { + // Compare a few fields. If these are good, the correct worker has been fetched. + assert.Equal(t, linuxWorker.ID, dbSchedule.Worker.ID) + assert.Equal(t, linuxWorker.UUID, dbSchedule.Worker.UUID) + } +} + func TestSetWorkerSleepSchedule(t *testing.T) { ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) defer finish() @@ -81,31 +127,39 @@ func TestSetWorkerSleepSchedule(t *testing.T) { IsActive: true, DaysOfWeek: "mo,tu,th,fr", - StartTime: "18:00", - EndTime: "09:00", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, } // Not an existing Worker. - err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", schedule) + err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", &schedule) assert.ErrorIs(t, err, ErrWorkerNotFound) // Create the sleep schedule. - err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, schedule) - assert.NoError(t, err) + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &schedule) + if !assert.NoError(t, err) { + t.FailNow() + } fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) - assert.NoError(t, err) + if !assert.NoError(t, err) { + t.FailNow() + } assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched) // Overwrite the schedule with one that already has a database ID. newSchedule := schedule newSchedule.IsActive = false newSchedule.DaysOfWeek = "mo,tu,we,th,fr" - newSchedule.StartTime = "02:00" - newSchedule.EndTime = "06:00" - err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newSchedule) - assert.NoError(t, err) + newSchedule.StartTime = TimeOfDay{2, 0} + newSchedule.EndTime = TimeOfDay{6, 0} + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &newSchedule) + if !assert.NoError(t, err) { + t.FailNow() + } fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) - assert.NoError(t, err) + if !assert.NoError(t, err) { + t.FailNow() + } assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched) // Overwrite the schedule with a freshly constructed one. @@ -115,21 +169,172 @@ func TestSetWorkerSleepSchedule(t *testing.T) { IsActive: true, DaysOfWeek: "mo", - StartTime: "03:27", - EndTime: "15:47", + StartTime: TimeOfDay{3, 0}, + EndTime: TimeOfDay{15, 0}, + } + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &newerSchedule) + if !assert.NoError(t, err) { + t.FailNow() } - err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newerSchedule) - assert.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) - assert.NoError(t, err) + if !assert.NoError(t, err) { + t.FailNow() + } assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched) + + // Clear the sleep schedule. + emptySchedule := SleepSchedule{ + WorkerID: linuxWorker.ID, + Worker: &linuxWorker, + + IsActive: false, + DaysOfWeek: "", + StartTime: emptyToD, + EndTime: emptyToD, + } + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, &emptySchedule) + if !assert.NoError(t, err) { + t.FailNow() + } + fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + if !assert.NoError(t, err) { + t.FailNow() + } + assertEqualSleepSchedule(t, linuxWorker.ID, emptySchedule, *fetched) + +} + +func TestSetWorkerSleepScheduleNextCheck(t *testing.T) { + ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) + defer finish() + + schedule := SleepSchedule{ + Worker: &Worker{ + UUID: "2b1f857a-fd64-484b-9c17-cf89bbe47be7", + Name: "дрон 1", + Status: api.WorkerStatusAwake, + }, + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + } + // Use GORM to create the worker and sleep schedule in one go. + if tx := db.gormDB.Create(&schedule); tx.Error != nil { + panic(tx.Error) + } + + future := db.gormDB.NowFunc().Add(5 * time.Hour) + schedule.NextCheck = future + + err := db.SetWorkerSleepScheduleNextCheck(ctx, &schedule) + if !assert.NoError(t, err) { + t.FailNow() + } + + fetched, err := db.FetchWorkerSleepSchedule(ctx, schedule.Worker.UUID) + if !assert.NoError(t, err) { + t.FailNow() + } + assertEqualSleepSchedule(t, schedule.Worker.ID, schedule, *fetched) +} + +func TestFetchSleepSchedulesToCheck(t *testing.T) { + ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) + defer finish() + + mockedNow := mustParseTime("2022-06-07T11:14:47+02:00") + mockedPast := mockedNow.Add(-10 * time.Second) + mockedFuture := mockedNow.Add(10 * time.Second) + + db.gormDB.NowFunc = func() time.Time { return mockedNow } + + schedule0 := SleepSchedule{ // Next check in the past -> should be checked. + Worker: &Worker{ + UUID: "2b1f857a-fd64-484b-9c17-cf89bbe47be7", + Name: "дрон 1", + Status: api.WorkerStatusAwake, + }, + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + + NextCheck: mockedPast, + } + + schedule1 := SleepSchedule{ // Next check in future -> should not be checked. + Worker: &Worker{ + UUID: "4475738e-41eb-47b2-8bca-2bbcabab69bb", + Name: "дрон 2", + Status: api.WorkerStatusAwake, + }, + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + + NextCheck: mockedFuture, + } + + schedule2 := SleepSchedule{ // Next check is zero value -> should be checked. + Worker: &Worker{ + UUID: "dc251817-6a11-4548-a36a-07b0d50b4c21", + Name: "дрон 3", + Status: api.WorkerStatusAwake, + }, + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + + NextCheck: time.Time{}, // zero value for time. + } + + schedule3 := SleepSchedule{ // Schedule inactive -> should not be checked. + Worker: &Worker{ + UUID: "874d5fc6-5784-4d43-8c20-6e7e73fc1b8d", + Name: "дрон 4", + Status: api.WorkerStatusAwake, + }, + IsActive: false, + DaysOfWeek: "mo,tu,th,fr", + StartTime: TimeOfDay{18, 0}, + EndTime: TimeOfDay{9, 0}, + + NextCheck: mockedPast, // next check in the past, so if active it would be checked. + } + + // Use GORM to create the workers and sleep schedules in one go. + scheds := []*SleepSchedule{&schedule0, &schedule1, &schedule2, &schedule3} + for idx := range scheds { + if tx := db.gormDB.Create(scheds[idx]); tx.Error != nil { + panic(tx.Error) + } + } + + toCheck, err := db.FetchSleepSchedulesToCheck(ctx) + if assert.NoError(t, err) && assert.Len(t, toCheck, 2) { + assertEqualSleepSchedule(t, schedule0.Worker.ID, schedule0, *toCheck[0]) + assert.Nil(t, toCheck[0].Worker, "the Worker should NOT be fetched") + assertEqualSleepSchedule(t, schedule2.Worker.ID, schedule1, *toCheck[1]) + assert.Nil(t, toCheck[1].Worker, "the Worker should NOT be fetched") + } } func assertEqualSleepSchedule(t *testing.T, workerID uint, expect, actual SleepSchedule) { - assert.Equal(t, workerID, actual.WorkerID) + assert.Equal(t, workerID, actual.WorkerID, "sleep schedule is assigned to different worker") assert.Nil(t, actual.Worker, "the Worker itself should not be fetched") - assert.Equal(t, expect.IsActive, actual.IsActive) - assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek) - assert.Equal(t, expect.StartTime, actual.StartTime) - assert.Equal(t, expect.EndTime, actual.EndTime) + assert.Equal(t, expect.IsActive, actual.IsActive, "IsActive does not match") + assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek, "DaysOfWeek does not match") + assert.Equal(t, expect.StartTime, actual.StartTime, "StartTime does not match") + assert.Equal(t, expect.EndTime, actual.EndTime, "EndTime does not match") +} + +func mustParseTime(timeString string) time.Time { + parsed, err := time.Parse(time.RFC3339, timeString) + if err != nil { + panic(err) + } + return parsed } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 50d1ef39..eba990e8 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -83,10 +83,11 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { err := db.gormDB.WithContext(ctx). Model(w). - Select("status", "status_requested"). + Select("status", "status_requested", "lazy_status_request"). Updates(Worker{ - Status: w.Status, - StatusRequested: w.StatusRequested, + Status: w.Status, + StatusRequested: w.StatusRequested, + LazyStatusRequest: w.LazyStatusRequest, }).Error if err != nil { return fmt.Errorf("saving worker: %w", err) diff --git a/internal/manager/sleep_scheduler/calculations.go b/internal/manager/sleep_scheduler/calculations.go new file mode 100644 index 00000000..7b112f0a --- /dev/null +++ b/internal/manager/sleep_scheduler/calculations.go @@ -0,0 +1,93 @@ +package sleep_scheduler + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "strings" + "time" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +// scheduledWorkerStatus returns the expected worker status at the given date/time. +func scheduledWorkerStatus(now time.Time, sched *persistence.SleepSchedule) api.WorkerStatus { + tod := persistence.MakeTimeOfDay(now) + + if !sched.IsActive { + return api.WorkerStatusAwake + } + + if sched.DaysOfWeek != "" { + weekdayName := strings.ToLower(now.Weekday().String()[:2]) + if !strings.Contains(sched.DaysOfWeek, weekdayName) { + // There are days configured, and today is not a sleeping day. + return api.WorkerStatusAwake + } + } + + beforeStart := sched.StartTime.HasValue() && tod.IsBefore(sched.StartTime) + afterEnd := sched.EndTime.HasValue() && !tod.IsBefore(sched.EndTime) + + if beforeStart || afterEnd { + // Outside sleeping time. + return api.WorkerStatusAwake + } + + return api.WorkerStatusAsleep +} + +func cleanupDaysOfWeek(daysOfWeek string) string { + trimmed := strings.TrimSpace(daysOfWeek) + if trimmed == "" { + return "" + } + + daynames := strings.Fields(trimmed) + for idx, name := range daynames { + daynames[idx] = strings.ToLower(strings.TrimSpace(name))[:2] + } + return strings.Join(daynames, " ") +} + +// Return a timestamp when the next scheck for this schedule is due. +func calculateNextCheck(now time.Time, schedule *persistence.SleepSchedule) time.Time { + // calcNext returns the given time of day on "today" if that hasn't passed + // yet, otherwise on "tomorrow". + calcNext := func(tod persistence.TimeOfDay) time.Time { + nextCheck := tod.OnDate(now) + if nextCheck.Before(now) { + nextCheck = nextCheck.AddDate(0, 0, 1) + } + return nextCheck + } + + nextChecks := []time.Time{ + // Always check at the end of the day. + calcNext(persistence.TimeOfDay{Hour: 24, Minute: 0}), + } + + // No start time means "start of the day", which is already covered by + // yesterday's "end of the day" check. + if schedule.StartTime.HasValue() { + nextChecks = append(nextChecks, calcNext(schedule.StartTime)) + } + // No end time means "end of the day", which is already covered by today's + // "end of the day" check. + if schedule.EndTime.HasValue() { + nextChecks = append(nextChecks, calcNext(schedule.EndTime)) + } + + next := earliestTime(nextChecks) + return next +} + +func earliestTime(timestamps []time.Time) time.Time { + earliest := timestamps[0] + for _, timestamp := range timestamps[1:] { + if timestamp.Before(earliest) { + earliest = timestamp + } + } + return earliest +} diff --git a/internal/manager/sleep_scheduler/calculations_test.go b/internal/manager/sleep_scheduler/calculations_test.go new file mode 100644 index 00000000..4b36a2c7 --- /dev/null +++ b/internal/manager/sleep_scheduler/calculations_test.go @@ -0,0 +1,114 @@ +package sleep_scheduler + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +func TestCalculateNextCheck(t *testing.T) { + _, mocks, _ := testFixtures(t) + + var sched persistence.SleepSchedule + empty := persistence.EmptyTimeOfDay() + + // Below, S, N, and E respectively mean Start, Now, and End times. + // Their order shows their relation to "Now". Lower-case letters mean "no value". + // Note that N can never be before 's' or after 'e'. + + // S N E -> E + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)} + assert.Equal(t, mocks.todayAt(18, 0), calculateNextCheck(mocks.todayAt(11, 16), &sched)) + + // S E N -> end of day + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)} + assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(19, 16), &sched)) + + // N S E -> S + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0)} + assert.Equal(t, mocks.todayAt(9, 0), calculateNextCheck(mocks.todayAt(8, 47), &sched)) + + // s N e -> end of day + sched = persistence.SleepSchedule{StartTime: empty, EndTime: empty} + assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(7, 47), &sched)) + + // S N e -> end of day + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: empty} + assert.Equal(t, mocks.endOfDay(), calculateNextCheck(mocks.todayAt(10, 47), &sched)) + + // s N E -> E + sched = persistence.SleepSchedule{StartTime: empty, EndTime: mkToD(18, 0)} + assert.Equal(t, mocks.todayAt(18, 0), calculateNextCheck(mocks.todayAt(7, 47), &sched)) +} + +func TestScheduledWorkerStatus(t *testing.T) { + _, mocks, _ := testFixtures(t) + + var sched persistence.SleepSchedule + empty := persistence.EmptyTimeOfDay() + + // Below, S, N, and E respectively mean Start, Now, and End times. + // Their order shows their relation to "Now". Lower-case letters mean "no value". + // Note that N can never be before 's' or after 'e'. + + // Test time logic without any DaysOfWeek set, i.e. the scheduled times apply + // to each day. + + // S N E -> asleep + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true} + assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched)) + + // S E N -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched)) + + // N S E -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched)) + + // s N e -> asleep + sched = persistence.SleepSchedule{StartTime: empty, EndTime: empty, IsActive: true} + assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(7, 47), &sched)) + + // S N e -> asleep + sched = persistence.SleepSchedule{StartTime: mkToD(9, 0), EndTime: empty, IsActive: true} + assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(10, 47), &sched)) + + // s N E -> asleep + sched = persistence.SleepSchedule{StartTime: empty, EndTime: mkToD(18, 0), IsActive: true} + assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(7, 47), &sched)) + + // Test DaysOfWeek logic, but only with explicit start & end times. The logic + // for missing start/end is already covered above. + // The mocked "today" is a Tuesday. + + // S N E unmentioned day -> awake + sched = persistence.SleepSchedule{DaysOfWeek: "mo we", StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true} + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched)) + + // S E N unmentioned day -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched)) + + // N S E unmentioned day -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched)) + + // S N E mentioned day -> asleep + sched = persistence.SleepSchedule{DaysOfWeek: "tu th fr", StartTime: mkToD(9, 0), EndTime: mkToD(18, 0), IsActive: true} + assert.Equal(t, api.WorkerStatusAsleep, scheduledWorkerStatus(mocks.todayAt(11, 16), &sched)) + + // S E N mentioned day -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(19, 16), &sched)) + + // N S E mentioned day -> awake + assert.Equal(t, api.WorkerStatusAwake, scheduledWorkerStatus(mocks.todayAt(8, 47), &sched)) +} + +func TestCleanupDaysOfWeek(t *testing.T) { + assert.Equal(t, "", cleanupDaysOfWeek("")) + assert.Equal(t, "mo tu we", cleanupDaysOfWeek("mo tu we")) + assert.Equal(t, "mo tu we", cleanupDaysOfWeek(" mo tu we \n")) + assert.Equal(t, "mo tu we", cleanupDaysOfWeek("monday tuesday wed")) +} diff --git a/internal/manager/sleep_scheduler/interfaces.go b/internal/manager/sleep_scheduler/interfaces.go new file mode 100644 index 00000000..361cf3ca --- /dev/null +++ b/internal/manager/sleep_scheduler/interfaces.go @@ -0,0 +1,37 @@ +package sleep_scheduler + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/webupdates" + "git.blender.org/flamenco/pkg/api" +) + +// Generate mock implementations of these interfaces. +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/sleep_scheduler PersistenceService,ChangeBroadcaster + +type PersistenceService interface { + FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) + SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error + // FetchSleepScheduleWorker sets the given schedule's `Worker` pointer. + FetchSleepScheduleWorker(ctx context.Context, schedule *persistence.SleepSchedule) error + FetchSleepSchedulesToCheck(ctx context.Context) ([]*persistence.SleepSchedule, error) + + SetWorkerSleepScheduleNextCheck(ctx context.Context, schedule *persistence.SleepSchedule) error + + SaveWorkerStatus(ctx context.Context, w *persistence.Worker) error +} + +var _ PersistenceService = (*persistence.DB)(nil) + +// TODO: Refactor the way worker status changes are handled, so that this +// service doens't need to broadcast its own worker updates. +type ChangeBroadcaster interface { + BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) +} + +// ChangeBroadcaster should be a subset of webupdates.BiDirComms. +var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) diff --git a/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go new file mode 100644 index 00000000..8cf6f77a --- /dev/null +++ b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go @@ -0,0 +1,158 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: git.blender.org/flamenco/internal/manager/sleep_scheduler (interfaces: PersistenceService,ChangeBroadcaster) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + persistence "git.blender.org/flamenco/internal/manager/persistence" + api "git.blender.org/flamenco/pkg/api" + gomock "github.com/golang/mock/gomock" +) + +// MockPersistenceService is a mock of PersistenceService interface. +type MockPersistenceService struct { + ctrl *gomock.Controller + recorder *MockPersistenceServiceMockRecorder +} + +// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService. +type MockPersistenceServiceMockRecorder struct { + mock *MockPersistenceService +} + +// NewMockPersistenceService creates a new mock instance. +func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService { + mock := &MockPersistenceService{ctrl: ctrl} + mock.recorder = &MockPersistenceServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { + return m.recorder +} + +// FetchSleepScheduleWorker mocks base method. +func (m *MockPersistenceService) FetchSleepScheduleWorker(arg0 context.Context, arg1 *persistence.SleepSchedule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchSleepScheduleWorker", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// FetchSleepScheduleWorker indicates an expected call of FetchSleepScheduleWorker. +func (mr *MockPersistenceServiceMockRecorder) FetchSleepScheduleWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSleepScheduleWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchSleepScheduleWorker), arg0, arg1) +} + +// FetchSleepSchedulesToCheck mocks base method. +func (m *MockPersistenceService) FetchSleepSchedulesToCheck(arg0 context.Context) ([]*persistence.SleepSchedule, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchSleepSchedulesToCheck", arg0) + ret0, _ := ret[0].([]*persistence.SleepSchedule) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchSleepSchedulesToCheck indicates an expected call of FetchSleepSchedulesToCheck. +func (mr *MockPersistenceServiceMockRecorder) FetchSleepSchedulesToCheck(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSleepSchedulesToCheck", reflect.TypeOf((*MockPersistenceService)(nil).FetchSleepSchedulesToCheck), arg0) +} + +// FetchWorkerSleepSchedule mocks base method. +func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1) + ret0, _ := ret[0].(*persistence.SleepSchedule) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1) +} + +// SaveWorkerStatus mocks base method. +func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveWorkerStatus indicates an expected call of SaveWorkerStatus. +func (mr *MockPersistenceServiceMockRecorder) SaveWorkerStatus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerStatus), arg0, arg1) +} + +// SetWorkerSleepSchedule mocks base method. +func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 *persistence.SleepSchedule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule. +func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2) +} + +// SetWorkerSleepScheduleNextCheck mocks base method. +func (m *MockPersistenceService) SetWorkerSleepScheduleNextCheck(arg0 context.Context, arg1 *persistence.SleepSchedule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWorkerSleepScheduleNextCheck", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetWorkerSleepScheduleNextCheck indicates an expected call of SetWorkerSleepScheduleNextCheck. +func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepScheduleNextCheck(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepScheduleNextCheck", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepScheduleNextCheck), arg0, arg1) +} + +// MockChangeBroadcaster is a mock of ChangeBroadcaster interface. +type MockChangeBroadcaster struct { + ctrl *gomock.Controller + recorder *MockChangeBroadcasterMockRecorder +} + +// MockChangeBroadcasterMockRecorder is the mock recorder for MockChangeBroadcaster. +type MockChangeBroadcasterMockRecorder struct { + mock *MockChangeBroadcaster +} + +// NewMockChangeBroadcaster creates a new mock instance. +func NewMockChangeBroadcaster(ctrl *gomock.Controller) *MockChangeBroadcaster { + mock := &MockChangeBroadcaster{ctrl: ctrl} + mock.recorder = &MockChangeBroadcasterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { + return m.recorder +} + +// BroadcastWorkerUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0) +} + +// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0) +} diff --git a/internal/manager/sleep_scheduler/sleep_scheduler.go b/internal/manager/sleep_scheduler/sleep_scheduler.go new file mode 100644 index 00000000..c8da167f --- /dev/null +++ b/internal/manager/sleep_scheduler/sleep_scheduler.go @@ -0,0 +1,198 @@ +package sleep_scheduler + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + "time" + + "github.com/benbjohnson/clock" + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +// Time period for checking the schedule of every worker. +const checkInterval = 10 * time.Second + +// SleepScheduler manages wake/sleep cycles of Workers. +type SleepScheduler struct { + clock clock.Clock + persist PersistenceService + broadcaster ChangeBroadcaster +} + +// New creates a new SleepScheduler. +func New(clock clock.Clock, persist PersistenceService, broadcaster ChangeBroadcaster) *SleepScheduler { + return &SleepScheduler{ + clock: clock, + persist: persist, + broadcaster: broadcaster, + } +} + +// Run occasionally checks the sleep schedule and updates workers. +// It stops running when the context closes. +func (ss *SleepScheduler) Run(ctx context.Context) { + log.Info(). + Str("checkInterval", checkInterval.String()). + Msg("sleep scheduler starting") + defer log.Info().Msg("sleep scheduler shutting down") + + for { + select { + case <-ctx.Done(): + return + case <-time.After(checkInterval): + ss.CheckSchedules(ctx) + } + } +} + +func (ss *SleepScheduler) FetchSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) { + return ss.persist.FetchWorkerSleepSchedule(ctx, workerUUID) +} + +// SetSleepSchedule stores the given schedule as the worker's new sleep schedule. +// The new schedule is immediately applied to the Worker. +func (ss *SleepScheduler) SetSchedule(ctx context.Context, workerUUID string, schedule *persistence.SleepSchedule) error { + // Ensure 'start' actually preceeds 'end'. + if schedule.StartTime.HasValue() && + schedule.EndTime.HasValue() && + schedule.EndTime.IsBefore(schedule.StartTime) { + schedule.StartTime, schedule.EndTime = schedule.EndTime, schedule.StartTime + } + + schedule.DaysOfWeek = cleanupDaysOfWeek(schedule.DaysOfWeek) + schedule.NextCheck = ss.calculateNextCheck(schedule) + + if err := ss.persist.SetWorkerSleepSchedule(ctx, workerUUID, schedule); err != nil { + return fmt.Errorf("persisting sleep schedule of worker %s: %w", workerUUID, err) + } + + return ss.ApplySleepSchedule(ctx, schedule) +} + +// scheduledWorkerStatus returns the expected worker status for the current date/time. +func (ss *SleepScheduler) scheduledWorkerStatus(sched *persistence.SleepSchedule) api.WorkerStatus { + now := ss.clock.Now() + return scheduledWorkerStatus(now, sched) +} + +// Return a timestamp when the next scheck for this schedule is due. +func (ss *SleepScheduler) calculateNextCheck(schedule *persistence.SleepSchedule) time.Time { + now := ss.clock.Now() + return calculateNextCheck(now, schedule) +} + +// ApplySleepSchedule sets worker.StatusRequested if the scheduler demands a status change. +func (ss *SleepScheduler) ApplySleepSchedule(ctx context.Context, schedule *persistence.SleepSchedule) error { + // Find the Worker managed by this schedule. + worker := schedule.Worker + if worker == nil { + err := ss.persist.FetchSleepScheduleWorker(ctx, schedule) + if err != nil { + return err + } + worker = schedule.Worker + } + + scheduled := ss.scheduledWorkerStatus(schedule) + if scheduled == "" || + (worker.StatusRequested == scheduled && !worker.LazyStatusRequest) || + (worker.Status == scheduled && worker.StatusRequested == "") { + // The worker is already in the right state, or is non-lazily requested to + // go to the right state, so nothing else has to be done. + return nil + } + + logger := log.With(). + Str("worker", worker.Identifier()). + Str("currentStatus", string(worker.Status)). + Str("scheduledStatus", string(scheduled)). + Logger() + + if worker.StatusRequested != "" { + logger.Info().Str("oldStatusRequested", string(worker.StatusRequested)). + Msg("sleep scheduler: overruling previously requested status with scheduled status") + } else { + logger.Info().Msg("sleep scheduler: requesting worker to switch to scheduled status") + } + + if err := ss.updateWorkerStatus(ctx, worker, scheduled); err != nil { + return err + } + return nil +} + +func (ss *SleepScheduler) updateWorkerStatus( + ctx context.Context, + worker *persistence.Worker, + newStatus api.WorkerStatus, +) error { + // Sleep schedule should be adhered to immediately, no lazy requests. + // A render task can run for hours, so better to not wait for it. + worker.StatusChangeRequest(newStatus, false) + + err := ss.persist.SaveWorkerStatus(ctx, worker) + if err != nil { + return fmt.Errorf("error saving worker %s to database: %w", worker.Identifier(), err) + } + + // Broadcast worker change via SocketIO + ss.broadcaster.BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{ + Id: worker.UUID, + Name: worker.Name, + Status: worker.Status, + StatusChange: &api.WorkerStatusChangeRequest{ + IsLazy: false, + Status: worker.StatusRequested, + }, + Updated: worker.UpdatedAt, + Version: worker.Software, + }) + + return nil +} + +// CheckSchedules updates the status of all workers for which a schedule is active. +func (ss *SleepScheduler) CheckSchedules(ctx context.Context) { + toCheck, err := ss.persist.FetchSleepSchedulesToCheck(ctx) + if err != nil { + log.Error().Err(err).Msg("sleep scheduler: unable to fetch sleep schedules") + return + } + if len(toCheck) == 0 { + log.Debug().Msg("sleep scheduler: no sleep schedules need checking") + return + } + + log.Debug().Int("numWorkers", len(toCheck)).Msg("sleep scheduler: checking worker sleep schedules") + + for _, schedule := range toCheck { + ss.checkSchedule(ctx, schedule) + } +} + +func (ss *SleepScheduler) checkSchedule(ctx context.Context, schedule *persistence.SleepSchedule) { + // Compute the next time to check. + schedule.NextCheck = ss.calculateNextCheck(schedule) + if err := ss.persist.SetWorkerSleepScheduleNextCheck(ctx, schedule); err != nil { + log.Error(). + Err(err). + Str("worker", schedule.Worker.Identifier()). + Msg("sleep scheduler: error refreshing worker's sleep schedule") + return + } + + // Apply the schedule to the worker. + if err := ss.ApplySleepSchedule(ctx, schedule); err != nil { + log.Error(). + Err(err). + Str("worker", schedule.Worker.Identifier()). + Msg("sleep scheduler: error applying worker's sleep schedule") + return + } +} diff --git a/internal/manager/sleep_scheduler/sleep_scheduler_test.go b/internal/manager/sleep_scheduler/sleep_scheduler_test.go new file mode 100644 index 00000000..62a63aca --- /dev/null +++ b/internal/manager/sleep_scheduler/sleep_scheduler_test.go @@ -0,0 +1,269 @@ +package sleep_scheduler + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/sleep_scheduler/mocks" + "git.blender.org/flamenco/pkg/api" +) + +func TestFetchSchedule(t *testing.T) { + ss, mocks, ctx := testFixtures(t) + + workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19" + dbSched := persistence.SleepSchedule{} + mocks.persist.EXPECT().FetchWorkerSleepSchedule(ctx, workerUUID).Return(&dbSched, nil) + + sched, err := ss.FetchSchedule(ctx, workerUUID) + if assert.NoError(t, err) { + assert.Equal(t, &dbSched, sched) + } +} + +func TestSetSchedule(t *testing.T) { + ss, mocks, ctx := testFixtures(t) + + workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19" + + sched := persistence.SleepSchedule{ + IsActive: true, + DaysOfWeek: " mo tu we", + StartTime: mkToD(9, 0), + EndTime: mkToD(18, 0), + + Worker: &persistence.Worker{ + UUID: workerUUID, + Status: api.WorkerStatusAwake, + }, + } + expectSavedSchedule := sched + expectSavedSchedule.DaysOfWeek = "mo tu we" // Expect a cleanup + expectNextCheck := mocks.todayAt(18, 0) // "now" is at 11:14:47, expect a check at the end time. + expectSavedSchedule.NextCheck = expectNextCheck + + // Expect the new schedule to be saved. + mocks.persist.EXPECT().SetWorkerSleepSchedule(ctx, workerUUID, &expectSavedSchedule) + + // Expect the new schedule to be immediately applied to the Worker. + // `TestApplySleepSchedule` checks those values, no need to do that here. + mocks.persist.EXPECT().SaveWorkerStatus(ctx, gomock.Any()) + mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(gomock.Any()) + + err := ss.SetSchedule(ctx, workerUUID, &sched) + assert.NoError(t, err) +} + +func TestSetScheduleSwappedStartEnd(t *testing.T) { + ss, mocks, ctx := testFixtures(t) + + workerUUID := "aeb49d8a-6903-41b3-b545-77b7a1c0ca19" + + sched := persistence.SleepSchedule{ + IsActive: true, + DaysOfWeek: "mo tu we", + StartTime: mkToD(18, 0), + EndTime: mkToD(9, 0), + + // Worker already in the right state, so no saving/broadcasting expected. + Worker: &persistence.Worker{ + UUID: workerUUID, + Status: api.WorkerStatusAsleep, + }, + } + + expectSavedSchedule := persistence.SleepSchedule{ + IsActive: true, + DaysOfWeek: "mo tu we", + StartTime: mkToD(9, 0), // Expect start and end time to be corrected. + EndTime: mkToD(18, 0), + NextCheck: mocks.todayAt(18, 0), // "now" is at 11:14:47, expect a check at the end time. + Worker: sched.Worker, + } + + mocks.persist.EXPECT().SetWorkerSleepSchedule(ctx, workerUUID, &expectSavedSchedule) + + err := ss.SetSchedule(ctx, workerUUID, &sched) + assert.NoError(t, err) +} + +func TestApplySleepSchedule(t *testing.T) { + ss, mocks, ctx := testFixtures(t) + + worker := persistence.Worker{ + Model: persistence.Model{ID: 5}, + UUID: "74997de4-c530-4913-b89f-c489f14f7634", + Status: api.WorkerStatusOffline, + } + + sched := persistence.SleepSchedule{ + IsActive: true, + DaysOfWeek: "mo tu we", + StartTime: mkToD(9, 0), + EndTime: mkToD(18, 0), + } + + testForExpectedStatus := func(expectedNewStatus api.WorkerStatus) { + // Take a copy of the worker & schedule, for test isolation. + testSchedule := sched + testWorker := worker + + // Expect the Worker to be fetched. + mocks.persist.EXPECT().FetchSleepScheduleWorker(ctx, &testSchedule).DoAndReturn( + func(ctx context.Context, schedule *persistence.SleepSchedule) error { + schedule.Worker = &testWorker + return nil + }) + + // Construct the worker as we expect it to be saved to the database. + savedWorker := testWorker + savedWorker.LazyStatusRequest = false + savedWorker.StatusRequested = expectedNewStatus + mocks.persist.EXPECT().SaveWorkerStatus(ctx, &savedWorker) + + // Expect SocketIO broadcast. + var sioUpdate api.SocketIOWorkerUpdate + mocks.broadcaster.EXPECT().BroadcastWorkerUpdate(gomock.Any()).DoAndReturn( + func(workerUpdate api.SocketIOWorkerUpdate) { + sioUpdate = workerUpdate + }) + + // Actually apply the sleep schedule. + err := ss.ApplySleepSchedule(ctx, &testSchedule) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Check the SocketIO broadcast. + if sioUpdate.Id != "" { + assert.Equal(t, testWorker.UUID, sioUpdate.Id) + assert.False(t, sioUpdate.StatusChange.IsLazy) + assert.Equal(t, expectedNewStatus, sioUpdate.StatusChange.Status) + } + } + + // Move the clock to the middle of the sleep schedule, so worker should sleep. + mocks.clock.Set(mocks.todayAt(10, 47)) + testForExpectedStatus(api.WorkerStatusAsleep) + + // Move the clock to before the sleep schedule start. + mocks.clock.Set(mocks.todayAt(0, 3)) + testForExpectedStatus(api.WorkerStatusAwake) + + // Move the clock to after the sleep schedule ends. + mocks.clock.Set(mocks.todayAt(19, 59)) + testForExpectedStatus(api.WorkerStatusAwake) + + // Test that the worker should sleep, and has already been requested to sleep, + // but lazily. This should trigger a non-lazy status change request. + mocks.clock.Set(mocks.todayAt(10, 47)) + worker.Status = api.WorkerStatusAwake + worker.StatusRequested = api.WorkerStatusAsleep + worker.LazyStatusRequest = true + testForExpectedStatus(api.WorkerStatusAsleep) +} + +func TestApplySleepScheduleAlreadyCorrectStatus(t *testing.T) { + ss, mocks, ctx := testFixtures(t) + + worker := persistence.Worker{ + Model: persistence.Model{ID: 5}, + UUID: "74997de4-c530-4913-b89f-c489f14f7634", + Status: api.WorkerStatusAsleep, + } + + sched := persistence.SleepSchedule{ + IsActive: true, + DaysOfWeek: "mo tu we", + StartTime: mkToD(9, 0), + EndTime: mkToD(18, 0), + } + + runTest := func() { + // Take a copy of the worker & schedule, for test isolation. + testSchedule := sched + testWorker := worker + + // Expect the Worker to be fetched. + mocks.persist.EXPECT().FetchSleepScheduleWorker(ctx, &testSchedule).DoAndReturn( + func(ctx context.Context, schedule *persistence.SleepSchedule) error { + schedule.Worker = &testWorker + return nil + }) + + // Apply the sleep schedule. This should not trigger any persistence or broadcasts. + err := ss.ApplySleepSchedule(ctx, &testSchedule) + if !assert.NoError(t, err) { + t.FailNow() + } + } + + // Move the clock to the middle of the sleep schedule, so the schedule always + // wants the worker to sleep. + mocks.clock.Set(mocks.todayAt(10, 47)) + + // Current status is already good. + worker.Status = api.WorkerStatusAsleep + runTest() + + // Current status is not the right one, but the requested status is already good. + worker.Status = api.WorkerStatusAwake + worker.StatusRequested = api.WorkerStatusAsleep + worker.LazyStatusRequest = false + runTest() +} + +type TestMocks struct { + clock *clock.Mock + persist *mocks.MockPersistenceService + broadcaster *mocks.MockChangeBroadcaster +} + +// todayAt returns whatever the mocked clock's "now" is set to, with the time set +// to the given time. Seconds and sub-seconds are set to zero. +func (m *TestMocks) todayAt(hour, minute int) time.Time { + now := m.clock.Now() + return time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location()) + +} + +// endOfDay returns midnight of the day after whatever the mocked clock's "now" is set to. +func (m *TestMocks) endOfDay() time.Time { + now := m.clock.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).AddDate(0, 0, 1) +} + +func testFixtures(t *testing.T) (*SleepScheduler, TestMocks, context.Context) { + ctx := context.Background() + + mockedClock := clock.NewMock() + mockedNow, err := time.Parse(time.RFC3339, "2022-06-07T11:14:47+02:00") + if err != nil { + panic(err) + } + mockedClock.Set(mockedNow) + if !assert.Equal(t, time.Tuesday.String(), mockedNow.Weekday().String()) { + t.Fatal("tests assume 'now' is a Tuesday") + } + + mockCtrl := gomock.NewController(t) + mocks := TestMocks{ + clock: mockedClock, + persist: mocks.NewMockPersistenceService(mockCtrl), + broadcaster: mocks.NewMockChangeBroadcaster(mockCtrl), + } + ss := New(mocks.clock, mocks.persist, mocks.broadcaster) + return ss, mocks, ctx +} + +func mkToD(hour, minute int) persistence.TimeOfDay { + return persistence.TimeOfDay{Hour: hour, Minute: minute} +}