Manager: implement operations for getting & setting worker sleep schedule

This is just the API, no web interface yet.

Manifest Task: T99397
This commit is contained in:
Sybren A. Stüvel 2022-07-16 16:00:25 +02:00
parent 0e92004f2a
commit 627996525e
6 changed files with 317 additions and 0 deletions

View File

@ -65,6 +65,9 @@ type PersistenceService interface {
// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type. // CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type.
CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error) CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error)
FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*persistence.SleepSchedule, error)
SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule persistence.SleepSchedule) error
// Database queries. // Database queries.
QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)
QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error)

View File

@ -216,6 +216,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1)
} }
// FetchWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) FetchWorkerSleepSchedule(arg0 context.Context, arg1 string) (*persistence.SleepSchedule, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchWorkerSleepSchedule", arg0, arg1)
ret0, _ := ret[0].(*persistence.SleepSchedule)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchWorkerSleepSchedule indicates an expected call of FetchWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerSleepSchedule), arg0, arg1)
}
// FetchWorkers mocks base method. // FetchWorkers mocks base method.
func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -375,6 +390,20 @@ func (mr *MockPersistenceServiceMockRecorder) SetLastRendered(arg0, arg1 interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastRendered", reflect.TypeOf((*MockPersistenceService)(nil).SetLastRendered), arg0, arg1)
} }
// SetWorkerSleepSchedule mocks base method.
func (m *MockPersistenceService) SetWorkerSleepSchedule(arg0 context.Context, arg1 string, arg2 persistence.SleepSchedule) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetWorkerSleepSchedule", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SetWorkerSleepSchedule indicates an expected call of SetWorkerSleepSchedule.
func (mr *MockPersistenceServiceMockRecorder) SetWorkerSleepSchedule(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWorkerSleepSchedule", reflect.TypeOf((*MockPersistenceService)(nil).SetWorkerSleepSchedule), arg0, arg1, arg2)
}
// StoreAuthoredJob mocks base method. // StoreAuthoredJob mocks base method.
func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error { func (m *MockPersistenceService) StoreAuthoredJob(arg0 context.Context, arg1 job_compilers.AuthoredJob) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -0,0 +1,79 @@
package api_impl
import (
"errors"
"net/http"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/uuid"
"git.blender.org/flamenco/pkg/api"
"github.com/labstack/echo/v4"
)
func (f *Flamenco) FetchWorkerSleepSchedule(e echo.Context, workerUUID string) error {
if !uuid.IsValid(workerUUID) {
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
}
ctx := e.Request().Context()
logger := requestLogger(e)
logger = logger.With().Str("worker", workerUUID).Logger()
schedule, err := f.persist.FetchWorkerSleepSchedule(ctx, workerUUID)
switch {
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Warn().Msg("FetchWorkerSleepSchedule: worker does not exist")
return sendAPIError(e, http.StatusNotFound, "worker %q does not exist", workerUUID)
case err != nil:
logger.Error().Err(err).Msg("FetchWorkerSleepSchedule: error fetching sleep schedule")
return sendAPIError(e, http.StatusInternalServerError, "error fetching sleep schedule: %v", err)
case schedule == nil:
return e.NoContent(http.StatusNoContent)
}
apiSchedule := api.WorkerSleepSchedule{
DaysOfWeek: schedule.DaysOfWeek,
EndTime: schedule.EndTime,
IsActive: schedule.IsActive,
StartTime: schedule.StartTime,
}
return e.JSON(http.StatusOK, apiSchedule)
}
func (f *Flamenco) SetWorkerSleepSchedule(e echo.Context, workerUUID string) error {
if !uuid.IsValid(workerUUID) {
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
}
ctx := e.Request().Context()
logger := requestLogger(e)
logger = logger.With().Str("worker", workerUUID).Logger()
var req api.SetWorkerSleepScheduleJSONRequestBody
err := e.Bind(&req)
if err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
schedule := api.WorkerSleepSchedule(req)
dbSchedule := persistence.SleepSchedule{
IsActive: schedule.IsActive,
DaysOfWeek: schedule.DaysOfWeek,
StartTime: schedule.StartTime,
EndTime: schedule.EndTime,
}
err = f.persist.SetWorkerSleepSchedule(ctx, workerUUID, dbSchedule)
switch {
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Warn().Msg("SetWorkerSleepSchedule: worker does not exist")
return sendAPIError(e, http.StatusNotFound, "worker %q does not exist", workerUUID)
case err != nil:
logger.Error().Err(err).Msg("SetWorkerSleepSchedule: error fetching sleep schedule")
return sendAPIError(e, http.StatusInternalServerError, "error fetching sleep schedule: %v", err)
}
logger.Info().Interface("schedule", schedule).Msg("worker sleep schedule updated")
return e.NoContent(http.StatusNoContent)
}

View File

@ -11,6 +11,7 @@ func (db *DB) migrate() error {
&Job{}, &Job{},
&JobBlock{}, &JobBlock{},
&LastRendered{}, &LastRendered{},
&SleepSchedule{},
&Task{}, &Task{},
&TaskFailure{}, &TaskFailure{},
&Worker{}, &Worker{},

View File

@ -0,0 +1,70 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"time"
"github.com/rs/zerolog/log"
"gorm.io/gorm/clause"
)
// SleepSchedule belongs to a Worker, and determines when it's automatically
// sent to the 'asleep' and 'awake' states.
type SleepSchedule struct {
Model
WorkerID uint `gorm:"default:0;unique;index"`
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
IsActive bool `gorm:"default:false;index"`
// Space-separated two-letter strings indicating days of week the schedule is
// active ("mo", "tu", etc.). Empty means "every day".
DaysOfWeek string `gorm:"default:''"`
StartTime string `gorm:"default:''"`
EndTime string `gorm:"default:''"`
NextCheck *time.Time
}
func (db *DB) FetchWorkerSleepSchedule(ctx context.Context, workerUUID string) (*SleepSchedule, error) {
logger := log.With().Str("worker", workerUUID).Logger()
logger.Trace().Msg("fetching worker sleep schedule")
var sched SleepSchedule
tx := db.gormDB.WithContext(ctx).
Joins("inner join workers on workers.id = sleep_schedules.worker_id").
Where("workers.uuid = ?", workerUUID).
// This is the same as First(&sched), except it doesn't cause an error if it doesn't exist:
Limit(1).Find(&sched)
if tx.Error != nil {
return nil, tx.Error
}
if sched.ID == 0 {
return nil, nil
}
return &sched, nil
}
func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, schedule SleepSchedule) error {
logger := log.With().Str("worker", workerUUID).Logger()
logger.Trace().Msg("setting worker sleep schedule")
worker, err := db.FetchWorker(ctx, workerUUID)
if err != nil {
return fmt.Errorf("fetching worker %q: %w", workerUUID, err)
}
schedule.WorkerID = worker.ID
schedule.Worker = worker
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "worker_id"}},
UpdateAll: true,
}).
Create(&schedule)
return tx.Error
}

