diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 5e428bce..3404afab 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -76,6 +76,7 @@ type PersistenceService interface { FetchWorkerTags(ctx context.Context) ([]*persistence.WorkerTag, error) DeleteWorkerTag(ctx context.Context, uuid string) error SaveWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error + FetchTagsOfWorker(ctx context.Context, workerUUID string) ([]persistence.WorkerTag, error) // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index a5934474..6cca0d26 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -44,7 +44,7 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { } // AddWorkerToJobBlocklist mocks base method. -func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) error { +func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *sqlc.Worker, arg3 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddWorkerToJobBlocklist", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) @@ -58,7 +58,7 @@ func (mr *MockPersistenceServiceMockRecorder) AddWorkerToJobBlocklist(arg0, arg1 } // AddWorkerToTaskFailedList mocks base method. -func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) { +func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *sqlc.Worker) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddWorkerToTaskFailedList", arg0, arg1, arg2) ret0, _ := ret[0].(int) @@ -115,7 +115,7 @@ func (mr *MockPersistenceServiceMockRecorder) ClearJobBlocklist(arg0, arg1 inter } // CountTaskFailuresOfWorker mocks base method. -func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) { +func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *sqlc.Worker, arg3 string) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CountTaskFailuresOfWorker", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(int) @@ -130,7 +130,7 @@ func (mr *MockPersistenceServiceMockRecorder) CountTaskFailuresOfWorker(arg0, ar } // CreateWorker mocks base method. -func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -230,6 +230,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobs(arg0 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobs", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobs), arg0) } +// FetchTagsOfWorker mocks base method. +func (m *MockPersistenceService) FetchTagsOfWorker(arg0 context.Context, arg1 string) ([]persistence.WorkerTag, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTagsOfWorker", arg0, arg1) + ret0, _ := ret[0].([]persistence.WorkerTag) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTagsOfWorker indicates an expected call of FetchTagsOfWorker. +func (mr *MockPersistenceServiceMockRecorder) FetchTagsOfWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTagsOfWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchTagsOfWorker), arg0, arg1) +} + // FetchTask mocks base method. func (m *MockPersistenceService) FetchTask(arg0 context.Context, arg1 string) (*persistence.Task, error) { m.ctrl.T.Helper() @@ -246,10 +261,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTask(arg0, arg1 interface{}) } // FetchTaskFailureList mocks base method. -func (m *MockPersistenceService) FetchTaskFailureList(arg0 context.Context, arg1 *persistence.Task) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchTaskFailureList(arg0 context.Context, arg1 *persistence.Task) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTaskFailureList", arg0, arg1) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -276,10 +291,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTaskJobUUID(arg0, arg1 interf } // FetchWorker mocks base method. -func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) { +func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorker", arg0, arg1) - ret0, _ := ret[0].(*persistence.Worker) + ret0, _ := ret[0].(*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -321,7 +336,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTags(arg0 interface{}) } // FetchWorkerTask mocks base method. -func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { +func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *sqlc.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorkerTask", arg0, arg1) ret0, _ := ret[0].(*persistence.Task) @@ -336,10 +351,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTask(arg0, arg1 interfa } // FetchWorkers mocks base method. -func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorkers", arg0) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -423,7 +438,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interf } // SaveWorker mocks base method. -func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -437,7 +452,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) } // SaveWorkerStatus mocks base method. -func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1) ret0, _ := ret[0].(error) @@ -465,7 +480,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorkerTag(arg0, arg1 interface } // ScheduleTask mocks base method. -func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { +func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *sqlc.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ScheduleTask", arg0, arg1) ret0, _ := ret[0].(*persistence.Task) @@ -522,7 +537,7 @@ func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 int } // WorkerSeen mocks base method. -func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkerSeen", arg0, arg1) ret0, _ := ret[0].(error) @@ -536,7 +551,7 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) } // WorkerSetTags mocks base method. -func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { +func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *sqlc.Worker, arg2 []string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkerSetTags", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -1017,7 +1032,7 @@ func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, ar } // RequeueActiveTasksOfWorker mocks base method. -func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { +func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *sqlc.Worker, arg2 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueActiveTasksOfWorker", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -1031,7 +1046,7 @@ func (mr *MockTaskStateMachineMockRecorder) RequeueActiveTasksOfWorker(arg0, arg } // RequeueFailedTasksOfWorkerOfJob mocks base method. -func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 *persistence.Job, arg3 string) error { +func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *sqlc.Worker, arg2 *persistence.Job, arg3 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueFailedTasksOfWorkerOfJob", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index 23c46553..56592989 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -236,7 +236,7 @@ func assertResponseNoBody(t *testing.T, echoCtx echo.Context, expectStatus int) func testWorker() persistence.Worker { return persistence.Worker{ - Model: persistence.Model{ID: 1}, + ID: 1, UUID: "e7632d62-c3b8-4af0-9e78-01752928952c", Name: "дрон", Address: "fe80::5054:ff:fede:2ad7", diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 564a4a9b..de185aac 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -57,6 +57,15 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err) } + workerTags, err := f.persist.FetchTagsOfWorker(ctx, workerUUID) + switch { + case errors.Is(err, context.Canceled): + return handleConnectionClosed(e, logger, "fetching worker tags") + case err != nil: + logger.Error().AnErr("cause", err).Msg("fetching worker tags") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tags: %v", err) + } + dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker) switch { case errors.Is(err, context.Canceled): @@ -77,6 +86,20 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { apiWorker.Task = &apiWorkerTask } + // TODO: see which API calls actually need the worker tags, and whether it's + // better to just call a specific API operation in those cases. + if len(workerTags) > 0 { + apiTags := make([]api.WorkerTag, len(workerTags)) + for index, tag := range workerTags { + apiTags[index].Id = &tag.UUID + apiTags[index].Name = tag.Name + if tag.Description != "" { + apiTags[index].Description = &tag.Description + } + } + apiWorker.Tags = &apiTags + } + return e.JSON(http.StatusOK, apiWorker) } @@ -464,8 +487,8 @@ func workerSummary(w persistence.Worker) api.WorkerSummary { } } - if !w.LastSeenAt.IsZero() { - summary.LastSeen = &w.LastSeenAt + if w.LastSeenAt.Valid { + summary.LastSeen = &w.LastSeenAt.Time } return summary @@ -478,15 +501,6 @@ func workerDBtoAPI(w persistence.Worker) api.Worker { Platform: w.Platform, SupportedTaskTypes: w.TaskTypes(), } - - if len(w.Tags) > 0 { - tags := []api.WorkerTag{} - for i := range w.Tags { - tags = append(tags, *workerTagDBtoAPI(w.Tags[i])) - } - apiWorker.Tags = &tags - } - return apiWorker } diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index 62942ee3..366f7d9f 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -85,6 +85,15 @@ func TestFetchWorker(t *testing.T) { require.NoError(t, err) assertResponseAPIError(t, echo, http.StatusInternalServerError, "error fetching worker: some unknown error") + // Test database error fetching worker tags. + mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID). + Return(nil, errors.New("some tag fetching error")) + echo = mf.prepareMockedRequest(nil) + err = mf.flamenco.FetchWorker(echo, workerUUID) + require.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusInternalServerError, "error fetching worker tags: some tag fetching error") + // Test with worker that does NOT have a status change requested, and DOES have an assigned task. mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) assignedTask := persistence.Task{ @@ -93,6 +102,10 @@ func TestFetchWorker(t *testing.T) { Job: &persistence.Job{UUID: "f0e25ee4-0d13-4291-afc3-e9446b555aaf"}, Status: api.TaskStatusActive, } + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID).Return([]persistence.WorkerTag{ + {UUID: "0e701402-c4cc-49b0-8b8c-3eb8718d463a", Name: "EEVEE"}, + {UUID: "59211f0a-81cc-4148-b0b7-32b3e2dcdb8f", Name: "Cycles"}, + }, nil) mf.persistence.EXPECT().FetchWorkerTask(gomock.Any(), &worker).Return(&assignedTask, nil) echo = mf.prepareMockedRequest(nil) @@ -116,12 +129,17 @@ func TestFetchWorker(t *testing.T) { }, JobId: assignedTask.Job.UUID, }, + Tags: &[]api.WorkerTag{ + {Id: ptr("0e701402-c4cc-49b0-8b8c-3eb8718d463a"), Name: "EEVEE"}, + {Id: ptr("59211f0a-81cc-4148-b0b7-32b3e2dcdb8f"), Name: "Cycles"}, + }, }) // Test with worker that does have a status change requested, but does NOT Have an assigned task. requestedStatus := api.WorkerStatusAsleep worker.StatusChangeRequest(requestedStatus, false) mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID).Return([]persistence.WorkerTag{}, nil) mf.persistence.EXPECT().FetchWorkerTask(gomock.Any(), &worker).Return(nil, nil) echo = mf.prepareMockedRequest(nil) @@ -170,7 +188,7 @@ func TestDeleteWorker(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: worker.Status, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -201,7 +219,7 @@ func TestRequestWorkerStatusChange(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: prevStatus, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: &api.WorkerStatusChangeRequest{ Status: requestStatus, @@ -245,7 +263,7 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: currentStatus, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: nil, }) diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index 0f7d5c05..7d132386 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -52,7 +52,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { Msg("worker trying to update task that's not assigned to any worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) } - if *dbTask.WorkerID != worker.ID { + if *dbTask.WorkerID != uint(worker.ID) { logger.Warn().Msg("worker trying to update task that's assigned to another worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) } diff --git a/internal/manager/api_impl/worker_task_updates_test.go b/internal/manager/api_impl/worker_task_updates_test.go index 6810bc7a..0cc6fd2d 100644 --- a/internal/manager/api_impl/worker_task_updates_test.go +++ b/internal/manager/api_impl/worker_task_updates_test.go @@ -36,7 +36,7 @@ func TestTaskUpdate(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", } @@ -105,7 +105,7 @@ func TestTaskUpdateFailed(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", @@ -189,7 +189,7 @@ func TestBlockingAfterFailure(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", @@ -339,7 +339,7 @@ func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 8e77545a..1feaae1e 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -582,7 +582,7 @@ func mayWorkerRun(worker *persistence.Worker, dbTask *persistence.Task) api.MayK StatusChangeRequested: true, } } - if dbTask.WorkerID == nil || *dbTask.WorkerID != worker.ID { + if dbTask.WorkerID == nil || *dbTask.WorkerID != uint(worker.ID) { return api.MayKeepRunning{Reason: "task not assigned to this worker"} } if !task_state_machine.IsRunnableTaskStatus(dbTask.Status) { diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 6340441a..cee31635 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -193,7 +193,7 @@ func TestWorkerSignOn(t *testing.T) { Name: "Lazy Boi", PreviousStatus: &prevStatus, Status: api.WorkerStatusStarting, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: "3.0-testing", }) @@ -249,7 +249,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { IsLazy: false, Status: api.WorkerStatusAwake, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -278,7 +278,7 @@ func TestWorkerRememberPreviousStatus(t *testing.T) { IsLazy: false, Status: api.WorkerStatusAwake, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -316,7 +316,7 @@ func TestWorkerDontRememberPreviousStatus(t *testing.T) { PreviousStatus: ptr(api.WorkerStatusError), Status: api.WorkerStatusOffline, StatusChange: nil, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -383,7 +383,7 @@ func TestWorkerStateChanged(t *testing.T) { Name: worker.Name, PreviousStatus: &prevStatus, Status: api.WorkerStatusAsleep, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -423,7 +423,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { Name: worker.Name, PreviousStatus: ptr(api.WorkerStatusOffline), Status: api.WorkerStatusStarting, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: &api.WorkerStatusChangeRequest{ Status: api.WorkerStatusAsleep, @@ -456,7 +456,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { Name: worker.Name, PreviousStatus: ptr(api.WorkerStatusStarting), Status: api.WorkerStatusAsleep, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -526,7 +526,7 @@ func TestMayWorkerRun(t *testing.T) { mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &task).Return(nil) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ @@ -537,7 +537,7 @@ func TestMayWorkerRun(t *testing.T) { // Test: unhappy, assigned but cancelled. { echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusCanceled err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestMayWorkerRun(t *testing.T) { { worker.StatusChangeRequest(api.WorkerStatusAsleep, false) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusActive err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) @@ -569,7 +569,7 @@ func TestMayWorkerRun(t *testing.T) { worker.StatusChangeRequest(api.WorkerStatusAsleep, true) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusActive err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) diff --git a/internal/manager/eventbus/events_workers.go b/internal/manager/eventbus/events_workers.go index 94fd736a..f36a0a0c 100644 --- a/internal/manager/eventbus/events_workers.go +++ b/internal/manager/eventbus/events_workers.go @@ -18,7 +18,7 @@ func NewWorkerUpdate(worker *persistence.Worker) api.EventWorkerUpdate { Name: worker.Name, Status: worker.Status, Version: worker.Software, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, CanRestart: worker.CanRestart, } @@ -29,8 +29,8 @@ func NewWorkerUpdate(worker *persistence.Worker) api.EventWorkerUpdate { } } - if !worker.LastSeenAt.IsZero() { - workerUpdate.LastSeen = &worker.LastSeenAt + if worker.LastSeenAt.Valid { + workerUpdate.LastSeen = &worker.LastSeenAt.Time } // TODO: add tag IDs. diff --git a/internal/manager/persistence/conversion.go b/internal/manager/persistence/conversion.go new file mode 100644 index 00000000..4f1e82ee --- /dev/null +++ b/internal/manager/persistence/conversion.go @@ -0,0 +1,16 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import "database/sql" + +func nullTimeToUTC(t sql.NullTime) sql.NullTime { + return sql.NullTime{ + Time: t.Time.UTC(), + Valid: t.Valid, + } +} + +func ptr[T any](value T) *T { + return &value +} diff --git a/internal/manager/persistence/conversion_test.go b/internal/manager/persistence/conversion_test.go new file mode 100644 index 00000000..e35a79d3 --- /dev/null +++ b/internal/manager/persistence/conversion_test.go @@ -0,0 +1,44 @@ +package persistence + +import ( + "database/sql" + "reflect" + "testing" +) + +func Test_nullTimeToUTC(t *testing.T) { + + inUTC := mustParseTime("2024-11-11T20:12:47Z") + inBangkok := mustParseTime("2024-11-12T03:12:47+07:00") + + tests := []struct { + name string + arg sql.NullTime + want sql.NullTime + }{ + {"zero", sql.NullTime{}, sql.NullTime{}}, + {"invalid-nonzero-utc", + sql.NullTime{Time: inUTC, Valid: false}, + sql.NullTime{Time: inUTC, Valid: false}, + }, + {"valid-nonzero-utc", + sql.NullTime{Time: inUTC, Valid: true}, + sql.NullTime{Time: inUTC, Valid: true}, + }, + {"invalid-nonzero-bangkok", + sql.NullTime{Time: inBangkok, Valid: false}, + sql.NullTime{Time: inUTC, Valid: false}, + }, + {"valid-nonzero-bangkok", + sql.NullTime{Time: inBangkok, Valid: true}, + sql.NullTime{Time: inUTC, Valid: true}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := nullTimeToUTC(tt.arg); !reflect.DeepEqual(got, tt.want) { + t.Errorf("nullTimeToUTC() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index ff94a5c4..2758f1eb 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -37,19 +37,19 @@ func (e PersistenceError) Is(err error) bool { } func jobError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormJobError(errorToWrap), message, msgArgs...) + return wrapError(translateJobError(errorToWrap), message, msgArgs...) } func taskError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormTaskError(errorToWrap), message, msgArgs...) + return wrapError(translateTaskError(errorToWrap), message, msgArgs...) } func workerError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerError(errorToWrap), message, msgArgs...) + return wrapError(translateWorkerError(errorToWrap), message, msgArgs...) } func workerTagError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerTagError(errorToWrap), message, msgArgs...) + return wrapError(translateWorkerTagError(errorToWrap), message, msgArgs...) } func wrapError(errorToWrap error, message string, format ...interface{}) error { @@ -73,36 +73,36 @@ func wrapError(errorToWrap error, message string, format ...interface{}) error { } } -// translateGormJobError translates a Gorm error to a persistence layer error. +// translateJobError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormJobError(err error) error { +func translateJobError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrJobNotFound } return err } -// translateGormTaskError translates a Gorm error to a persistence layer error. +// translateTaskError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormTaskError(err error) error { +func translateTaskError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrTaskNotFound } return err } -// translateGormWorkerError translates a Gorm error to a persistence layer error. +// translateWorkerError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerError(err error) error { +func translateWorkerError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrWorkerNotFound } return err } -// translateGormWorkerTagError translates a Gorm error to a persistence layer error. +// translateWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerTagError(err error) error { +func translateWorkerTagError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrWorkerTagNotFound } diff --git a/internal/manager/persistence/errors_test.go b/internal/manager/persistence/errors_test.go index 3a6c3c69..9be7d537 100644 --- a/internal/manager/persistence/errors_test.go +++ b/internal/manager/persistence/errors_test.go @@ -17,18 +17,18 @@ func TestNotFoundErrors(t *testing.T) { assert.Contains(t, ErrTaskNotFound.Error(), "task") } -func TestTranslateGormJobError(t *testing.T) { - assert.Nil(t, translateGormJobError(nil)) - assert.Equal(t, ErrJobNotFound, translateGormJobError(sql.ErrNoRows)) +func TestTranslateJobError(t *testing.T) { + assert.Nil(t, translateJobError(nil)) + assert.Equal(t, ErrJobNotFound, translateJobError(sql.ErrNoRows)) otherError := errors.New("this error is not special for this function") - assert.Equal(t, otherError, translateGormJobError(otherError)) + assert.Equal(t, otherError, translateJobError(otherError)) } -func TestTranslateGormTaskError(t *testing.T) { - assert.Nil(t, translateGormTaskError(nil)) - assert.Equal(t, ErrTaskNotFound, translateGormTaskError(sql.ErrNoRows)) +func TestTranslateTaskError(t *testing.T) { + assert.Nil(t, translateTaskError(nil)) + assert.Equal(t, ErrTaskNotFound, translateTaskError(sql.ErrNoRows)) otherError := errors.New("this error is not special for this function") - assert.Equal(t, otherError, translateGormTaskError(otherError)) + assert.Equal(t, otherError, translateTaskError(otherError)) } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index c8eb9e6f..262bab7c 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -161,7 +161,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au Name: authoredJob.Name, JobType: authoredJob.JobType, Priority: int64(authoredJob.Priority), - Status: string(authoredJob.Status), + Status: authoredJob.Status, Settings: settings, Metadata: metadata, StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID, @@ -182,7 +182,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au Str("job", params.UUID). Str("type", params.JobType). Str("name", params.Name). - Str("status", params.Status). + Str("status", string(params.Status)). Msg("persistence: storing authored job") jobID, err := qtx.queries.CreateJob(ctx, params) @@ -261,7 +261,7 @@ func (db *DB) storeAuthoredJobTaks( JobID: jobID, IndexInJob: int64(taskIndex + 1), // indexInJob is base-1. Priority: int64(authoredTask.Priority), - Status: string(api.TaskStatusQueued), + Status: api.TaskStatusQueued, Commands: commandsJSON, // dependencies are stored below. } @@ -492,12 +492,7 @@ func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) { queries := db.queries() - statuses := []string{} - for _, status := range jobStatuses { - statuses = append(statuses, string(status)) - } - - sqlcJobs, err := queries.FetchJobsInStatus(ctx, statuses) + sqlcJobs, err := queries.FetchJobsInStatus(ctx, jobStatuses) if err != nil { return nil, jobError(err, "fetching jobs in status %q", jobStatuses) } @@ -521,7 +516,7 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { params := sqlc.SaveJobStatusParams{ Now: db.nowNullable(), ID: int64(j.ID), - Status: string(j.Status), + Status: j.Status, Activity: j.Activity, } @@ -586,8 +581,9 @@ func convertSqlTaskWithJobAndWorker( task sqlc.Task, ) (*Task, error) { var ( - gormJob Job - gormWorker Worker + gormJob Job + worker Worker + err error ) // Fetch & convert the Job. @@ -603,17 +599,16 @@ func convertSqlTaskWithJobAndWorker( } } - // Fetch & convert the Worker. + // Fetch the Worker. if task.WorkerID.Valid && task.WorkerID.Int64 > 0 { - sqlcWorker, err := queries.FetchWorkerUnconditionalByID(ctx, task.WorkerID.Int64) + worker, err = queries.FetchWorkerUnconditionalByID(ctx, task.WorkerID.Int64) if err != nil { return nil, taskError(err, "fetching worker assigned to task %s", task.UUID) } - gormWorker = *convertSqlcWorker(sqlcWorker) } // Convert the Task. - gormTask, err := convertSqlcTask(task, gormJob.UUID, gormWorker.UUID) + gormTask, err := convertSqlcTask(task, gormJob.UUID, worker.UUID) if err != nil { return nil, err } @@ -623,9 +618,9 @@ func convertSqlTaskWithJobAndWorker( gormTask.Job = &gormJob gormTask.JobUUID = gormJob.UUID } - if gormWorker.ID > 0 { - gormTask.Worker = &gormWorker - gormTask.WorkerUUID = gormWorker.UUID + if worker.ID > 0 { + gormTask.Worker = &worker + gormTask.WorkerUUID = worker.UUID } return gormTask, nil @@ -664,7 +659,7 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error { Name: t.Name, Type: t.Type, Priority: int64(t.Priority), - Status: string(t.Status), + Status: t.Status, Commands: commandsJSON, Activity: t.Activity, ID: int64(t.ID), @@ -700,7 +695,7 @@ func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error { err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{ UpdatedAt: db.nowNullable(), - Status: string(t.Status), + Status: t.Status, ID: int64(t.ID), }) if err != nil { @@ -743,7 +738,7 @@ func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error // Update the task itself. t.Worker = w - t.WorkerID = &w.ID + t.WorkerID = ptr(uint(w.ID)) return nil } @@ -756,7 +751,7 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta Int64: int64(worker.ID), Valid: true, }, - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus) @@ -772,7 +767,7 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta return nil, err } gormTask.Worker = worker - gormTask.WorkerID = &worker.ID + gormTask.WorkerID = ptr(uint(worker.ID)) // Fetch the job, either from the cache or from the database. This is done // here because the task_state_machine functionality expects that task.Job @@ -802,7 +797,7 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke Valid: true, }, JobID: int64(job.ID), - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return nil, taskError(err, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, job.UUID) @@ -817,7 +812,7 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke gormTask.Job = job gormTask.JobID = job.ID gormTask.Worker = worker - gormTask.WorkerID = &worker.ID + gormTask.WorkerID = ptr(uint(worker.ID)) result[i] = gormTask } return result, nil @@ -828,7 +823,7 @@ func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api. count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{ JobID: int64(job.ID), - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus) @@ -896,7 +891,7 @@ func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuse rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{ JobID: int64(job.ID), - TaskStatus: convertTaskStatuses(taskStatuses), + TaskStatus: taskStatuses, }) if err != nil { return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses) @@ -926,7 +921,7 @@ func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job, err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{ UpdatedAt: db.nowNullable(), - Status: string(taskStatus), + Status: taskStatus, Activity: activity, JobID: int64(job.ID), }) @@ -950,10 +945,10 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{ UpdatedAt: db.nowNullable(), - Status: string(taskStatus), + Status: taskStatus, Activity: activity, JobID: int64(job.ID), - StatusesToUpdate: convertTaskStatuses(statusesToUpdate), + StatusesToUpdate: statusesToUpdate, }) if err != nil { @@ -1041,7 +1036,7 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err workers := make([]*Worker, len(failureList)) for idx := range failureList { - workers[idx] = convertSqlcWorker(failureList[idx].Worker) + workers[idx] = &failureList[idx].Worker } return workers, nil } @@ -1124,21 +1119,3 @@ func convertSqlcTask(task sqlc.Task, jobUUID string, workerUUID string) (*Task, return &dbTask, nil } - -// convertTaskStatuses converts from []api.TaskStatus to []string for feeding to sqlc. -func convertTaskStatuses(taskStatuses []api.TaskStatus) []string { - statusesAsStrings := make([]string, len(taskStatuses)) - for index := range taskStatuses { - statusesAsStrings[index] = string(taskStatuses[index]) - } - return statusesAsStrings -} - -// convertJobStatuses converts from []api.JobStatus to []string for feeding to sqlc. -func convertJobStatuses(jobStatuses []api.JobStatus) []string { - statusesAsStrings := make([]string, len(jobStatuses)) - for index := range jobStatuses { - statusesAsStrings[index] = string(jobStatuses[index]) - } - return statusesAsStrings -} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index bf30d17d..35985933 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -119,7 +119,9 @@ func TestWorkersLeftToRun(t *testing.T) { require.NoError(t, err) assert.Empty(t, left) - worker1 := createWorker(ctx, t, db) + worker1 := createWorker(ctx, t, db, func(w *Worker) { + w.UUID = "11111111-0000-1111-2222-333333333333" + }) worker2 := createWorkerFrom(ctx, t, db, *worker1) // Create one worker tag. It will not be used by this job, but one of the @@ -129,8 +131,8 @@ func TestWorkersLeftToRun(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC1, []string{tag1.UUID})) uuidMap := func(workers ...*Worker) map[string]bool { theMap := map[string]bool{} @@ -185,23 +187,25 @@ func TestWorkersLeftToRunWithTags(t *testing.T) { // Tags 1 + 3 workerC13 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c13c1313-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1, &tag3} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC13, []string{tag1.UUID, tag3.UUID})) + // Tag 1 workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC1, []string{tag1.UUID})) + // Tag 2 worker, this one should never appear. - createWorker(ctx, t, db, func(w *Worker) { + workerC2 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c2c2c2c2-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag2} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC2, []string{tag2.UUID})) + // No tags, so should be able to run only tagless jobs. Which is none // in this test. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "00000000-0000-1111-2222-333333333333" - w.Tags = nil }) uuidMap := func(workers ...*Worker) map[string]bool { diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index e7d0d9ba..06779b95 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -14,6 +14,7 @@ import ( "golang.org/x/net/context" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -581,7 +582,7 @@ func TestTaskAssignToWorker(t *testing.T) { if task.WorkerID == nil { t.Error("task.WorkerID == nil") } else { - assert.Equal(t, w.ID, *task.WorkerID) + assert.Equal(t, w.ID, int64(*task.WorkerID)) } } @@ -713,7 +714,7 @@ func TestAddWorkerToTaskFailedList(t *testing.T) { newWorker.ID = 0 newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85" newWorker.Name = "Worker 2" - require.NoError(t, db.SaveWorker(ctx, &newWorker)) + require.NoError(t, db.CreateWorker(ctx, &newWorker)) worker2, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) @@ -752,7 +753,7 @@ func TestClearFailureListOfTask(t *testing.T) { newWorker.ID = 0 newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85" newWorker.Name = "Worker 2" - require.NoError(t, db.SaveWorker(ctx, &newWorker)) + require.NoError(t, db.CreateWorker(ctx, &newWorker)) worker2, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) @@ -1020,7 +1021,6 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W Software: "3.0", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management", - Tags: nil, } for _, updater := range updaters { @@ -1039,14 +1039,26 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W // createWorkerFrom duplicates the given worker, ensuring new UUIDs. func createWorkerFrom(ctx context.Context, t *testing.T, db *DB, worker Worker) *Worker { - worker.ID = 0 - worker.UUID = uuid.New() - worker.Name += " (copy)" + newWorker := sqlc.Worker{ + UUID: uuid.New(), + Secret: worker.Secret, + Name: worker.Name + " (copy)", + Address: worker.Address, + Platform: worker.Platform, + Software: worker.Software, + Status: worker.Status, + LastSeenAt: nullTimeToUTC(worker.LastSeenAt), + StatusRequested: worker.StatusRequested, + LazyStatusRequest: worker.LazyStatusRequest, + SupportedTaskTypes: worker.SupportedTaskTypes, + DeletedAt: worker.DeletedAt, + CanRestart: worker.CanRestart, + } - err := db.SaveWorker(ctx, &worker) + err := db.CreateWorker(ctx, &newWorker) require.NoError(t, err) - dbWorker, err := db.FetchWorker(ctx, worker.UUID) + dbWorker, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) return dbWorker diff --git a/internal/manager/persistence/sqlc/methods.go b/internal/manager/persistence/sqlc/methods.go new file mode 100644 index 00000000..04bc96dd --- /dev/null +++ b/internal/manager/persistence/sqlc/methods.go @@ -0,0 +1,41 @@ +// Code MANUALLY written to extend the SQLC structs with some extra methods. + +package sqlc + +import ( + "fmt" + "strings" + + "projects.blender.org/studio/flamenco/pkg/api" +) + +// SPDX-License-Identifier: GPL-3.0-or-later + +func (w *Worker) Identifier() string { + // Avoid a panic when worker.Identifier() is called on a nil pointer. + if w == nil { + return "-nil worker-" + } + return fmt.Sprintf("%s (%s)", w.Name, w.UUID) +} + +// TaskTypes returns the worker's supported task types as list of strings. +func (w *Worker) TaskTypes() []string { + return strings.Split(w.SupportedTaskTypes, ",") +} + +// StatusChangeRequest stores a requested status change on the Worker. +// This just updates the Worker instance, but doesn't store the change in the +// database. +func (w *Worker) StatusChangeRequest(status api.WorkerStatus, isLazyRequest bool) { + w.StatusRequested = status + w.LazyStatusRequest = isLazyRequest +} + +// StatusChangeClear clears the requested status change of the Worker. +// This just updates the Worker instance, but doesn't store the change in the +// database. +func (w *Worker) StatusChangeClear() { + w.StatusRequested = "" + w.LazyStatusRequest = false +} diff --git a/internal/manager/persistence/sqlc/models.go b/internal/manager/persistence/sqlc/models.go index 3ddf9108..ccbe873e 100644 --- a/internal/manager/persistence/sqlc/models.go +++ b/internal/manager/persistence/sqlc/models.go @@ -8,6 +8,8 @@ import ( "database/sql" "encoding/json" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) type Job struct { @@ -18,7 +20,7 @@ type Job struct { Name string JobType string Priority int64 - Status string + Status api.JobStatus Activity string Settings json.RawMessage Metadata json.RawMessage @@ -64,7 +66,7 @@ type Task struct { JobID int64 IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus WorkerID sql.NullInt64 LastTouchedAt sql.NullTime Commands json.RawMessage @@ -92,9 +94,9 @@ type Worker struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string DeletedAt sql.NullTime diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 65c35f46..8960398e 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -11,6 +11,8 @@ import ( "encoding/json" "strings" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec @@ -156,7 +158,7 @@ type CreateJobParams struct { Name string JobType string Priority int64 - Status string + Status api.JobStatus Activity string Settings json.RawMessage Metadata json.RawMessage @@ -218,7 +220,7 @@ type CreateTaskParams struct { JobID int64 IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus Commands json.RawMessage } @@ -464,7 +466,7 @@ const fetchJobsInStatus = `-- name: FetchJobsInStatus :many SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE status IN (/*SLICE:statuses*/?) ` -func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]Job, error) { +func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []api.JobStatus) ([]Job, error) { query := fetchJobsInStatus var queryParams []interface{} if len(statuses) > 0 { @@ -675,7 +677,7 @@ WHERE tasks.job_id = ?1 type FetchTasksOfJobInStatusParams struct { JobID int64 - TaskStatus []string + TaskStatus []api.TaskStatus } type FetchTasksOfJobInStatusRow struct { @@ -743,7 +745,7 @@ WHERE tasks.worker_id = ?1 type FetchTasksOfWorkerInStatusParams struct { WorkerID sql.NullInt64 - TaskStatus string + TaskStatus api.TaskStatus } type FetchTasksOfWorkerInStatusRow struct { @@ -801,7 +803,7 @@ WHERE tasks.worker_id = ?1 type FetchTasksOfWorkerInStatusOfJobParams struct { WorkerID sql.NullInt64 JobID int64 - TaskStatus string + TaskStatus api.TaskStatus } type FetchTasksOfWorkerInStatusOfJobRow struct { @@ -855,7 +857,7 @@ AND last_touched_at <= ?2 ` type FetchTimedOutTasksParams struct { - TaskStatus string + TaskStatus api.TaskStatus UntouchedSince sql.NullTime } @@ -916,7 +918,7 @@ GROUP BY status ` type JobCountTaskStatusesRow struct { - Status string + Status api.TaskStatus NumTasks int64 } @@ -951,7 +953,7 @@ WHERE job_id = ?1 AND status = ?2 type JobCountTasksInStatusParams struct { JobID int64 - TaskStatus string + TaskStatus api.TaskStatus } // Fetch number of tasks in the given status, of the given job. @@ -975,7 +977,7 @@ type QueryJobTaskSummariesRow struct { Name string IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus Type string UpdatedAt sql.NullTime } @@ -1097,7 +1099,7 @@ UPDATE jobs SET updated_at=?1, status=?2, activity=?3 WHERE id=?4 type SaveJobStatusParams struct { Now sql.NullTime - Status string + Status api.JobStatus Activity string ID int64 } @@ -1170,7 +1172,7 @@ GROUP BY status ` type SummarizeJobStatusesRow struct { - Status string + Status api.JobStatus StatusCount int64 } @@ -1375,7 +1377,7 @@ WHERE job_id = ?4 type UpdateJobsTaskStatusesParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus Activity string JobID int64 } @@ -1400,10 +1402,10 @@ WHERE job_id = ?4 AND status in (/*SLICE:statuses_to_update*/?) type UpdateJobsTaskStatusesConditionalParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus Activity string JobID int64 - StatusesToUpdate []string + StatusesToUpdate []api.TaskStatus } func (q *Queries) UpdateJobsTaskStatusesConditional(ctx context.Context, arg UpdateJobsTaskStatusesConditionalParams) error { @@ -1444,7 +1446,7 @@ type UpdateTaskParams struct { Name string Type string Priority int64 - Status string + Status api.TaskStatus WorkerID sql.NullInt64 LastTouchedAt sql.NullTime Commands json.RawMessage @@ -1496,7 +1498,7 @@ WHERE id=?3 type UpdateTaskStatusParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus ID int64 } diff --git a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go index e0cfe922..7573ecb7 100644 --- a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go +++ b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go @@ -9,6 +9,8 @@ import ( "context" "database/sql" "strings" + + "projects.blender.org/studio/flamenco/pkg/api" ) const assignTaskToWorker = `-- name: AssignTaskToWorker :exec @@ -39,9 +41,9 @@ LIMIT 1 ` type FetchAssignedAndRunnableTaskOfWorkerParams struct { - ActiveTaskStatus string + ActiveTaskStatus api.TaskStatus WorkerID sql.NullInt64 - ActiveJobStatuses []string + ActiveJobStatuses []api.JobStatus } type FetchAssignedAndRunnableTaskOfWorkerRow struct { @@ -99,8 +101,8 @@ LIMIT 1 ` type FetchWorkerTaskParams struct { - TaskStatusActive string - JobStatusActive string + TaskStatusActive api.TaskStatus + JobStatusActive api.JobStatus WorkerID sql.NullInt64 } @@ -179,10 +181,10 @@ ORDER BY jobs.priority DESC, tasks.priority DESC type FindRunnableTaskParams struct { WorkerID int64 - TaskStatusCompleted string + TaskStatusCompleted api.TaskStatus WorkerTags []sql.NullInt64 - SchedulableTaskStatuses []string - SchedulableJobStatuses []string + SchedulableTaskStatuses []api.TaskStatus + SchedulableJobStatuses []api.JobStatus SupportedTaskTypes []string } diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index 60ddf639..2df38851 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -43,7 +43,7 @@ FROM worker_tag_membership WHERE worker_id=@worker_id; -- name: FetchWorkers :many -SELECT sqlc.embed(workers) FROM workers +SELECT * FROM workers WHERE deleted_at IS NULL; -- name: FetchWorker :one diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index d6cbc97b..e9496859 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -10,6 +10,8 @@ import ( "database/sql" "strings" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) const countWorkerTags = `-- name: CountWorkerTags :one @@ -66,9 +68,9 @@ type CreateWorkerParams struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string DeletedAt sql.NullTime @@ -232,7 +234,7 @@ AND status NOT IN (/*SLICE:worker_statuses_no_timeout*/?) type FetchTimedOutWorkersParams struct { LastSeenBefore sql.NullTime - WorkerStatusesNoTimeout []string + WorkerStatusesNoTimeout []api.WorkerStatus } func (q *Queries) FetchTimedOutWorkers(ctx context.Context, arg FetchTimedOutWorkersParams) ([]Worker, error) { @@ -548,40 +550,36 @@ func (q *Queries) FetchWorkerUnconditionalByID(ctx context.Context, workerID int } const fetchWorkers = `-- name: FetchWorkers :many -SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE deleted_at IS NULL ` -type FetchWorkersRow struct { - Worker Worker -} - -func (q *Queries) FetchWorkers(ctx context.Context) ([]FetchWorkersRow, error) { +func (q *Queries) FetchWorkers(ctx context.Context) ([]Worker, error) { rows, err := q.db.QueryContext(ctx, fetchWorkers) if err != nil { return nil, err } defer rows.Close() - var items []FetchWorkersRow + var items []Worker for rows.Next() { - var i FetchWorkersRow + var i Worker if err := rows.Scan( - &i.Worker.ID, - &i.Worker.CreatedAt, - &i.Worker.UpdatedAt, - &i.Worker.UUID, - &i.Worker.Secret, - &i.Worker.Name, - &i.Worker.Address, - &i.Worker.Platform, - &i.Worker.Software, - &i.Worker.Status, - &i.Worker.LastSeenAt, - &i.Worker.StatusRequested, - &i.Worker.LazyStatusRequest, - &i.Worker.SupportedTaskTypes, - &i.Worker.DeletedAt, - &i.Worker.CanRestart, + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, ); err != nil { return nil, err } @@ -622,9 +620,9 @@ type SaveWorkerParams struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string CanRestart bool @@ -662,8 +660,8 @@ WHERE id=?5 type SaveWorkerStatusParams struct { UpdatedAt sql.NullTime - Status string - StatusRequested string + Status api.WorkerStatus + StatusRequested api.WorkerStatus LazyStatusRequest bool ID int64 } @@ -808,7 +806,7 @@ GROUP BY status ` type SummarizeWorkerStatusesRow struct { - Status string + Status api.WorkerStatus StatusCount int64 } diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 9d5539b5..39903b0f 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -77,8 +77,8 @@ func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker // Worker, but since it's active that is unlikely. { row, err := queries.FetchAssignedAndRunnableTaskOfWorker(ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{ - ActiveTaskStatus: string(api.TaskStatusActive), - ActiveJobStatuses: convertJobStatuses(schedulableJobStatuses), + ActiveTaskStatus: api.TaskStatusActive, + ActiveJobStatuses: schedulableJobStatuses, WorkerID: workerID, }) @@ -141,18 +141,22 @@ func findTaskForWorker( w *Worker, ) (sqlc.Task, error) { - // Construct the list of worker tags to check. - workerTags := make([]sql.NullInt64, len(w.Tags)) - for index, tag := range w.Tags { - workerTags[index] = sql.NullInt64{Int64: int64(tag.ID), Valid: true} + // Construct the list of worker tag IDs to check. + tags, err := queries.FetchTagsOfWorker(ctx, w.UUID) + if err != nil { + return sqlc.Task{}, err + } + workerTags := make([]sql.NullInt64, len(tags)) + for index, tag := range tags { + workerTags[index] = sql.NullInt64{Int64: tag.ID, Valid: true} } row, err := queries.FindRunnableTask(ctx, sqlc.FindRunnableTaskParams{ WorkerID: int64(w.ID), - SchedulableTaskStatuses: convertTaskStatuses(schedulableTaskStatuses), - SchedulableJobStatuses: convertJobStatuses(schedulableJobStatuses), + SchedulableTaskStatuses: schedulableTaskStatuses, + SchedulableJobStatuses: schedulableJobStatuses, SupportedTaskTypes: w.TaskTypes(), - TaskStatusCompleted: string(api.TaskStatusCompleted), + TaskStatusCompleted: api.TaskStatusCompleted, WorkerTags: workerTags, }) if err != nil { diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 381f7888..3e1b4035 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -48,7 +48,7 @@ func TestOneJobOneTask(t *testing.T) { require.NotNil(t, task) assert.Equal(t, job.ID, task.JobID) require.NotNil(t, task.WorkerID, "no worker assigned to returned task") - assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") + assert.Equal(t, w.ID, int64(*task.WorkerID), "task must be assigned to the requesting worker") // Check the task in the database. now := db.now() @@ -57,7 +57,7 @@ func TestOneJobOneTask(t *testing.T) { require.NotNil(t, dbTask) require.NotNil(t, dbTask.WorkerID, "no worker assigned to task in database") - assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker") + assert.Equal(t, w.ID, int64(*dbTask.WorkerID), "task must be assigned to the requesting worker") assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling") } @@ -259,7 +259,7 @@ func TestAlreadyAssigned(t *testing.T) { // another, higher-prio task to be done. dbTask3, err := db.FetchTask(ctx, att3.UUID) require.NoError(t, err) - dbTask3.WorkerID = &w.ID + dbTask3.WorkerID = ptr(uint(w.ID)) dbTask3.Status = api.TaskStatusActive err = db.SaveTask(ctx, dbTask3) require.NoError(t, err) @@ -292,7 +292,7 @@ func TestAssignedToOtherWorker(t *testing.T) { // it shouldn't matter which worker it's assigned to. dbTask2, err := db.FetchTask(ctx, att2.UUID) require.NoError(t, err) - dbTask2.WorkerID = &w2.ID + dbTask2.WorkerID = ptr(uint(w2.ID)) dbTask2.Status = api.TaskStatusQueued err = db.SaveTask(ctx, dbTask2) require.NoError(t, err) @@ -302,7 +302,7 @@ func TestAssignedToOtherWorker(t *testing.T) { require.NotNil(t, task) assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen") - assert.Equal(t, *task.WorkerID, w.ID, "the task should now be assigned to the worker it was scheduled for") + assert.Equal(t, int64(*task.WorkerID), w.ID, "the task should now be assigned to the worker it was scheduled for") } func TestPreviouslyFailed(t *testing.T) { @@ -344,14 +344,12 @@ func TestWorkerTagJobWithTag(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) // Create a worker in tag1: - workerC := linuxWorker(t, db, func(w *Worker) { - w.Tags = []*WorkerTag{&tag1} - }) + workerC := linuxWorker(t, db) + require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID})) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Tags = nil }) { // Test job with different tag: @@ -391,14 +389,12 @@ func TestWorkerTagJobWithoutTag(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) // Create a worker in tag1: - workerC := linuxWorker(t, db, func(w *Worker) { - w.Tags = []*WorkerTag{&tag1} - }) + workerC := linuxWorker(t, db) + require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID})) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Tags = nil }) // Test tag-less job: @@ -537,9 +533,9 @@ func saveTestWorker(t *testing.T, db *DB, worker *Worker) { Address: worker.Address, Platform: worker.Platform, Software: worker.Software, - Status: string(worker.Status), - LastSeenAt: sql.NullTime{Time: worker.LastSeenAt, Valid: !worker.LastSeenAt.IsZero()}, - StatusRequested: string(worker.StatusRequested), + Status: worker.Status, + LastSeenAt: nullTimeToUTC(worker.LastSeenAt), + StatusRequested: worker.StatusRequested, LazyStatusRequest: worker.LazyStatusRequest, SupportedTaskTypes: worker.SupportedTaskTypes, DeletedAt: sql.NullTime(worker.DeletedAt), @@ -550,5 +546,6 @@ func saveTestWorker(t *testing.T, db *DB, worker *Worker) { id, err := queries.CreateWorker(context.TODO(), params) require.NoError(t, err, "cannot save worker %q", worker.Name) - worker.ID = uint(id) + worker.ID = id + worker.CreatedAt = params.CreatedAt } diff --git a/internal/manager/persistence/timeout.go b/internal/manager/persistence/timeout.go index b149b0cd..af74d4a0 100644 --- a/internal/manager/persistence/timeout.go +++ b/internal/manager/persistence/timeout.go @@ -31,7 +31,7 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) queries := db.queries() sqlcTasks, err := queries.FetchTimedOutTasks(ctx, sqlc.FetchTimedOutTasksParams{ - TaskStatus: string(api.TaskStatusActive), + TaskStatus: api.TaskStatusActive, UntouchedSince: sql.NullTime{Time: untouchedSince, Valid: true}, }) @@ -54,13 +54,8 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) { queries := db.queries() - statuses := make([]string, len(workerStatusNoTimeout)) - for i, status := range workerStatusNoTimeout { - statuses[i] = string(status) - } - sqlcWorkers, err := queries.FetchTimedOutWorkers(ctx, sqlc.FetchTimedOutWorkersParams{ - WorkerStatusesNoTimeout: statuses, + WorkerStatusesNoTimeout: workerStatusNoTimeout, LastSeenBefore: sql.NullTime{ Time: lastSeenBefore.UTC(), Valid: true}, @@ -71,7 +66,7 @@ func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time result := make([]*Worker, len(sqlcWorkers)) for index := range sqlcWorkers { - result[index] = convertSqlcWorker(sqlcWorkers[index]) + result[index] = &sqlcWorkers[index] } return result, nil } diff --git a/internal/manager/persistence/timeout_test.go b/internal/manager/persistence/timeout_test.go index 30ad05c7..344dbc4f 100644 --- a/internal/manager/persistence/timeout_test.go +++ b/internal/manager/persistence/timeout_test.go @@ -1,6 +1,7 @@ package persistence import ( + "database/sql" "testing" "time" @@ -55,8 +56,8 @@ func TestFetchTimedOutWorkers(t *testing.T) { defer cancel() timeoutDeadline := mustParseTime("2022-06-07T11:14:47+02:00") - beforeDeadline := timeoutDeadline.Add(-10 * time.Second) - afterDeadline := timeoutDeadline.Add(10 * time.Second) + beforeDeadline := sql.NullTime{Time: timeoutDeadline.Add(-10 * time.Second), Valid: true} + afterDeadline := sql.NullTime{Time: timeoutDeadline.Add(10 * time.Second), Valid: true} worker0 := Worker{ // Offline, so should not time out. UUID: "c7b4d1d5-0a96-4e19-993f-028786d3d2c1", diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go index 133fc470..fd361411 100644 --- a/internal/manager/persistence/worker_sleep_schedule.go +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -60,7 +60,7 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch if err != nil { return fmt.Errorf("fetching worker %q: %w", workerUUID, err) } - schedule.WorkerID = worker.ID + schedule.WorkerID = uint(worker.ID) schedule.Worker = worker // Only store timestamps in UTC. @@ -120,7 +120,7 @@ func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSched return workerError(err, "finding worker by their sleep schedule") } - schedule.Worker = convertSqlcWorker(worker) + schedule.Worker = &worker return nil } diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go index b529b4e2..d1bed97c 100644 --- a/internal/manager/persistence/worker_sleep_schedule_test.go +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -40,7 +40,7 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { // Create a sleep schedule. created := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -53,7 +53,7 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), created, *fetched) } func TestFetchSleepScheduleWorker(t *testing.T) { @@ -74,7 +74,7 @@ func TestFetchSleepScheduleWorker(t *testing.T) { // Create a sleep schedule. created := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -120,7 +120,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) schedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -138,7 +138,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), schedule, *fetched) // Overwrite the schedule with one that already has a database ID. newSchedule := schedule @@ -150,11 +150,11 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), newSchedule, *fetched) // Overwrite the schedule with a freshly constructed one. newerSchedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -166,11 +166,11 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), newerSchedule, *fetched) // Clear the sleep schedule. emptySchedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: false, @@ -182,7 +182,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, emptySchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), emptySchedule, *fetched) } @@ -212,7 +212,7 @@ func TestSetWorkerSleepScheduleNextCheck(t *testing.T) { fetched, err := db.FetchWorkerSleepSchedule(ctx, schedule.Worker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, schedule.Worker.ID, schedule, *fetched) + assertEqualSleepSchedule(t, uint(schedule.Worker.ID), schedule, *fetched) } func TestFetchSleepSchedulesToCheck(t *testing.T) { @@ -293,9 +293,9 @@ func TestFetchSleepSchedulesToCheck(t *testing.T) { require.NoError(t, err) require.Len(t, toCheck, 2) - assertEqualSleepSchedule(t, schedule0.Worker.ID, schedule0, *toCheck[0]) + assertEqualSleepSchedule(t, uint(schedule0.Worker.ID), schedule0, *toCheck[0]) assert.Nil(t, toCheck[0].Worker, "the Worker should NOT be fetched") - assertEqualSleepSchedule(t, schedule2.Worker.ID, schedule1, *toCheck[1]) + assertEqualSleepSchedule(t, uint(schedule2.Worker.ID), schedule1, *toCheck[1]) assert.Nil(t, toCheck[1].Worker, "the Worker should NOT be fetched") } diff --git a/internal/manager/persistence/worker_tag.go b/internal/manager/persistence/worker_tag.go index 10dbacb4..aacddd0c 100644 --- a/internal/manager/persistence/worker_tag.go +++ b/internal/manager/persistence/worker_tag.go @@ -173,3 +173,17 @@ func (db *DB) WorkerSetTags(ctx context.Context, worker *Worker, tagUUIDs []stri return qtx.commit() } + +func (db *DB) FetchTagsOfWorker(ctx context.Context, workerUUID string) ([]WorkerTag, error) { + queries := db.queries() + tags, err := queries.FetchTagsOfWorker(ctx, workerUUID) + if err != nil { + return nil, workerTagError(err, "fetching tags of worker %s", workerUUID) + } + + gormTags := make([]WorkerTag, len(tags)) + for index, tag := range tags { + gormTags[index] = *convertSqlcWorkerTag(tag) + } + return gormTags, nil +} diff --git a/internal/manager/persistence/worker_tag_test.go b/internal/manager/persistence/worker_tag_test.go index 955da9cd..57fa2604 100644 --- a/internal/manager/persistence/worker_tag_test.go +++ b/internal/manager/persistence/worker_tag_test.go @@ -101,8 +101,11 @@ func TestAssignUnassignWorkerTags(t *testing.T) { w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) require.NoError(t, err) + tags, err := f.db.queries().FetchTagsOfWorker(f.ctx, w.UUID) + require.NoError(t, err) + // Catch doubly-reported tags, as the maps below would hide those cases. - assert.Len(t, w.Tags, len(tagUUIDs), msgLabel) + assert.Len(t, tags, len(tagUUIDs), msgLabel) expectTags := make(map[string]bool) for _, cid := range tagUUIDs { @@ -110,8 +113,8 @@ func TestAssignUnassignWorkerTags(t *testing.T) { } actualTags := make(map[string]bool) - for _, c := range w.Tags { - actualTags[c.UUID] = true + for _, tag := range tags { + actualTags[tag.UUID] = true } assert.Equal(t, expectTags, actualTags, msgLabel) @@ -171,9 +174,9 @@ func TestDeleteWorkerTagWithWorkersAssigned(t *testing.T) { require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) // Check the Worker has been unassigned from the tag. - w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + tags, err := f.db.queries().FetchTagsOfWorker(f.ctx, f.worker.UUID) require.NoError(t, err) - assert.Empty(t, w.Tags) + assert.Empty(t, tags) } func assertTagsMatch(t *testing.T, f WorkerTestFixture, expectUUIDs ...string) { diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 93b76ec0..589c7d16 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -7,7 +7,6 @@ import ( "database/sql" "errors" "fmt" - "strings" "time" "github.com/rs/zerolog/log" @@ -15,7 +14,9 @@ import ( "projects.blender.org/studio/flamenco/pkg/api" ) -type Worker struct { +type Worker = sqlc.Worker + +type Worker__gorm struct { Model DeletedAt sql.NullTime @@ -38,76 +39,34 @@ type Worker struct { Tags []*WorkerTag } -func (w *Worker) Identifier() string { - // Avoid a panic when worker.Identifier() is called on a nil pointer. - if w == nil { - return "-nil worker-" - } - return fmt.Sprintf("%s (%s)", w.Name, w.UUID) -} - -// TaskTypes returns the worker's supported task types as list of strings. -func (w *Worker) TaskTypes() []string { - return strings.Split(w.SupportedTaskTypes, ",") -} - -// StatusChangeRequest stores a requested status change on the Worker. -// This just updates the Worker instance, but doesn't store the change in the -// database. -func (w *Worker) StatusChangeRequest(status api.WorkerStatus, isLazyRequest bool) { - w.StatusRequested = status - w.LazyStatusRequest = isLazyRequest -} - -// StatusChangeClear clears the requested status change of the Worker. -// This just updates the Worker instance, but doesn't store the change in the -// database. -func (w *Worker) StatusChangeClear() { - w.StatusRequested = "" - w.LazyStatusRequest = false -} - func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { queries := db.queries() now := db.nowNullable().Time - workerID, err := queries.CreateWorker(ctx, sqlc.CreateWorkerParams{ - CreatedAt: now, - UUID: w.UUID, - Secret: w.Secret, - Name: w.Name, - Address: w.Address, - Platform: w.Platform, - Software: w.Software, - Status: string(w.Status), - LastSeenAt: sql.NullTime{ - Time: w.LastSeenAt.UTC(), - Valid: !w.LastSeenAt.IsZero(), - }, - StatusRequested: string(w.StatusRequested), + params := sqlc.CreateWorkerParams{ + CreatedAt: now, + UUID: w.UUID, + Secret: w.Secret, + Name: w.Name, + Address: w.Address, + Platform: w.Platform, + Software: w.Software, + Status: w.Status, + LastSeenAt: nullTimeToUTC(w.LastSeenAt), + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, SupportedTaskTypes: w.SupportedTaskTypes, DeletedAt: sql.NullTime(w.DeletedAt), CanRestart: w.CanRestart, - }) + } + + workerID, err := queries.CreateWorker(ctx, params) if err != nil { return fmt.Errorf("creating new worker: %w", err) } - w.ID = uint(workerID) - w.CreatedAt = now - - // TODO: remove the create-with-tags functionality to a higher-level function. - // This code is just here to make this function work like the GORM code did. - for _, tag := range w.Tags { - err := queries.WorkerAddTagMembership(ctx, sqlc.WorkerAddTagMembershipParams{ - WorkerTagID: int64(tag.ID), - WorkerID: workerID, - }) - if err != nil { - return err - } - } + w.ID = workerID + w.CreatedAt = params.CreatedAt return nil } @@ -120,19 +79,7 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { return nil, workerError(err, "fetching worker %s", uuid) } - // TODO: remove this code, and let the caller fetch the tags when interested in them. - workerTags, err := queries.FetchTagsOfWorker(ctx, uuid) - if err != nil { - return nil, workerTagError(err, "fetching tags of worker %s", uuid) - } - - convertedWorker := convertSqlcWorker(worker) - convertedWorker.Tags = make([]*WorkerTag, len(workerTags)) - for index := range workerTags { - convertedWorker.Tags[index] = convertSqlcWorkerTag(workerTags[index]) - } - - return convertedWorker, nil + return &worker, nil } func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { @@ -168,11 +115,11 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { return nil, workerError(err, "fetching all workers") } - gormWorkers := make([]*Worker, len(workers)) + workerPointers := make([]*Worker, len(workers)) for idx := range workers { - gormWorkers[idx] = convertSqlcWorker(workers[idx].Worker) + workerPointers[idx] = &workers[idx] } - return gormWorkers, nil + return workerPointers, nil } // FetchWorkerTask returns the most recent task assigned to the given Worker. @@ -184,8 +131,8 @@ func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error workerID := sql.NullInt64{Int64: int64(worker.ID), Valid: true} row, err := queries.FetchWorkerTask(ctx, sqlc.FetchWorkerTaskParams{ - TaskStatusActive: string(api.TaskStatusActive), - JobStatusActive: string(api.JobStatusActive), + TaskStatusActive: api.TaskStatusActive, + JobStatusActive: api.JobStatusActive, WorkerID: workerID, }) @@ -224,10 +171,10 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { err := queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{ UpdatedAt: db.nowNullable(), - Status: string(w.Status), - StatusRequested: string(w.StatusRequested), + Status: w.Status, + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, - ID: int64(w.ID), + ID: w.ID, }) if err != nil { return fmt.Errorf("saving worker status: %w", err) @@ -235,10 +182,9 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { return nil } -func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { - // TODO: remove this code, and just let the caller call CreateWorker() directly. +func (db *DB) SaveWorker(ctx context.Context, w *sqlc.Worker) error { if w.ID == 0 { - return db.CreateWorker(ctx, w) + panic("Do not use SaveWorker() to create a new Worker, use CreateWorker() instead") } queries := db.queries() @@ -251,13 +197,13 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { Address: w.Address, Platform: w.Platform, Software: w.Software, - Status: string(w.Status), - LastSeenAt: sql.NullTime{Time: w.LastSeenAt, Valid: !w.LastSeenAt.IsZero()}, - StatusRequested: string(w.StatusRequested), + Status: w.Status, + LastSeenAt: w.LastSeenAt, + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, SupportedTaskTypes: w.SupportedTaskTypes, CanRestart: w.CanRestart, - ID: int64(w.ID), + ID: w.ID, }) if err != nil { return fmt.Errorf("saving worker: %w", err) @@ -303,34 +249,6 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e return statusCounts, nil } -// convertSqlcWorker converts a worker from the SQLC-generated model to the model -// expected by the rest of the code. This is mostly in place to aid in the GORM -// to SQLC migration. It is intended that eventually the rest of the code will -// use the same SQLC-generated model. -func convertSqlcWorker(worker sqlc.Worker) *Worker { - return &Worker{ - Model: Model{ - ID: uint(worker.ID), - CreatedAt: worker.CreatedAt, - UpdatedAt: worker.UpdatedAt.Time, - }, - DeletedAt: worker.DeletedAt, - - UUID: worker.UUID, - Secret: worker.Secret, - Name: worker.Name, - Address: worker.Address, - Platform: worker.Platform, - Software: worker.Software, - Status: api.WorkerStatus(worker.Status), - LastSeenAt: worker.LastSeenAt.Time, - CanRestart: worker.CanRestart, - StatusRequested: api.WorkerStatus(worker.StatusRequested), - LazyStatusRequest: worker.LazyStatusRequest, - SupportedTaskTypes: worker.SupportedTaskTypes, - } -} - // convertSqlcWorkerTag converts a worker tag from the SQLC-generated model to // the model expected by the rest of the code. This is mostly in place to aid in // the GORM to SQLC migration. It is intended that eventually the rest of the diff --git a/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go index 274338e8..1d428fbb 100644 --- a/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go +++ b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -81,7 +82,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg } // SaveWorkerStatus mocks base method. -func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/internal/manager/sleep_scheduler/sleep_scheduler.go b/internal/manager/sleep_scheduler/sleep_scheduler.go index fc3f2779..613cf1a7 100644 --- a/internal/manager/sleep_scheduler/sleep_scheduler.go +++ b/internal/manager/sleep_scheduler/sleep_scheduler.go @@ -178,7 +178,7 @@ func (ss *SleepScheduler) updateWorkerStatus( IsLazy: false, Status: worker.StatusRequested, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) diff --git a/internal/manager/sleep_scheduler/sleep_scheduler_test.go b/internal/manager/sleep_scheduler/sleep_scheduler_test.go index e02d1d94..5fa32491 100644 --- a/internal/manager/sleep_scheduler/sleep_scheduler_test.go +++ b/internal/manager/sleep_scheduler/sleep_scheduler_test.go @@ -118,7 +118,7 @@ func TestApplySleepSchedule(t *testing.T) { ss, mocks, ctx := testFixtures(t) worker := persistence.Worker{ - Model: persistence.Model{ID: 5}, + ID: 5, UUID: "74997de4-c530-4913-b89f-c489f14f7634", Status: api.WorkerStatusOffline, } @@ -192,7 +192,7 @@ func TestApplySleepScheduleNoStatusChange(t *testing.T) { ss, mocks, ctx := testFixtures(t) worker := persistence.Worker{ - Model: persistence.Model{ID: 5}, + ID: 5, UUID: "74997de4-c530-4913-b89f-c489f14f7634", Status: api.WorkerStatusAsleep, } diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 1a084f17..ef004fba 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -11,6 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" zerolog "github.com/rs/zerolog" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -79,7 +80,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{} } // FetchTasksOfWorkerInStatus mocks base method. -func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { +func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *sqlc.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2) ret0, _ := ret[0].([]*persistence.Task) @@ -94,7 +95,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, a } // FetchTasksOfWorkerInStatusOfJob mocks base method. -func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) { +func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *sqlc.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatusOfJob", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*persistence.Task) diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index bff1eb90..dd51fb4f 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -19,7 +20,7 @@ import ( type PersistenceService interface { FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*persistence.Worker, error) - SaveWorker(ctx context.Context, w *persistence.Worker) error + SaveWorker(ctx context.Context, w *sqlc.Worker) error } var _ PersistenceService = (*persistence.DB)(nil) diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go index af14b46c..509af5bd 100644 --- a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -12,6 +12,7 @@ import ( gomock "github.com/golang/mock/gomock" zerolog "github.com/rs/zerolog" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -54,10 +55,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 inte } // FetchTimedOutWorkers mocks base method. -func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTimedOutWorkers", arg0, arg1) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -69,7 +70,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutWorkers(arg0, arg1 in } // SaveWorker mocks base method. -func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -106,7 +107,7 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { } // RequeueActiveTasksOfWorker mocks base method. -func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { +func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *sqlc.Worker, arg2 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueActiveTasksOfWorker", arg0, arg1, arg2) ret0, _ := ret[0].(error) diff --git a/internal/manager/timeout_checker/tasks_test.go b/internal/manager/timeout_checker/tasks_test.go index 63291ee5..0c4747c8 100644 --- a/internal/manager/timeout_checker/tasks_test.go +++ b/internal/manager/timeout_checker/tasks_test.go @@ -111,9 +111,9 @@ func TestTaskTimeout(t *testing.T) { job := persistence.Job{UUID: "JOB-UUID"} worker := persistence.Worker{ - UUID: "WORKER-UUID", - Name: "Tester", - Model: persistence.Model{ID: 47}, + UUID: "WORKER-UUID", + Name: "Tester", + ID: 47, } taskUnassigned := persistence.Task{ UUID: "TASK-UUID-UNASSIGNED", @@ -124,13 +124,13 @@ func TestTaskTimeout(t *testing.T) { UUID: "TASK-UUID-UNKNOWN", Job: &job, LastTouchedAt: lastTime, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), } taskAssigned := persistence.Task{ UUID: "TASK-UUID-ASSIGNED", Job: &job, LastTouchedAt: lastTime, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Worker: &worker, } diff --git a/internal/manager/timeout_checker/workers.go b/internal/manager/timeout_checker/workers.go index 918e4604..d3b32afe 100644 --- a/internal/manager/timeout_checker/workers.go +++ b/internal/manager/timeout_checker/workers.go @@ -6,7 +6,7 @@ import ( "context" "github.com/rs/zerolog/log" - "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -37,11 +37,11 @@ func (ttc *TimeoutChecker) checkWorkers(ctx context.Context) { } // timeoutTask marks a task as 'failed' due to a timeout. -func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistence.Worker) { +func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *sqlc.Worker) { logger := log.With(). Str("worker", worker.UUID). Str("name", worker.Name). - Str("lastSeenAt", worker.LastSeenAt.String()). + Str("lastSeenAt", worker.LastSeenAt.Time.String()). Logger() logger.Warn().Msg("TimeoutChecker: worker timed out") @@ -63,9 +63,13 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc ttc.broadcaster.BroadcastWorkerUpdate(api.EventWorkerUpdate{ Id: worker.UUID, Name: worker.Name, - PreviousStatus: &prevStatus, + PreviousStatus: ptr(api.WorkerStatus(prevStatus)), Status: api.WorkerStatusError, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) } + +func ptr[T any](value T) *T { + return &value +} diff --git a/internal/manager/timeout_checker/workers_test.go b/internal/manager/timeout_checker/workers_test.go index b2202c0e..79f43512 100644 --- a/internal/manager/timeout_checker/workers_test.go +++ b/internal/manager/timeout_checker/workers_test.go @@ -3,6 +3,7 @@ package timeout_checker // SPDX-License-Identifier: GPL-3.0-or-later import ( + "database/sql" "testing" "time" @@ -30,8 +31,8 @@ func TestWorkerTimeout(t *testing.T) { worker := persistence.Worker{ UUID: "WORKER-UUID", Name: "Tester", - Model: persistence.Model{ID: 47}, - LastSeenAt: lastSeenAt, + ID: 47, + LastSeenAt: sql.NullTime{Time: lastSeenAt, Valid: true}, Status: api.WorkerStatusAsleep, StatusRequested: api.WorkerStatusAwake, } @@ -58,7 +59,7 @@ func TestWorkerTimeout(t *testing.T) { Name: worker.Name, PreviousStatus: &prevStatus, Status: api.WorkerStatusError, - Updated: persistedWorker.UpdatedAt, + Updated: persistedWorker.UpdatedAt.Time, Version: persistedWorker.Software, }) diff --git a/sqlc.yaml b/sqlc.yaml index 9025df8b..332ed912 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -11,6 +11,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -28,6 +36,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -45,6 +61,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -62,6 +86,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs"