diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 6d99e6be..c957ed74 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -65,6 +65,9 @@ type PersistenceService interface { // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error) + FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error) + SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule persistence.SleepSchedule) error + // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 5082fca0..f24d6b4b 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -216,6 +216,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } +// FetchWorkerSleepSchedule mocks base method. +func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1) + ret0, _ := ret[0].(*persistence.SleepSchedule) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1) +} + // FetchWorkers mocks base method. func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { m.ctrl.T.Helper() @@ -375,6 +390,20 @@ func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1) } +// SetWorkerSleepSchedule mocks base method. +func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 persistence.SleepSchedule) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule. +func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2) +} + // StoreAuthoredJob mocks base method. func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/worker_sleep_schedule.go b/internal/manager/api_impl/worker_sleep_schedule.go new file mode 100644 index 00000000..1f34b3d4 --- /dev/null +++ b/internal/manager/api_impl/worker_sleep_schedule.go @@ -0,0 +1,79 @@ +package api_impl + +import ( + "errors" + "net/http" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/uuid" + "git.blender.org/flamenco/pkg/api" + "github.com/labstack/echo/v4" +) + +func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) error { + if !uuid.IsValid(workerUUID) { + return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") + } + + ctx := e.Request().Context() + logger := requestLogger(e) + logger = logger.With().Str("worker", workerUUID).Logger() + schedule, err := f.persist.FetchWorkerSleepSchedule(ctx, workerUUID) + + switch { + case errors.Is(err, persistence.ErrWorkerNotFound): + logger.Warn().Msg("FetchWorkerSleepSchedule: worker does not exist") + return sendAPIError(e, http.StatusNotFound, "worker %q does not exist", workerUUID) + case err != nil: + logger.Error().Err(err).Msg("FetchWorkerSleepSchedule: error fetching sleep schedule") + return sendAPIError(e, http.StatusInternalServerError, "error fetching sleep schedule: %v", err) + case schedule == nil: + return e.NoContent(http.StatusNoContent) + } + + apiSchedule := api.WorkerSleepSchedule{ + DaysOfWeek: schedule.DaysOfWeek, + EndTime: schedule.EndTime, + IsActive: schedule.IsActive, + StartTime: schedule.StartTime, + } + return e.JSON(http.StatusOK, apiSchedule) +} + +func (f *Flamenco) SetWorkerSleepSchedule(e echo.Context, workerUUID string) error { + if !uuid.IsValid(workerUUID) { + return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") + } + + ctx := e.Request().Context() + logger := requestLogger(e) + logger = logger.With().Str("worker", workerUUID).Logger() + + var req api.SetWorkerSleepScheduleJSONRequestBody + err := e.Bind(&req) + if err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + schedule := api.WorkerSleepSchedule(req) + + dbSchedule := persistence.SleepSchedule{ + IsActive: schedule.IsActive, + DaysOfWeek: schedule.DaysOfWeek, + StartTime: schedule.StartTime, + EndTime: schedule.EndTime, + } + + err = f.persist.SetWorkerSleepSchedule(ctx, workerUUID, dbSchedule) + switch { + case errors.Is(err, persistence.ErrWorkerNotFound): + logger.Warn().Msg("SetWorkerSleepSchedule: worker does not exist") + return sendAPIError(e, http.StatusNotFound, "worker %q does not exist", workerUUID) + case err != nil: + logger.Error().Err(err).Msg("SetWorkerSleepSchedule: error fetching sleep schedule") + return sendAPIError(e, http.StatusInternalServerError, "error fetching sleep schedule: %v", err) + } + + logger.Info().Interface("schedule", schedule).Msg("worker sleep schedule updated") + return e.NoContent(http.StatusNoContent) +} diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index 588fb51b..64f12b0a 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -11,6 +11,7 @@ func (db *DB) migrate() error { &Job{}, &JobBlock{}, &LastRendered{}, + &SleepSchedule{}, &Task{}, &TaskFailure{}, &Worker{}, diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go new file mode 100644 index 00000000..d3174fa3 --- /dev/null +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -0,0 +1,70 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + "gorm.io/gorm/clause" +) + +// SleepSchedule belongs to a Worker, and determines when it's automatically +// sent to the 'asleep' and 'awake' states. +type SleepSchedule struct { + Model + + WorkerID uint `gorm:"default:0;unique;index"` + Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` + + IsActive bool `gorm:"default:false;index"` + + // Space-separated two-letter strings indicating days of week the schedule is + // active ("mo", "tu", etc.). Empty means "every day". + DaysOfWeek string `gorm:"default:''"` + StartTime string `gorm:"default:''"` + EndTime string `gorm:"default:''"` + + NextCheck *time.Time +} + +func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*SleepSchedule, error) { + logger := log.With().Str("worker", workerUUID).Logger() + logger.Trace().Msg("fetching worker sleep schedule") + + var sched SleepSchedule + tx := db.gormDB.WithContext(ctx). + Joins("inner join workers on workers.id = sleep_schedules.worker_id"). + Where("workers.uuid = ?", workerUUID). + // This is the same as First(&sched), except it doesn't cause an error if it doesn't exist: + Limit(1).Find(&sched) + if tx.Error != nil { + return nil, tx.Error + } + if sched.ID == 0 { + return nil, nil + } + return &sched, nil +} + +func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule SleepSchedule) error { + logger := log.With().Str("worker", workerUUID).Logger() + logger.Trace().Msg("setting worker sleep schedule") + + worker, err := db.FetchWorker(ctx, workerUUID) + if err != nil { + return fmt.Errorf("fetching worker %q: %w", workerUUID, err) + } + schedule.WorkerID = worker.ID + schedule.Worker = worker + + tx := db.gormDB.WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "worker_id"}}, + UpdateAll: true, + }). + Create(&schedule) + return tx.Error +} diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go new file mode 100644 index 00000000..d4180d66 --- /dev/null +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -0,0 +1,135 @@ +package persistence + +import ( + "testing" + "time" + + "git.blender.org/flamenco/internal/uuid" + "git.blender.org/flamenco/pkg/api" + "github.com/stretchr/testify/assert" +) + +func TestFetchWorkerSleepSchedule(t *testing.T) { + ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) + defer finish() + + linuxWorker := Worker{ + UUID: uuid.New(), + Name: "дрон", + Address: "fe80::5054:ff:fede:2ad7", + Platform: "linux", + Software: "3.0", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + err := db.CreateWorker(ctx, &linuxWorker) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Not an existing Worker. + fetched, err := db.FetchWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155") + assert.NoError(t, err, "non-existent worker should not cause an error") + assert.Nil(t, fetched) + + // No sleep schedule. + fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err, "non-existent schedule should not cause an error") + assert.Nil(t, fetched) + + // Create a sleep schedule. + created := SleepSchedule{ + WorkerID: linuxWorker.ID, + Worker: &linuxWorker, + + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: "18:00", + EndTime: "09:00", + } + tx := db.gormDB.Create(&created) + if !assert.NoError(t, tx.Error) { + t.FailNow() + } + + fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err) + assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched) +} + +func TestSetWorkerSleepSchedule(t *testing.T) { + ctx, finish, db := persistenceTestFixtures(t, 1*time.Second) + defer finish() + + linuxWorker := Worker{ + UUID: uuid.New(), + Name: "дрон", + Address: "fe80::5054:ff:fede:2ad7", + Platform: "linux", + Software: "3.0", + Status: api.WorkerStatusAwake, + SupportedTaskTypes: "blender,ffmpeg,file-management", + } + err := db.CreateWorker(ctx, &linuxWorker) + if !assert.NoError(t, err) { + t.FailNow() + } + + schedule := SleepSchedule{ + WorkerID: linuxWorker.ID, + Worker: &linuxWorker, + + IsActive: true, + DaysOfWeek: "mo,tu,th,fr", + StartTime: "18:00", + EndTime: "09:00", + } + + // Not an existing Worker. + err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", schedule) + assert.ErrorIs(t, err, ErrWorkerNotFound) + + // Create the sleep schedule. + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, schedule) + assert.NoError(t, err) + fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err) + assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched) + + // Overwrite the schedule with one that already has a database ID. + newSchedule := schedule + newSchedule.IsActive = false + newSchedule.DaysOfWeek = "mo,tu,we,th,fr" + newSchedule.StartTime = "02:00" + newSchedule.EndTime = "06:00" + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newSchedule) + assert.NoError(t, err) + fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err) + assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched) + + // Overwrite the schedule with a freshly constructed one. + newerSchedule := SleepSchedule{ + WorkerID: linuxWorker.ID, + Worker: &linuxWorker, + + IsActive: true, + DaysOfWeek: "mo", + StartTime: "03:27", + EndTime: "15:47", + } + err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newerSchedule) + assert.NoError(t, err) + fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) + assert.NoError(t, err) + assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched) +} + +func assertEqualSleepSchedule(t *testing.T, workerID uint, expect, actual SleepSchedule) { + assert.Equal(t, workerID, actual.WorkerID) + assert.Nil(t, actual.Worker, "the Worker itself should not be fetched") + assert.Equal(t, expect.IsActive, actual.IsActive) + assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek) + assert.Equal(t, expect.StartTime, actual.StartTime) + assert.Equal(t, expect.EndTime, actual.EndTime) +}