View File

@ -0,0 +1,135 @@
package persistence
import (
"testing"
"time"
"git.blender.org/flamenco/internal/uuid"
"git.blender.org/flamenco/pkg/api"
"github.com/stretchr/testify/assert"
)
func TestFetchWorkerSleepSchedule(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish()
linuxWorker := Worker{
UUID: uuid.New(),
Name: "дрон",
Address: "fe80::5054:ff:fede:2ad7",
Platform: "linux",
Software: "3.0",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management",
}
err := db.CreateWorker(ctx, &linuxWorker)
if !assert.NoError(t, err) {
t.FailNow()
}
// Not an existing Worker.
fetched, err := db.FetchWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155")
assert.NoError(t, err, "non-existent worker should not cause an error")
assert.Nil(t, fetched)
// No sleep schedule.
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err, "non-existent schedule should not cause an error")
assert.Nil(t, fetched)
// Create a sleep schedule.
created := SleepSchedule{
WorkerID: linuxWorker.ID,
Worker: &linuxWorker,
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: "18:00",
EndTime: "09:00",
}
tx := db.gormDB.Create(&created)
if !assert.NoError(t, tx.Error) {
t.FailNow()
}
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err)
assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched)
}
func TestSetWorkerSleepSchedule(t *testing.T) {
ctx, finish, db := persistenceTestFixtures(t, 1*time.Second)
defer finish()
linuxWorker := Worker{
UUID: uuid.New(),
Name: "дрон",
Address: "fe80::5054:ff:fede:2ad7",
Platform: "linux",
Software: "3.0",
Status: api.WorkerStatusAwake,
SupportedTaskTypes: "blender,ffmpeg,file-management",
}
err := db.CreateWorker(ctx, &linuxWorker)
if !assert.NoError(t, err) {
t.FailNow()
}
schedule := SleepSchedule{
WorkerID: linuxWorker.ID,
Worker: &linuxWorker,
IsActive: true,
DaysOfWeek: "mo,tu,th,fr",
StartTime: "18:00",
EndTime: "09:00",
}
// Not an existing Worker.
err = db.SetWorkerSleepSchedule(ctx, "2cf6153a-3d4e-49f4-a5c0-1c9fc176e155", schedule)
assert.ErrorIs(t, err, ErrWorkerNotFound)
// Create the sleep schedule.
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, schedule)
assert.NoError(t, err)
fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err)
assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched)
// Overwrite the schedule with one that already has a database ID.
newSchedule := schedule
newSchedule.IsActive = false
newSchedule.DaysOfWeek = "mo,tu,we,th,fr"
newSchedule.StartTime = "02:00"
newSchedule.EndTime = "06:00"
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newSchedule)
assert.NoError(t, err)
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err)
assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched)
// Overwrite the schedule with a freshly constructed one.
newerSchedule := SleepSchedule{
WorkerID: linuxWorker.ID,
Worker: &linuxWorker,
IsActive: true,
DaysOfWeek: "mo",
StartTime: "03:27",
EndTime: "15:47",
}
err = db.SetWorkerSleepSchedule(ctx, linuxWorker.UUID, newerSchedule)
assert.NoError(t, err)
fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID)
assert.NoError(t, err)
assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched)
}
func assertEqualSleepSchedule(t *testing.T, workerID uint, expect, actual SleepSchedule) {
assert.Equal(t, workerID, actual.WorkerID)
assert.Nil(t, actual.Worker, "the Worker itself should not be fetched")
assert.Equal(t, expect.IsActive, actual.IsActive)
assert.Equal(t, expect.DaysOfWeek, actual.DaysOfWeek)
assert.Equal(t, expect.StartTime, actual.StartTime)
assert.Equal(t, expect.EndTime, actual.EndTime)
}