From 84f93e7502275d03cb76231774b5beaa31f524c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 11 Nov 2024 19:44:42 +0100 Subject: [PATCH] Transition from ex-GORM structs to sqlc structs (2/5) Replace old used-to-be-GORM datastructures (#104305) with sqlc-generated structs. This also makes it possible to use more specific structs that are more taylored to the specific queries, increasing efficiency. This commit mostly deals with workers, including the sleep schedule and task scheduler. Functional changes are kept to a minimum, as the API still serves the same data. Because this work covers so much of Flamenco's code, it's been split up into different commits. Each commit brings Flamenco to a state where it compiles and unit tests pass. Only the result of the final commit has actually been tested properly. Ref: #104343 --- internal/manager/api_impl/interfaces.go | 1 + .../api_impl/mocks/api_impl_mock.gen.go | 51 +++--- internal/manager/api_impl/support_test.go | 2 +- internal/manager/api_impl/worker_mgt.go | 36 +++-- internal/manager/api_impl/worker_mgt_test.go | 24 ++- .../manager/api_impl/worker_task_updates.go | 2 +- .../api_impl/worker_task_updates_test.go | 8 +- internal/manager/api_impl/workers.go | 2 +- internal/manager/api_impl/workers_test.go | 22 +-- internal/manager/eventbus/events_workers.go | 6 +- internal/manager/persistence/conversion.go | 16 ++ .../manager/persistence/conversion_test.go | 44 +++++ internal/manager/persistence/errors.go | 24 +-- internal/manager/persistence/errors_test.go | 16 +- internal/manager/persistence/jobs.go | 77 ++++----- .../persistence/jobs_blocklist_test.go | 18 ++- internal/manager/persistence/jobs_test.go | 30 ++-- internal/manager/persistence/sqlc/methods.go | 41 +++++ internal/manager/persistence/sqlc/models.go | 10 +- .../persistence/sqlc/query_jobs.sql.go | 36 +++-- .../sqlc/query_task_scheduler.sql.go | 16 +- .../persistence/sqlc/query_workers.sql | 2 +- .../persistence/sqlc/query_workers.sql.go | 62 ++++---- .../manager/persistence/task_scheduler.go | 22 +-- .../persistence/task_scheduler_test.go | 31 ++-- internal/manager/persistence/timeout.go | 11 +- internal/manager/persistence/timeout_test.go | 5 +- .../persistence/worker_sleep_schedule.go | 4 +- .../persistence/worker_sleep_schedule_test.go | 26 +-- internal/manager/persistence/worker_tag.go | 14 ++ .../manager/persistence/worker_tag_test.go | 13 +- internal/manager/persistence/workers.go | 150 ++++-------------- .../mocks/interfaces_mock.gen.go | 3 +- .../sleep_scheduler/sleep_scheduler.go | 2 +- .../sleep_scheduler/sleep_scheduler_test.go | 4 +- .../mocks/interfaces_mock.gen.go | 5 +- .../manager/timeout_checker/interfaces.go | 3 +- .../mocks/interfaces_mock.gen.go | 9 +- .../manager/timeout_checker/tasks_test.go | 10 +- internal/manager/timeout_checker/workers.go | 14 +- .../manager/timeout_checker/workers_test.go | 7 +- sqlc.yaml | 32 ++++ 42 files changed, 515 insertions(+), 396 deletions(-) create mode 100644 internal/manager/persistence/conversion.go create mode 100644 internal/manager/persistence/conversion_test.go create mode 100644 internal/manager/persistence/sqlc/methods.go diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 5e428bce..3404afab 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -76,6 +76,7 @@ type PersistenceService interface { FetchWorkerTags(ctx context.Context) ([]*persistence.WorkerTag, error) DeleteWorkerTag(ctx context.Context, uuid string) error SaveWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error + FetchTagsOfWorker(ctx context.Context, workerUUID string) ([]persistence.WorkerTag, error) // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index a5934474..6cca0d26 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -44,7 +44,7 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { } // AddWorkerToJobBlocklist mocks base method. -func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) error { +func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *sqlc.Worker, arg3 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddWorkerToJobBlocklist", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) @@ -58,7 +58,7 @@ func (mr *MockPersistenceServiceMockRecorder) AddWorkerToJobBlocklist(arg0, arg1 } // AddWorkerToTaskFailedList mocks base method. -func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) { +func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *sqlc.Worker) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddWorkerToTaskFailedList", arg0, arg1, arg2) ret0, _ := ret[0].(int) @@ -115,7 +115,7 @@ func (mr *MockPersistenceServiceMockRecorder) ClearJobBlocklist(arg0, arg1 inter } // CountTaskFailuresOfWorker mocks base method. -func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) { +func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *sqlc.Worker, arg3 string) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CountTaskFailuresOfWorker", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(int) @@ -130,7 +130,7 @@ func (mr *MockPersistenceServiceMockRecorder) CountTaskFailuresOfWorker(arg0, ar } // CreateWorker mocks base method. -func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -230,6 +230,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobs(arg0 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchJobs", reflect.TypeOf((*MockPersistenceService)(nil).FetchJobs), arg0) } +// FetchTagsOfWorker mocks base method. +func (m *MockPersistenceService) FetchTagsOfWorker(arg0 context.Context, arg1 string) ([]persistence.WorkerTag, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTagsOfWorker", arg0, arg1) + ret0, _ := ret[0].([]persistence.WorkerTag) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTagsOfWorker indicates an expected call of FetchTagsOfWorker. +func (mr *MockPersistenceServiceMockRecorder) FetchTagsOfWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTagsOfWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchTagsOfWorker), arg0, arg1) +} + // FetchTask mocks base method. func (m *MockPersistenceService) FetchTask(arg0 context.Context, arg1 string) (*persistence.Task, error) { m.ctrl.T.Helper() @@ -246,10 +261,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTask(arg0, arg1 interface{}) } // FetchTaskFailureList mocks base method. -func (m *MockPersistenceService) FetchTaskFailureList(arg0 context.Context, arg1 *persistence.Task) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchTaskFailureList(arg0 context.Context, arg1 *persistence.Task) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTaskFailureList", arg0, arg1) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -276,10 +291,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTaskJobUUID(arg0, arg1 interf } // FetchWorker mocks base method. -func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) { +func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorker", arg0, arg1) - ret0, _ := ret[0].(*persistence.Worker) + ret0, _ := ret[0].(*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -321,7 +336,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTags(arg0 interface{}) } // FetchWorkerTask mocks base method. -func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { +func (m *MockPersistenceService) FetchWorkerTask(arg0 context.Context, arg1 *sqlc.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorkerTask", arg0, arg1) ret0, _ := ret[0].(*persistence.Task) @@ -336,10 +351,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTask(arg0, arg1 interfa } // FetchWorkers mocks base method. -func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchWorkers(arg0 context.Context) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchWorkers", arg0) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -423,7 +438,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveTaskActivity(arg0, arg1 interf } // SaveWorker mocks base method. -func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -437,7 +452,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) } // SaveWorkerStatus mocks base method. -func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1) ret0, _ := ret[0].(error) @@ -465,7 +480,7 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorkerTag(arg0, arg1 interface } // ScheduleTask mocks base method. -func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { +func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *sqlc.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ScheduleTask", arg0, arg1) ret0, _ := ret[0].(*persistence.Task) @@ -522,7 +537,7 @@ func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 int } // WorkerSeen mocks base method. -func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) WorkerSeen(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkerSeen", arg0, arg1) ret0, _ := ret[0].(error) @@ -536,7 +551,7 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) } // WorkerSetTags mocks base method. -func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { +func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *sqlc.Worker, arg2 []string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WorkerSetTags", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -1017,7 +1032,7 @@ func (mr *MockTaskStateMachineMockRecorder) JobStatusChange(arg0, arg1, arg2, ar } // RequeueActiveTasksOfWorker mocks base method. -func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { +func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *sqlc.Worker, arg2 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueActiveTasksOfWorker", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -1031,7 +1046,7 @@ func (mr *MockTaskStateMachineMockRecorder) RequeueActiveTasksOfWorker(arg0, arg } // RequeueFailedTasksOfWorkerOfJob mocks base method. -func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 *persistence.Job, arg3 string) error { +func (m *MockTaskStateMachine) RequeueFailedTasksOfWorkerOfJob(arg0 context.Context, arg1 *sqlc.Worker, arg2 *persistence.Job, arg3 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueFailedTasksOfWorkerOfJob", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) diff --git a/internal/manager/api_impl/support_test.go b/internal/manager/api_impl/support_test.go index 23c46553..56592989 100644 --- a/internal/manager/api_impl/support_test.go +++ b/internal/manager/api_impl/support_test.go @@ -236,7 +236,7 @@ func assertResponseNoBody(t *testing.T, echoCtx echo.Context, expectStatus int) func testWorker() persistence.Worker { return persistence.Worker{ - Model: persistence.Model{ID: 1}, + ID: 1, UUID: "e7632d62-c3b8-4af0-9e78-01752928952c", Name: "дрон", Address: "fe80::5054:ff:fede:2ad7", diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 564a4a9b..de185aac 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -57,6 +57,15 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err) } + workerTags, err := f.persist.FetchTagsOfWorker(ctx, workerUUID) + switch { + case errors.Is(err, context.Canceled): + return handleConnectionClosed(e, logger, "fetching worker tags") + case err != nil: + logger.Error().AnErr("cause", err).Msg("fetching worker tags") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tags: %v", err) + } + dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker) switch { case errors.Is(err, context.Canceled): @@ -77,6 +86,20 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error { apiWorker.Task = &apiWorkerTask } + // TODO: see which API calls actually need the worker tags, and whether it's + // better to just call a specific API operation in those cases. + if len(workerTags) > 0 { + apiTags := make([]api.WorkerTag, len(workerTags)) + for index, tag := range workerTags { + apiTags[index].Id = &tag.UUID + apiTags[index].Name = tag.Name + if tag.Description != "" { + apiTags[index].Description = &tag.Description + } + } + apiWorker.Tags = &apiTags + } + return e.JSON(http.StatusOK, apiWorker) } @@ -464,8 +487,8 @@ func workerSummary(w persistence.Worker) api.WorkerSummary { } } - if !w.LastSeenAt.IsZero() { - summary.LastSeen = &w.LastSeenAt + if w.LastSeenAt.Valid { + summary.LastSeen = &w.LastSeenAt.Time } return summary @@ -478,15 +501,6 @@ func workerDBtoAPI(w persistence.Worker) api.Worker { Platform: w.Platform, SupportedTaskTypes: w.TaskTypes(), } - - if len(w.Tags) > 0 { - tags := []api.WorkerTag{} - for i := range w.Tags { - tags = append(tags, *workerTagDBtoAPI(w.Tags[i])) - } - apiWorker.Tags = &tags - } - return apiWorker } diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index 62942ee3..366f7d9f 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -85,6 +85,15 @@ func TestFetchWorker(t *testing.T) { require.NoError(t, err) assertResponseAPIError(t, echo, http.StatusInternalServerError, "error fetching worker: some unknown error") + // Test database error fetching worker tags. + mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID). + Return(nil, errors.New("some tag fetching error")) + echo = mf.prepareMockedRequest(nil) + err = mf.flamenco.FetchWorker(echo, workerUUID) + require.NoError(t, err) + assertResponseAPIError(t, echo, http.StatusInternalServerError, "error fetching worker tags: some tag fetching error") + // Test with worker that does NOT have a status change requested, and DOES have an assigned task. mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) assignedTask := persistence.Task{ @@ -93,6 +102,10 @@ func TestFetchWorker(t *testing.T) { Job: &persistence.Job{UUID: "f0e25ee4-0d13-4291-afc3-e9446b555aaf"}, Status: api.TaskStatusActive, } + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID).Return([]persistence.WorkerTag{ + {UUID: "0e701402-c4cc-49b0-8b8c-3eb8718d463a", Name: "EEVEE"}, + {UUID: "59211f0a-81cc-4148-b0b7-32b3e2dcdb8f", Name: "Cycles"}, + }, nil) mf.persistence.EXPECT().FetchWorkerTask(gomock.Any(), &worker).Return(&assignedTask, nil) echo = mf.prepareMockedRequest(nil) @@ -116,12 +129,17 @@ func TestFetchWorker(t *testing.T) { }, JobId: assignedTask.Job.UUID, }, + Tags: &[]api.WorkerTag{ + {Id: ptr("0e701402-c4cc-49b0-8b8c-3eb8718d463a"), Name: "EEVEE"}, + {Id: ptr("59211f0a-81cc-4148-b0b7-32b3e2dcdb8f"), Name: "Cycles"}, + }, }) // Test with worker that does have a status change requested, but does NOT Have an assigned task. requestedStatus := api.WorkerStatusAsleep worker.StatusChangeRequest(requestedStatus, false) mf.persistence.EXPECT().FetchWorker(gomock.Any(), workerUUID).Return(&worker, nil) + mf.persistence.EXPECT().FetchTagsOfWorker(gomock.Any(), workerUUID).Return([]persistence.WorkerTag{}, nil) mf.persistence.EXPECT().FetchWorkerTask(gomock.Any(), &worker).Return(nil, nil) echo = mf.prepareMockedRequest(nil) @@ -170,7 +188,7 @@ func TestDeleteWorker(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: worker.Status, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -201,7 +219,7 @@ func TestRequestWorkerStatusChange(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: prevStatus, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: &api.WorkerStatusChangeRequest{ Status: requestStatus, @@ -245,7 +263,7 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) { Id: worker.UUID, Name: worker.Name, Status: currentStatus, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: nil, }) diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index 0f7d5c05..7d132386 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -52,7 +52,7 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { Msg("worker trying to update task that's not assigned to any worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID) } - if *dbTask.WorkerID != worker.ID { + if *dbTask.WorkerID != uint(worker.ID) { logger.Warn().Msg("worker trying to update task that's assigned to another worker") return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID) } diff --git a/internal/manager/api_impl/worker_task_updates_test.go b/internal/manager/api_impl/worker_task_updates_test.go index 6810bc7a..0cc6fd2d 100644 --- a/internal/manager/api_impl/worker_task_updates_test.go +++ b/internal/manager/api_impl/worker_task_updates_test.go @@ -36,7 +36,7 @@ func TestTaskUpdate(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", } @@ -105,7 +105,7 @@ func TestTaskUpdateFailed(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", @@ -189,7 +189,7 @@ func TestBlockingAfterFailure(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", @@ -339,7 +339,7 @@ func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { mockTask := persistence.Task{ UUID: taskID, Worker: &worker, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Job: &mockJob, Activity: "pre-update activity", Type: "misc", diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 8e77545a..1feaae1e 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -582,7 +582,7 @@ func mayWorkerRun(worker *persistence.Worker, dbTask *persistence.Task) api.MayK StatusChangeRequested: true, } } - if dbTask.WorkerID == nil || *dbTask.WorkerID != worker.ID { + if dbTask.WorkerID == nil || *dbTask.WorkerID != uint(worker.ID) { return api.MayKeepRunning{Reason: "task not assigned to this worker"} } if !task_state_machine.IsRunnableTaskStatus(dbTask.Status) { diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 6340441a..cee31635 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -193,7 +193,7 @@ func TestWorkerSignOn(t *testing.T) { Name: "Lazy Boi", PreviousStatus: &prevStatus, Status: api.WorkerStatusStarting, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: "3.0-testing", }) @@ -249,7 +249,7 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { IsLazy: false, Status: api.WorkerStatusAwake, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -278,7 +278,7 @@ func TestWorkerRememberPreviousStatus(t *testing.T) { IsLazy: false, Status: api.WorkerStatusAwake, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -316,7 +316,7 @@ func TestWorkerDontRememberPreviousStatus(t *testing.T) { PreviousStatus: ptr(api.WorkerStatusError), Status: api.WorkerStatusOffline, StatusChange: nil, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -383,7 +383,7 @@ func TestWorkerStateChanged(t *testing.T) { Name: worker.Name, PreviousStatus: &prevStatus, Status: api.WorkerStatusAsleep, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -423,7 +423,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { Name: worker.Name, PreviousStatus: ptr(api.WorkerStatusOffline), Status: api.WorkerStatusStarting, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, StatusChange: &api.WorkerStatusChangeRequest{ Status: api.WorkerStatusAsleep, @@ -456,7 +456,7 @@ func TestWorkerStateChangedAfterChangeRequest(t *testing.T) { Name: worker.Name, PreviousStatus: ptr(api.WorkerStatusStarting), Status: api.WorkerStatusAsleep, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) @@ -526,7 +526,7 @@ func TestMayWorkerRun(t *testing.T) { mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &task).Return(nil) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ @@ -537,7 +537,7 @@ func TestMayWorkerRun(t *testing.T) { // Test: unhappy, assigned but cancelled. { echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusCanceled err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) @@ -551,7 +551,7 @@ func TestMayWorkerRun(t *testing.T) { { worker.StatusChangeRequest(api.WorkerStatusAsleep, false) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusActive err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) @@ -569,7 +569,7 @@ func TestMayWorkerRun(t *testing.T) { worker.StatusChangeRequest(api.WorkerStatusAsleep, true) echo := prepareRequest() - task.WorkerID = &worker.ID + task.WorkerID = ptr(uint(worker.ID)) task.Status = api.TaskStatusActive err := mf.flamenco.MayWorkerRun(echo, task.UUID) require.NoError(t, err) diff --git a/internal/manager/eventbus/events_workers.go b/internal/manager/eventbus/events_workers.go index 94fd736a..f36a0a0c 100644 --- a/internal/manager/eventbus/events_workers.go +++ b/internal/manager/eventbus/events_workers.go @@ -18,7 +18,7 @@ func NewWorkerUpdate(worker *persistence.Worker) api.EventWorkerUpdate { Name: worker.Name, Status: worker.Status, Version: worker.Software, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, CanRestart: worker.CanRestart, } @@ -29,8 +29,8 @@ func NewWorkerUpdate(worker *persistence.Worker) api.EventWorkerUpdate { } } - if !worker.LastSeenAt.IsZero() { - workerUpdate.LastSeen = &worker.LastSeenAt + if worker.LastSeenAt.Valid { + workerUpdate.LastSeen = &worker.LastSeenAt.Time } // TODO: add tag IDs. diff --git a/internal/manager/persistence/conversion.go b/internal/manager/persistence/conversion.go new file mode 100644 index 00000000..4f1e82ee --- /dev/null +++ b/internal/manager/persistence/conversion.go @@ -0,0 +1,16 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import "database/sql" + +func nullTimeToUTC(t sql.NullTime) sql.NullTime { + return sql.NullTime{ + Time: t.Time.UTC(), + Valid: t.Valid, + } +} + +func ptr[T any](value T) *T { + return &value +} diff --git a/internal/manager/persistence/conversion_test.go b/internal/manager/persistence/conversion_test.go new file mode 100644 index 00000000..e35a79d3 --- /dev/null +++ b/internal/manager/persistence/conversion_test.go @@ -0,0 +1,44 @@ +package persistence + +import ( + "database/sql" + "reflect" + "testing" +) + +func Test_nullTimeToUTC(t *testing.T) { + + inUTC := mustParseTime("2024-11-11T20:12:47Z") + inBangkok := mustParseTime("2024-11-12T03:12:47+07:00") + + tests := []struct { + name string + arg sql.NullTime + want sql.NullTime + }{ + {"zero", sql.NullTime{}, sql.NullTime{}}, + {"invalid-nonzero-utc", + sql.NullTime{Time: inUTC, Valid: false}, + sql.NullTime{Time: inUTC, Valid: false}, + }, + {"valid-nonzero-utc", + sql.NullTime{Time: inUTC, Valid: true}, + sql.NullTime{Time: inUTC, Valid: true}, + }, + {"invalid-nonzero-bangkok", + sql.NullTime{Time: inBangkok, Valid: false}, + sql.NullTime{Time: inUTC, Valid: false}, + }, + {"valid-nonzero-bangkok", + sql.NullTime{Time: inBangkok, Valid: true}, + sql.NullTime{Time: inUTC, Valid: true}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := nullTimeToUTC(tt.arg); !reflect.DeepEqual(got, tt.want) { + t.Errorf("nullTimeToUTC() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index ff94a5c4..2758f1eb 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -37,19 +37,19 @@ func (e PersistenceError) Is(err error) bool { } func jobError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormJobError(errorToWrap), message, msgArgs...) + return wrapError(translateJobError(errorToWrap), message, msgArgs...) } func taskError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormTaskError(errorToWrap), message, msgArgs...) + return wrapError(translateTaskError(errorToWrap), message, msgArgs...) } func workerError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerError(errorToWrap), message, msgArgs...) + return wrapError(translateWorkerError(errorToWrap), message, msgArgs...) } func workerTagError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerTagError(errorToWrap), message, msgArgs...) + return wrapError(translateWorkerTagError(errorToWrap), message, msgArgs...) } func wrapError(errorToWrap error, message string, format ...interface{}) error { @@ -73,36 +73,36 @@ func wrapError(errorToWrap error, message string, format ...interface{}) error { } } -// translateGormJobError translates a Gorm error to a persistence layer error. +// translateJobError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormJobError(err error) error { +func translateJobError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrJobNotFound } return err } -// translateGormTaskError translates a Gorm error to a persistence layer error. +// translateTaskError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormTaskError(err error) error { +func translateTaskError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrTaskNotFound } return err } -// translateGormWorkerError translates a Gorm error to a persistence layer error. +// translateWorkerError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerError(err error) error { +func translateWorkerError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrWorkerNotFound } return err } -// translateGormWorkerTagError translates a Gorm error to a persistence layer error. +// translateWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerTagError(err error) error { +func translateWorkerTagError(err error) error { if errors.Is(err, sql.ErrNoRows) { return ErrWorkerTagNotFound } diff --git a/internal/manager/persistence/errors_test.go b/internal/manager/persistence/errors_test.go index 3a6c3c69..9be7d537 100644 --- a/internal/manager/persistence/errors_test.go +++ b/internal/manager/persistence/errors_test.go @@ -17,18 +17,18 @@ func TestNotFoundErrors(t *testing.T) { assert.Contains(t, ErrTaskNotFound.Error(), "task") } -func TestTranslateGormJobError(t *testing.T) { - assert.Nil(t, translateGormJobError(nil)) - assert.Equal(t, ErrJobNotFound, translateGormJobError(sql.ErrNoRows)) +func TestTranslateJobError(t *testing.T) { + assert.Nil(t, translateJobError(nil)) + assert.Equal(t, ErrJobNotFound, translateJobError(sql.ErrNoRows)) otherError := errors.New("this error is not special for this function") - assert.Equal(t, otherError, translateGormJobError(otherError)) + assert.Equal(t, otherError, translateJobError(otherError)) } -func TestTranslateGormTaskError(t *testing.T) { - assert.Nil(t, translateGormTaskError(nil)) - assert.Equal(t, ErrTaskNotFound, translateGormTaskError(sql.ErrNoRows)) +func TestTranslateTaskError(t *testing.T) { + assert.Nil(t, translateTaskError(nil)) + assert.Equal(t, ErrTaskNotFound, translateTaskError(sql.ErrNoRows)) otherError := errors.New("this error is not special for this function") - assert.Equal(t, otherError, translateGormTaskError(otherError)) + assert.Equal(t, otherError, translateTaskError(otherError)) } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index c8eb9e6f..262bab7c 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -161,7 +161,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au Name: authoredJob.Name, JobType: authoredJob.JobType, Priority: int64(authoredJob.Priority), - Status: string(authoredJob.Status), + Status: authoredJob.Status, Settings: settings, Metadata: metadata, StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID, @@ -182,7 +182,7 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au Str("job", params.UUID). Str("type", params.JobType). Str("name", params.Name). - Str("status", params.Status). + Str("status", string(params.Status)). Msg("persistence: storing authored job") jobID, err := qtx.queries.CreateJob(ctx, params) @@ -261,7 +261,7 @@ func (db *DB) storeAuthoredJobTaks( JobID: jobID, IndexInJob: int64(taskIndex + 1), // indexInJob is base-1. Priority: int64(authoredTask.Priority), - Status: string(api.TaskStatusQueued), + Status: api.TaskStatusQueued, Commands: commandsJSON, // dependencies are stored below. } @@ -492,12 +492,7 @@ func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) { queries := db.queries() - statuses := []string{} - for _, status := range jobStatuses { - statuses = append(statuses, string(status)) - } - - sqlcJobs, err := queries.FetchJobsInStatus(ctx, statuses) + sqlcJobs, err := queries.FetchJobsInStatus(ctx, jobStatuses) if err != nil { return nil, jobError(err, "fetching jobs in status %q", jobStatuses) } @@ -521,7 +516,7 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { params := sqlc.SaveJobStatusParams{ Now: db.nowNullable(), ID: int64(j.ID), - Status: string(j.Status), + Status: j.Status, Activity: j.Activity, } @@ -586,8 +581,9 @@ func convertSqlTaskWithJobAndWorker( task sqlc.Task, ) (*Task, error) { var ( - gormJob Job - gormWorker Worker + gormJob Job + worker Worker + err error ) // Fetch & convert the Job. @@ -603,17 +599,16 @@ func convertSqlTaskWithJobAndWorker( } } - // Fetch & convert the Worker. + // Fetch the Worker. if task.WorkerID.Valid && task.WorkerID.Int64 > 0 { - sqlcWorker, err := queries.FetchWorkerUnconditionalByID(ctx, task.WorkerID.Int64) + worker, err = queries.FetchWorkerUnconditionalByID(ctx, task.WorkerID.Int64) if err != nil { return nil, taskError(err, "fetching worker assigned to task %s", task.UUID) } - gormWorker = *convertSqlcWorker(sqlcWorker) } // Convert the Task. - gormTask, err := convertSqlcTask(task, gormJob.UUID, gormWorker.UUID) + gormTask, err := convertSqlcTask(task, gormJob.UUID, worker.UUID) if err != nil { return nil, err } @@ -623,9 +618,9 @@ func convertSqlTaskWithJobAndWorker( gormTask.Job = &gormJob gormTask.JobUUID = gormJob.UUID } - if gormWorker.ID > 0 { - gormTask.Worker = &gormWorker - gormTask.WorkerUUID = gormWorker.UUID + if worker.ID > 0 { + gormTask.Worker = &worker + gormTask.WorkerUUID = worker.UUID } return gormTask, nil @@ -664,7 +659,7 @@ func (db *DB) SaveTask(ctx context.Context, t *Task) error { Name: t.Name, Type: t.Type, Priority: int64(t.Priority), - Status: string(t.Status), + Status: t.Status, Commands: commandsJSON, Activity: t.Activity, ID: int64(t.ID), @@ -700,7 +695,7 @@ func (db *DB) SaveTaskStatus(ctx context.Context, t *Task) error { err := queries.UpdateTaskStatus(ctx, sqlc.UpdateTaskStatusParams{ UpdatedAt: db.nowNullable(), - Status: string(t.Status), + Status: t.Status, ID: int64(t.ID), }) if err != nil { @@ -743,7 +738,7 @@ func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error // Update the task itself. t.Worker = w - t.WorkerID = &w.ID + t.WorkerID = ptr(uint(w.ID)) return nil } @@ -756,7 +751,7 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta Int64: int64(worker.ID), Valid: true, }, - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus) @@ -772,7 +767,7 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta return nil, err } gormTask.Worker = worker - gormTask.WorkerID = &worker.ID + gormTask.WorkerID = ptr(uint(worker.ID)) // Fetch the job, either from the cache or from the database. This is done // here because the task_state_machine functionality expects that task.Job @@ -802,7 +797,7 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke Valid: true, }, JobID: int64(job.ID), - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return nil, taskError(err, "finding tasks of worker %s in status %q and job %s", worker.UUID, taskStatus, job.UUID) @@ -817,7 +812,7 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke gormTask.Job = job gormTask.JobID = job.ID gormTask.Worker = worker - gormTask.WorkerID = &worker.ID + gormTask.WorkerID = ptr(uint(worker.ID)) result[i] = gormTask } return result, nil @@ -828,7 +823,7 @@ func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api. count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{ JobID: int64(job.ID), - TaskStatus: string(taskStatus), + TaskStatus: taskStatus, }) if err != nil { return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus) @@ -896,7 +891,7 @@ func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuse rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{ JobID: int64(job.ID), - TaskStatus: convertTaskStatuses(taskStatuses), + TaskStatus: taskStatuses, }) if err != nil { return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses) @@ -926,7 +921,7 @@ func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job, err := queries.UpdateJobsTaskStatuses(ctx, sqlc.UpdateJobsTaskStatusesParams{ UpdatedAt: db.nowNullable(), - Status: string(taskStatus), + Status: taskStatus, Activity: activity, JobID: int64(job.ID), }) @@ -950,10 +945,10 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, err := queries.UpdateJobsTaskStatusesConditional(ctx, sqlc.UpdateJobsTaskStatusesConditionalParams{ UpdatedAt: db.nowNullable(), - Status: string(taskStatus), + Status: taskStatus, Activity: activity, JobID: int64(job.ID), - StatusesToUpdate: convertTaskStatuses(statusesToUpdate), + StatusesToUpdate: statusesToUpdate, }) if err != nil { @@ -1041,7 +1036,7 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err workers := make([]*Worker, len(failureList)) for idx := range failureList { - workers[idx] = convertSqlcWorker(failureList[idx].Worker) + workers[idx] = &failureList[idx].Worker } return workers, nil } @@ -1124,21 +1119,3 @@ func convertSqlcTask(task sqlc.Task, jobUUID string, workerUUID string) (*Task, return &dbTask, nil } - -// convertTaskStatuses converts from []api.TaskStatus to []string for feeding to sqlc. -func convertTaskStatuses(taskStatuses []api.TaskStatus) []string { - statusesAsStrings := make([]string, len(taskStatuses)) - for index := range taskStatuses { - statusesAsStrings[index] = string(taskStatuses[index]) - } - return statusesAsStrings -} - -// convertJobStatuses converts from []api.JobStatus to []string for feeding to sqlc. -func convertJobStatuses(jobStatuses []api.JobStatus) []string { - statusesAsStrings := make([]string, len(jobStatuses)) - for index := range jobStatuses { - statusesAsStrings[index] = string(jobStatuses[index]) - } - return statusesAsStrings -} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index bf30d17d..35985933 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -119,7 +119,9 @@ func TestWorkersLeftToRun(t *testing.T) { require.NoError(t, err) assert.Empty(t, left) - worker1 := createWorker(ctx, t, db) + worker1 := createWorker(ctx, t, db, func(w *Worker) { + w.UUID = "11111111-0000-1111-2222-333333333333" + }) worker2 := createWorkerFrom(ctx, t, db, *worker1) // Create one worker tag. It will not be used by this job, but one of the @@ -129,8 +131,8 @@ func TestWorkersLeftToRun(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC1, []string{tag1.UUID})) uuidMap := func(workers ...*Worker) map[string]bool { theMap := map[string]bool{} @@ -185,23 +187,25 @@ func TestWorkersLeftToRunWithTags(t *testing.T) { // Tags 1 + 3 workerC13 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c13c1313-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1, &tag3} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC13, []string{tag1.UUID, tag3.UUID})) + // Tag 1 workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag1} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC1, []string{tag1.UUID})) + // Tag 2 worker, this one should never appear. - createWorker(ctx, t, db, func(w *Worker) { + workerC2 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c2c2c2c2-0000-1111-2222-333333333333" - w.Tags = []*WorkerTag{&tag2} }) + require.NoError(t, db.WorkerSetTags(ctx, workerC2, []string{tag2.UUID})) + // No tags, so should be able to run only tagless jobs. Which is none // in this test. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "00000000-0000-1111-2222-333333333333" - w.Tags = nil }) uuidMap := func(workers ...*Worker) map[string]bool { diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index e7d0d9ba..06779b95 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -14,6 +14,7 @@ import ( "golang.org/x/net/context" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -581,7 +582,7 @@ func TestTaskAssignToWorker(t *testing.T) { if task.WorkerID == nil { t.Error("task.WorkerID == nil") } else { - assert.Equal(t, w.ID, *task.WorkerID) + assert.Equal(t, w.ID, int64(*task.WorkerID)) } } @@ -713,7 +714,7 @@ func TestAddWorkerToTaskFailedList(t *testing.T) { newWorker.ID = 0 newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85" newWorker.Name = "Worker 2" - require.NoError(t, db.SaveWorker(ctx, &newWorker)) + require.NoError(t, db.CreateWorker(ctx, &newWorker)) worker2, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) @@ -752,7 +753,7 @@ func TestClearFailureListOfTask(t *testing.T) { newWorker.ID = 0 newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85" newWorker.Name = "Worker 2" - require.NoError(t, db.SaveWorker(ctx, &newWorker)) + require.NoError(t, db.CreateWorker(ctx, &newWorker)) worker2, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) @@ -1020,7 +1021,6 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W Software: "3.0", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management", - Tags: nil, } for _, updater := range updaters { @@ -1039,14 +1039,26 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W // createWorkerFrom duplicates the given worker, ensuring new UUIDs. func createWorkerFrom(ctx context.Context, t *testing.T, db *DB, worker Worker) *Worker { - worker.ID = 0 - worker.UUID = uuid.New() - worker.Name += " (copy)" + newWorker := sqlc.Worker{ + UUID: uuid.New(), + Secret: worker.Secret, + Name: worker.Name + " (copy)", + Address: worker.Address, + Platform: worker.Platform, + Software: worker.Software, + Status: worker.Status, + LastSeenAt: nullTimeToUTC(worker.LastSeenAt), + StatusRequested: worker.StatusRequested, + LazyStatusRequest: worker.LazyStatusRequest, + SupportedTaskTypes: worker.SupportedTaskTypes, + DeletedAt: worker.DeletedAt, + CanRestart: worker.CanRestart, + } - err := db.SaveWorker(ctx, &worker) + err := db.CreateWorker(ctx, &newWorker) require.NoError(t, err) - dbWorker, err := db.FetchWorker(ctx, worker.UUID) + dbWorker, err := db.FetchWorker(ctx, newWorker.UUID) require.NoError(t, err) return dbWorker diff --git a/internal/manager/persistence/sqlc/methods.go b/internal/manager/persistence/sqlc/methods.go new file mode 100644 index 00000000..04bc96dd --- /dev/null +++ b/internal/manager/persistence/sqlc/methods.go @@ -0,0 +1,41 @@ +// Code MANUALLY written to extend the SQLC structs with some extra methods. + +package sqlc + +import ( + "fmt" + "strings" + + "projects.blender.org/studio/flamenco/pkg/api" +) + +// SPDX-License-Identifier: GPL-3.0-or-later + +func (w *Worker) Identifier() string { + // Avoid a panic when worker.Identifier() is called on a nil pointer. + if w == nil { + return "-nil worker-" + } + return fmt.Sprintf("%s (%s)", w.Name, w.UUID) +} + +// TaskTypes returns the worker's supported task types as list of strings. +func (w *Worker) TaskTypes() []string { + return strings.Split(w.SupportedTaskTypes, ",") +} + +// StatusChangeRequest stores a requested status change on the Worker. +// This just updates the Worker instance, but doesn't store the change in the +// database. +func (w *Worker) StatusChangeRequest(status api.WorkerStatus, isLazyRequest bool) { + w.StatusRequested = status + w.LazyStatusRequest = isLazyRequest +} + +// StatusChangeClear clears the requested status change of the Worker. +// This just updates the Worker instance, but doesn't store the change in the +// database. +func (w *Worker) StatusChangeClear() { + w.StatusRequested = "" + w.LazyStatusRequest = false +} diff --git a/internal/manager/persistence/sqlc/models.go b/internal/manager/persistence/sqlc/models.go index 3ddf9108..ccbe873e 100644 --- a/internal/manager/persistence/sqlc/models.go +++ b/internal/manager/persistence/sqlc/models.go @@ -8,6 +8,8 @@ import ( "database/sql" "encoding/json" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) type Job struct { @@ -18,7 +20,7 @@ type Job struct { Name string JobType string Priority int64 - Status string + Status api.JobStatus Activity string Settings json.RawMessage Metadata json.RawMessage @@ -64,7 +66,7 @@ type Task struct { JobID int64 IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus WorkerID sql.NullInt64 LastTouchedAt sql.NullTime Commands json.RawMessage @@ -92,9 +94,9 @@ type Worker struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string DeletedAt sql.NullTime diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 65c35f46..8960398e 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -11,6 +11,8 @@ import ( "encoding/json" "strings" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec @@ -156,7 +158,7 @@ type CreateJobParams struct { Name string JobType string Priority int64 - Status string + Status api.JobStatus Activity string Settings json.RawMessage Metadata json.RawMessage @@ -218,7 +220,7 @@ type CreateTaskParams struct { JobID int64 IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus Commands json.RawMessage } @@ -464,7 +466,7 @@ const fetchJobsInStatus = `-- name: FetchJobsInStatus :many SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE status IN (/*SLICE:statuses*/?) ` -func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]Job, error) { +func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []api.JobStatus) ([]Job, error) { query := fetchJobsInStatus var queryParams []interface{} if len(statuses) > 0 { @@ -675,7 +677,7 @@ WHERE tasks.job_id = ?1 type FetchTasksOfJobInStatusParams struct { JobID int64 - TaskStatus []string + TaskStatus []api.TaskStatus } type FetchTasksOfJobInStatusRow struct { @@ -743,7 +745,7 @@ WHERE tasks.worker_id = ?1 type FetchTasksOfWorkerInStatusParams struct { WorkerID sql.NullInt64 - TaskStatus string + TaskStatus api.TaskStatus } type FetchTasksOfWorkerInStatusRow struct { @@ -801,7 +803,7 @@ WHERE tasks.worker_id = ?1 type FetchTasksOfWorkerInStatusOfJobParams struct { WorkerID sql.NullInt64 JobID int64 - TaskStatus string + TaskStatus api.TaskStatus } type FetchTasksOfWorkerInStatusOfJobRow struct { @@ -855,7 +857,7 @@ AND last_touched_at <= ?2 ` type FetchTimedOutTasksParams struct { - TaskStatus string + TaskStatus api.TaskStatus UntouchedSince sql.NullTime } @@ -916,7 +918,7 @@ GROUP BY status ` type JobCountTaskStatusesRow struct { - Status string + Status api.TaskStatus NumTasks int64 } @@ -951,7 +953,7 @@ WHERE job_id = ?1 AND status = ?2 type JobCountTasksInStatusParams struct { JobID int64 - TaskStatus string + TaskStatus api.TaskStatus } // Fetch number of tasks in the given status, of the given job. @@ -975,7 +977,7 @@ type QueryJobTaskSummariesRow struct { Name string IndexInJob int64 Priority int64 - Status string + Status api.TaskStatus Type string UpdatedAt sql.NullTime } @@ -1097,7 +1099,7 @@ UPDATE jobs SET updated_at=?1, status=?2, activity=?3 WHERE id=?4 type SaveJobStatusParams struct { Now sql.NullTime - Status string + Status api.JobStatus Activity string ID int64 } @@ -1170,7 +1172,7 @@ GROUP BY status ` type SummarizeJobStatusesRow struct { - Status string + Status api.JobStatus StatusCount int64 } @@ -1375,7 +1377,7 @@ WHERE job_id = ?4 type UpdateJobsTaskStatusesParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus Activity string JobID int64 } @@ -1400,10 +1402,10 @@ WHERE job_id = ?4 AND status in (/*SLICE:statuses_to_update*/?) type UpdateJobsTaskStatusesConditionalParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus Activity string JobID int64 - StatusesToUpdate []string + StatusesToUpdate []api.TaskStatus } func (q *Queries) UpdateJobsTaskStatusesConditional(ctx context.Context, arg UpdateJobsTaskStatusesConditionalParams) error { @@ -1444,7 +1446,7 @@ type UpdateTaskParams struct { Name string Type string Priority int64 - Status string + Status api.TaskStatus WorkerID sql.NullInt64 LastTouchedAt sql.NullTime Commands json.RawMessage @@ -1496,7 +1498,7 @@ WHERE id=?3 type UpdateTaskStatusParams struct { UpdatedAt sql.NullTime - Status string + Status api.TaskStatus ID int64 } diff --git a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go index e0cfe922..7573ecb7 100644 --- a/internal/manager/persistence/sqlc/query_task_scheduler.sql.go +++ b/internal/manager/persistence/sqlc/query_task_scheduler.sql.go @@ -9,6 +9,8 @@ import ( "context" "database/sql" "strings" + + "projects.blender.org/studio/flamenco/pkg/api" ) const assignTaskToWorker = `-- name: AssignTaskToWorker :exec @@ -39,9 +41,9 @@ LIMIT 1 ` type FetchAssignedAndRunnableTaskOfWorkerParams struct { - ActiveTaskStatus string + ActiveTaskStatus api.TaskStatus WorkerID sql.NullInt64 - ActiveJobStatuses []string + ActiveJobStatuses []api.JobStatus } type FetchAssignedAndRunnableTaskOfWorkerRow struct { @@ -99,8 +101,8 @@ LIMIT 1 ` type FetchWorkerTaskParams struct { - TaskStatusActive string - JobStatusActive string + TaskStatusActive api.TaskStatus + JobStatusActive api.JobStatus WorkerID sql.NullInt64 } @@ -179,10 +181,10 @@ ORDER BY jobs.priority DESC, tasks.priority DESC type FindRunnableTaskParams struct { WorkerID int64 - TaskStatusCompleted string + TaskStatusCompleted api.TaskStatus WorkerTags []sql.NullInt64 - SchedulableTaskStatuses []string - SchedulableJobStatuses []string + SchedulableTaskStatuses []api.TaskStatus + SchedulableJobStatuses []api.JobStatus SupportedTaskTypes []string } diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index 60ddf639..2df38851 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -43,7 +43,7 @@ FROM worker_tag_membership WHERE worker_id=@worker_id; -- name: FetchWorkers :many -SELECT sqlc.embed(workers) FROM workers +SELECT * FROM workers WHERE deleted_at IS NULL; -- name: FetchWorker :one diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index d6cbc97b..e9496859 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -10,6 +10,8 @@ import ( "database/sql" "strings" "time" + + "projects.blender.org/studio/flamenco/pkg/api" ) const countWorkerTags = `-- name: CountWorkerTags :one @@ -66,9 +68,9 @@ type CreateWorkerParams struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string DeletedAt sql.NullTime @@ -232,7 +234,7 @@ AND status NOT IN (/*SLICE:worker_statuses_no_timeout*/?) type FetchTimedOutWorkersParams struct { LastSeenBefore sql.NullTime - WorkerStatusesNoTimeout []string + WorkerStatusesNoTimeout []api.WorkerStatus } func (q *Queries) FetchTimedOutWorkers(ctx context.Context, arg FetchTimedOutWorkersParams) ([]Worker, error) { @@ -548,40 +550,36 @@ func (q *Queries) FetchWorkerUnconditionalByID(ctx context.Context, workerID int } const fetchWorkers = `-- name: FetchWorkers :many -SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE deleted_at IS NULL ` -type FetchWorkersRow struct { - Worker Worker -} - -func (q *Queries) FetchWorkers(ctx context.Context) ([]FetchWorkersRow, error) { +func (q *Queries) FetchWorkers(ctx context.Context) ([]Worker, error) { rows, err := q.db.QueryContext(ctx, fetchWorkers) if err != nil { return nil, err } defer rows.Close() - var items []FetchWorkersRow + var items []Worker for rows.Next() { - var i FetchWorkersRow + var i Worker if err := rows.Scan( - &i.Worker.ID, - &i.Worker.CreatedAt, - &i.Worker.UpdatedAt, - &i.Worker.UUID, - &i.Worker.Secret, - &i.Worker.Name, - &i.Worker.Address, - &i.Worker.Platform, - &i.Worker.Software, - &i.Worker.Status, - &i.Worker.LastSeenAt, - &i.Worker.StatusRequested, - &i.Worker.LazyStatusRequest, - &i.Worker.SupportedTaskTypes, - &i.Worker.DeletedAt, - &i.Worker.CanRestart, + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, ); err != nil { return nil, err } @@ -622,9 +620,9 @@ type SaveWorkerParams struct { Address string Platform string Software string - Status string + Status api.WorkerStatus LastSeenAt sql.NullTime - StatusRequested string + StatusRequested api.WorkerStatus LazyStatusRequest bool SupportedTaskTypes string CanRestart bool @@ -662,8 +660,8 @@ WHERE id=?5 type SaveWorkerStatusParams struct { UpdatedAt sql.NullTime - Status string - StatusRequested string + Status api.WorkerStatus + StatusRequested api.WorkerStatus LazyStatusRequest bool ID int64 } @@ -808,7 +806,7 @@ GROUP BY status ` type SummarizeWorkerStatusesRow struct { - Status string + Status api.WorkerStatus StatusCount int64 } diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 9d5539b5..39903b0f 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -77,8 +77,8 @@ func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker // Worker, but since it's active that is unlikely. { row, err := queries.FetchAssignedAndRunnableTaskOfWorker(ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{ - ActiveTaskStatus: string(api.TaskStatusActive), - ActiveJobStatuses: convertJobStatuses(schedulableJobStatuses), + ActiveTaskStatus: api.TaskStatusActive, + ActiveJobStatuses: schedulableJobStatuses, WorkerID: workerID, }) @@ -141,18 +141,22 @@ func findTaskForWorker( w *Worker, ) (sqlc.Task, error) { - // Construct the list of worker tags to check. - workerTags := make([]sql.NullInt64, len(w.Tags)) - for index, tag := range w.Tags { - workerTags[index] = sql.NullInt64{Int64: int64(tag.ID), Valid: true} + // Construct the list of worker tag IDs to check. + tags, err := queries.FetchTagsOfWorker(ctx, w.UUID) + if err != nil { + return sqlc.Task{}, err + } + workerTags := make([]sql.NullInt64, len(tags)) + for index, tag := range tags { + workerTags[index] = sql.NullInt64{Int64: tag.ID, Valid: true} } row, err := queries.FindRunnableTask(ctx, sqlc.FindRunnableTaskParams{ WorkerID: int64(w.ID), - SchedulableTaskStatuses: convertTaskStatuses(schedulableTaskStatuses), - SchedulableJobStatuses: convertJobStatuses(schedulableJobStatuses), + SchedulableTaskStatuses: schedulableTaskStatuses, + SchedulableJobStatuses: schedulableJobStatuses, SupportedTaskTypes: w.TaskTypes(), - TaskStatusCompleted: string(api.TaskStatusCompleted), + TaskStatusCompleted: api.TaskStatusCompleted, WorkerTags: workerTags, }) if err != nil { diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 381f7888..3e1b4035 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -48,7 +48,7 @@ func TestOneJobOneTask(t *testing.T) { require.NotNil(t, task) assert.Equal(t, job.ID, task.JobID) require.NotNil(t, task.WorkerID, "no worker assigned to returned task") - assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker") + assert.Equal(t, w.ID, int64(*task.WorkerID), "task must be assigned to the requesting worker") // Check the task in the database. now := db.now() @@ -57,7 +57,7 @@ func TestOneJobOneTask(t *testing.T) { require.NotNil(t, dbTask) require.NotNil(t, dbTask.WorkerID, "no worker assigned to task in database") - assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker") + assert.Equal(t, w.ID, int64(*dbTask.WorkerID), "task must be assigned to the requesting worker") assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling") } @@ -259,7 +259,7 @@ func TestAlreadyAssigned(t *testing.T) { // another, higher-prio task to be done. dbTask3, err := db.FetchTask(ctx, att3.UUID) require.NoError(t, err) - dbTask3.WorkerID = &w.ID + dbTask3.WorkerID = ptr(uint(w.ID)) dbTask3.Status = api.TaskStatusActive err = db.SaveTask(ctx, dbTask3) require.NoError(t, err) @@ -292,7 +292,7 @@ func TestAssignedToOtherWorker(t *testing.T) { // it shouldn't matter which worker it's assigned to. dbTask2, err := db.FetchTask(ctx, att2.UUID) require.NoError(t, err) - dbTask2.WorkerID = &w2.ID + dbTask2.WorkerID = ptr(uint(w2.ID)) dbTask2.Status = api.TaskStatusQueued err = db.SaveTask(ctx, dbTask2) require.NoError(t, err) @@ -302,7 +302,7 @@ func TestAssignedToOtherWorker(t *testing.T) { require.NotNil(t, task) assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen") - assert.Equal(t, *task.WorkerID, w.ID, "the task should now be assigned to the worker it was scheduled for") + assert.Equal(t, int64(*task.WorkerID), w.ID, "the task should now be assigned to the worker it was scheduled for") } func TestPreviouslyFailed(t *testing.T) { @@ -344,14 +344,12 @@ func TestWorkerTagJobWithTag(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) // Create a worker in tag1: - workerC := linuxWorker(t, db, func(w *Worker) { - w.Tags = []*WorkerTag{&tag1} - }) + workerC := linuxWorker(t, db) + require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID})) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Tags = nil }) { // Test job with different tag: @@ -391,14 +389,12 @@ func TestWorkerTagJobWithoutTag(t *testing.T) { require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) // Create a worker in tag1: - workerC := linuxWorker(t, db, func(w *Worker) { - w.Tags = []*WorkerTag{&tag1} - }) + workerC := linuxWorker(t, db) + require.NoError(t, db.WorkerSetTags(ctx, &workerC, []string{tag1.UUID})) // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Tags = nil }) // Test tag-less job: @@ -537,9 +533,9 @@ func saveTestWorker(t *testing.T, db *DB, worker *Worker) { Address: worker.Address, Platform: worker.Platform, Software: worker.Software, - Status: string(worker.Status), - LastSeenAt: sql.NullTime{Time: worker.LastSeenAt, Valid: !worker.LastSeenAt.IsZero()}, - StatusRequested: string(worker.StatusRequested), + Status: worker.Status, + LastSeenAt: nullTimeToUTC(worker.LastSeenAt), + StatusRequested: worker.StatusRequested, LazyStatusRequest: worker.LazyStatusRequest, SupportedTaskTypes: worker.SupportedTaskTypes, DeletedAt: sql.NullTime(worker.DeletedAt), @@ -550,5 +546,6 @@ func saveTestWorker(t *testing.T, db *DB, worker *Worker) { id, err := queries.CreateWorker(context.TODO(), params) require.NoError(t, err, "cannot save worker %q", worker.Name) - worker.ID = uint(id) + worker.ID = id + worker.CreatedAt = params.CreatedAt } diff --git a/internal/manager/persistence/timeout.go b/internal/manager/persistence/timeout.go index b149b0cd..af74d4a0 100644 --- a/internal/manager/persistence/timeout.go +++ b/internal/manager/persistence/timeout.go @@ -31,7 +31,7 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) queries := db.queries() sqlcTasks, err := queries.FetchTimedOutTasks(ctx, sqlc.FetchTimedOutTasksParams{ - TaskStatus: string(api.TaskStatusActive), + TaskStatus: api.TaskStatusActive, UntouchedSince: sql.NullTime{Time: untouchedSince, Valid: true}, }) @@ -54,13 +54,8 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) { queries := db.queries() - statuses := make([]string, len(workerStatusNoTimeout)) - for i, status := range workerStatusNoTimeout { - statuses[i] = string(status) - } - sqlcWorkers, err := queries.FetchTimedOutWorkers(ctx, sqlc.FetchTimedOutWorkersParams{ - WorkerStatusesNoTimeout: statuses, + WorkerStatusesNoTimeout: workerStatusNoTimeout, LastSeenBefore: sql.NullTime{ Time: lastSeenBefore.UTC(), Valid: true}, @@ -71,7 +66,7 @@ func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time result := make([]*Worker, len(sqlcWorkers)) for index := range sqlcWorkers { - result[index] = convertSqlcWorker(sqlcWorkers[index]) + result[index] = &sqlcWorkers[index] } return result, nil } diff --git a/internal/manager/persistence/timeout_test.go b/internal/manager/persistence/timeout_test.go index 30ad05c7..344dbc4f 100644 --- a/internal/manager/persistence/timeout_test.go +++ b/internal/manager/persistence/timeout_test.go @@ -1,6 +1,7 @@ package persistence import ( + "database/sql" "testing" "time" @@ -55,8 +56,8 @@ func TestFetchTimedOutWorkers(t *testing.T) { defer cancel() timeoutDeadline := mustParseTime("2022-06-07T11:14:47+02:00") - beforeDeadline := timeoutDeadline.Add(-10 * time.Second) - afterDeadline := timeoutDeadline.Add(10 * time.Second) + beforeDeadline := sql.NullTime{Time: timeoutDeadline.Add(-10 * time.Second), Valid: true} + afterDeadline := sql.NullTime{Time: timeoutDeadline.Add(10 * time.Second), Valid: true} worker0 := Worker{ // Offline, so should not time out. UUID: "c7b4d1d5-0a96-4e19-993f-028786d3d2c1", diff --git a/internal/manager/persistence/worker_sleep_schedule.go b/internal/manager/persistence/worker_sleep_schedule.go index 133fc470..fd361411 100644 --- a/internal/manager/persistence/worker_sleep_schedule.go +++ b/internal/manager/persistence/worker_sleep_schedule.go @@ -60,7 +60,7 @@ func (db *DB) SetWorkerSleepSchedule(ctx context.Context, workerUUID string, sch if err != nil { return fmt.Errorf("fetching worker %q: %w", workerUUID, err) } - schedule.WorkerID = worker.ID + schedule.WorkerID = uint(worker.ID) schedule.Worker = worker // Only store timestamps in UTC. @@ -120,7 +120,7 @@ func (db *DB) FetchSleepScheduleWorker(ctx context.Context, schedule *SleepSched return workerError(err, "finding worker by their sleep schedule") } - schedule.Worker = convertSqlcWorker(worker) + schedule.Worker = &worker return nil } diff --git a/internal/manager/persistence/worker_sleep_schedule_test.go b/internal/manager/persistence/worker_sleep_schedule_test.go index b529b4e2..d1bed97c 100644 --- a/internal/manager/persistence/worker_sleep_schedule_test.go +++ b/internal/manager/persistence/worker_sleep_schedule_test.go @@ -40,7 +40,7 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { // Create a sleep schedule. created := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -53,7 +53,7 @@ func TestFetchWorkerSleepSchedule(t *testing.T) { fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, created, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), created, *fetched) } func TestFetchSleepScheduleWorker(t *testing.T) { @@ -74,7 +74,7 @@ func TestFetchSleepScheduleWorker(t *testing.T) { // Create a sleep schedule. created := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -120,7 +120,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) schedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -138,7 +138,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err := db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, schedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), schedule, *fetched) // Overwrite the schedule with one that already has a database ID. newSchedule := schedule @@ -150,11 +150,11 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, newSchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), newSchedule, *fetched) // Overwrite the schedule with a freshly constructed one. newerSchedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: true, @@ -166,11 +166,11 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, newerSchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), newerSchedule, *fetched) // Clear the sleep schedule. emptySchedule := SleepSchedule{ - WorkerID: linuxWorker.ID, + WorkerID: uint(linuxWorker.ID), Worker: &linuxWorker, IsActive: false, @@ -182,7 +182,7 @@ func TestSetWorkerSleepSchedule(t *testing.T) { require.NoError(t, err) fetched, err = db.FetchWorkerSleepSchedule(ctx, linuxWorker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, linuxWorker.ID, emptySchedule, *fetched) + assertEqualSleepSchedule(t, uint(linuxWorker.ID), emptySchedule, *fetched) } @@ -212,7 +212,7 @@ func TestSetWorkerSleepScheduleNextCheck(t *testing.T) { fetched, err := db.FetchWorkerSleepSchedule(ctx, schedule.Worker.UUID) require.NoError(t, err) - assertEqualSleepSchedule(t, schedule.Worker.ID, schedule, *fetched) + assertEqualSleepSchedule(t, uint(schedule.Worker.ID), schedule, *fetched) } func TestFetchSleepSchedulesToCheck(t *testing.T) { @@ -293,9 +293,9 @@ func TestFetchSleepSchedulesToCheck(t *testing.T) { require.NoError(t, err) require.Len(t, toCheck, 2) - assertEqualSleepSchedule(t, schedule0.Worker.ID, schedule0, *toCheck[0]) + assertEqualSleepSchedule(t, uint(schedule0.Worker.ID), schedule0, *toCheck[0]) assert.Nil(t, toCheck[0].Worker, "the Worker should NOT be fetched") - assertEqualSleepSchedule(t, schedule2.Worker.ID, schedule1, *toCheck[1]) + assertEqualSleepSchedule(t, uint(schedule2.Worker.ID), schedule1, *toCheck[1]) assert.Nil(t, toCheck[1].Worker, "the Worker should NOT be fetched") } diff --git a/internal/manager/persistence/worker_tag.go b/internal/manager/persistence/worker_tag.go index 10dbacb4..aacddd0c 100644 --- a/internal/manager/persistence/worker_tag.go +++ b/internal/manager/persistence/worker_tag.go @@ -173,3 +173,17 @@ func (db *DB) WorkerSetTags(ctx context.Context, worker *Worker, tagUUIDs []stri return qtx.commit() } + +func (db *DB) FetchTagsOfWorker(ctx context.Context, workerUUID string) ([]WorkerTag, error) { + queries := db.queries() + tags, err := queries.FetchTagsOfWorker(ctx, workerUUID) + if err != nil { + return nil, workerTagError(err, "fetching tags of worker %s", workerUUID) + } + + gormTags := make([]WorkerTag, len(tags)) + for index, tag := range tags { + gormTags[index] = *convertSqlcWorkerTag(tag) + } + return gormTags, nil +} diff --git a/internal/manager/persistence/worker_tag_test.go b/internal/manager/persistence/worker_tag_test.go index 955da9cd..57fa2604 100644 --- a/internal/manager/persistence/worker_tag_test.go +++ b/internal/manager/persistence/worker_tag_test.go @@ -101,8 +101,11 @@ func TestAssignUnassignWorkerTags(t *testing.T) { w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) require.NoError(t, err) + tags, err := f.db.queries().FetchTagsOfWorker(f.ctx, w.UUID) + require.NoError(t, err) + // Catch doubly-reported tags, as the maps below would hide those cases. - assert.Len(t, w.Tags, len(tagUUIDs), msgLabel) + assert.Len(t, tags, len(tagUUIDs), msgLabel) expectTags := make(map[string]bool) for _, cid := range tagUUIDs { @@ -110,8 +113,8 @@ func TestAssignUnassignWorkerTags(t *testing.T) { } actualTags := make(map[string]bool) - for _, c := range w.Tags { - actualTags[c.UUID] = true + for _, tag := range tags { + actualTags[tag.UUID] = true } assert.Equal(t, expectTags, actualTags, msgLabel) @@ -171,9 +174,9 @@ func TestDeleteWorkerTagWithWorkersAssigned(t *testing.T) { require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) // Check the Worker has been unassigned from the tag. - w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + tags, err := f.db.queries().FetchTagsOfWorker(f.ctx, f.worker.UUID) require.NoError(t, err) - assert.Empty(t, w.Tags) + assert.Empty(t, tags) } func assertTagsMatch(t *testing.T, f WorkerTestFixture, expectUUIDs ...string) { diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 93b76ec0..589c7d16 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -7,7 +7,6 @@ import ( "database/sql" "errors" "fmt" - "strings" "time" "github.com/rs/zerolog/log" @@ -15,7 +14,9 @@ import ( "projects.blender.org/studio/flamenco/pkg/api" ) -type Worker struct { +type Worker = sqlc.Worker + +type Worker__gorm struct { Model DeletedAt sql.NullTime @@ -38,76 +39,34 @@ type Worker struct { Tags []*WorkerTag } -func (w *Worker) Identifier() string { - // Avoid a panic when worker.Identifier() is called on a nil pointer. - if w == nil { - return "-nil worker-" - } - return fmt.Sprintf("%s (%s)", w.Name, w.UUID) -} - -// TaskTypes returns the worker's supported task types as list of strings. -func (w *Worker) TaskTypes() []string { - return strings.Split(w.SupportedTaskTypes, ",") -} - -// StatusChangeRequest stores a requested status change on the Worker. -// This just updates the Worker instance, but doesn't store the change in the -// database. -func (w *Worker) StatusChangeRequest(status api.WorkerStatus, isLazyRequest bool) { - w.StatusRequested = status - w.LazyStatusRequest = isLazyRequest -} - -// StatusChangeClear clears the requested status change of the Worker. -// This just updates the Worker instance, but doesn't store the change in the -// database. -func (w *Worker) StatusChangeClear() { - w.StatusRequested = "" - w.LazyStatusRequest = false -} - func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { queries := db.queries() now := db.nowNullable().Time - workerID, err := queries.CreateWorker(ctx, sqlc.CreateWorkerParams{ - CreatedAt: now, - UUID: w.UUID, - Secret: w.Secret, - Name: w.Name, - Address: w.Address, - Platform: w.Platform, - Software: w.Software, - Status: string(w.Status), - LastSeenAt: sql.NullTime{ - Time: w.LastSeenAt.UTC(), - Valid: !w.LastSeenAt.IsZero(), - }, - StatusRequested: string(w.StatusRequested), + params := sqlc.CreateWorkerParams{ + CreatedAt: now, + UUID: w.UUID, + Secret: w.Secret, + Name: w.Name, + Address: w.Address, + Platform: w.Platform, + Software: w.Software, + Status: w.Status, + LastSeenAt: nullTimeToUTC(w.LastSeenAt), + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, SupportedTaskTypes: w.SupportedTaskTypes, DeletedAt: sql.NullTime(w.DeletedAt), CanRestart: w.CanRestart, - }) + } + + workerID, err := queries.CreateWorker(ctx, params) if err != nil { return fmt.Errorf("creating new worker: %w", err) } - w.ID = uint(workerID) - w.CreatedAt = now - - // TODO: remove the create-with-tags functionality to a higher-level function. - // This code is just here to make this function work like the GORM code did. - for _, tag := range w.Tags { - err := queries.WorkerAddTagMembership(ctx, sqlc.WorkerAddTagMembershipParams{ - WorkerTagID: int64(tag.ID), - WorkerID: workerID, - }) - if err != nil { - return err - } - } + w.ID = workerID + w.CreatedAt = params.CreatedAt return nil } @@ -120,19 +79,7 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { return nil, workerError(err, "fetching worker %s", uuid) } - // TODO: remove this code, and let the caller fetch the tags when interested in them. - workerTags, err := queries.FetchTagsOfWorker(ctx, uuid) - if err != nil { - return nil, workerTagError(err, "fetching tags of worker %s", uuid) - } - - convertedWorker := convertSqlcWorker(worker) - convertedWorker.Tags = make([]*WorkerTag, len(workerTags)) - for index := range workerTags { - convertedWorker.Tags[index] = convertSqlcWorkerTag(workerTags[index]) - } - - return convertedWorker, nil + return &worker, nil } func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { @@ -168,11 +115,11 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { return nil, workerError(err, "fetching all workers") } - gormWorkers := make([]*Worker, len(workers)) + workerPointers := make([]*Worker, len(workers)) for idx := range workers { - gormWorkers[idx] = convertSqlcWorker(workers[idx].Worker) + workerPointers[idx] = &workers[idx] } - return gormWorkers, nil + return workerPointers, nil } // FetchWorkerTask returns the most recent task assigned to the given Worker. @@ -184,8 +131,8 @@ func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error workerID := sql.NullInt64{Int64: int64(worker.ID), Valid: true} row, err := queries.FetchWorkerTask(ctx, sqlc.FetchWorkerTaskParams{ - TaskStatusActive: string(api.TaskStatusActive), - JobStatusActive: string(api.JobStatusActive), + TaskStatusActive: api.TaskStatusActive, + JobStatusActive: api.JobStatusActive, WorkerID: workerID, }) @@ -224,10 +171,10 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { err := queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{ UpdatedAt: db.nowNullable(), - Status: string(w.Status), - StatusRequested: string(w.StatusRequested), + Status: w.Status, + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, - ID: int64(w.ID), + ID: w.ID, }) if err != nil { return fmt.Errorf("saving worker status: %w", err) @@ -235,10 +182,9 @@ func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error { return nil } -func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { - // TODO: remove this code, and just let the caller call CreateWorker() directly. +func (db *DB) SaveWorker(ctx context.Context, w *sqlc.Worker) error { if w.ID == 0 { - return db.CreateWorker(ctx, w) + panic("Do not use SaveWorker() to create a new Worker, use CreateWorker() instead") } queries := db.queries() @@ -251,13 +197,13 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { Address: w.Address, Platform: w.Platform, Software: w.Software, - Status: string(w.Status), - LastSeenAt: sql.NullTime{Time: w.LastSeenAt, Valid: !w.LastSeenAt.IsZero()}, - StatusRequested: string(w.StatusRequested), + Status: w.Status, + LastSeenAt: w.LastSeenAt, + StatusRequested: w.StatusRequested, LazyStatusRequest: w.LazyStatusRequest, SupportedTaskTypes: w.SupportedTaskTypes, CanRestart: w.CanRestart, - ID: int64(w.ID), + ID: w.ID, }) if err != nil { return fmt.Errorf("saving worker: %w", err) @@ -303,34 +249,6 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e return statusCounts, nil } -// convertSqlcWorker converts a worker from the SQLC-generated model to the model -// expected by the rest of the code. This is mostly in place to aid in the GORM -// to SQLC migration. It is intended that eventually the rest of the code will -// use the same SQLC-generated model. -func convertSqlcWorker(worker sqlc.Worker) *Worker { - return &Worker{ - Model: Model{ - ID: uint(worker.ID), - CreatedAt: worker.CreatedAt, - UpdatedAt: worker.UpdatedAt.Time, - }, - DeletedAt: worker.DeletedAt, - - UUID: worker.UUID, - Secret: worker.Secret, - Name: worker.Name, - Address: worker.Address, - Platform: worker.Platform, - Software: worker.Software, - Status: api.WorkerStatus(worker.Status), - LastSeenAt: worker.LastSeenAt.Time, - CanRestart: worker.CanRestart, - StatusRequested: api.WorkerStatus(worker.StatusRequested), - LazyStatusRequest: worker.LazyStatusRequest, - SupportedTaskTypes: worker.SupportedTaskTypes, - } -} - // convertSqlcWorkerTag converts a worker tag from the SQLC-generated model to // the model expected by the rest of the code. This is mostly in place to aid in // the GORM to SQLC migration. It is intended that eventually the rest of the diff --git a/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go index 274338e8..1d428fbb 100644 --- a/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go +++ b/internal/manager/sleep_scheduler/mocks/interfaces_mock.gen.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -81,7 +82,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorkerSleepSchedule(arg0, arg } // SaveWorkerStatus mocks base method. -func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorkerStatus", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/internal/manager/sleep_scheduler/sleep_scheduler.go b/internal/manager/sleep_scheduler/sleep_scheduler.go index fc3f2779..613cf1a7 100644 --- a/internal/manager/sleep_scheduler/sleep_scheduler.go +++ b/internal/manager/sleep_scheduler/sleep_scheduler.go @@ -178,7 +178,7 @@ func (ss *SleepScheduler) updateWorkerStatus( IsLazy: false, Status: worker.StatusRequested, }, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) diff --git a/internal/manager/sleep_scheduler/sleep_scheduler_test.go b/internal/manager/sleep_scheduler/sleep_scheduler_test.go index e02d1d94..5fa32491 100644 --- a/internal/manager/sleep_scheduler/sleep_scheduler_test.go +++ b/internal/manager/sleep_scheduler/sleep_scheduler_test.go @@ -118,7 +118,7 @@ func TestApplySleepSchedule(t *testing.T) { ss, mocks, ctx := testFixtures(t) worker := persistence.Worker{ - Model: persistence.Model{ID: 5}, + ID: 5, UUID: "74997de4-c530-4913-b89f-c489f14f7634", Status: api.WorkerStatusOffline, } @@ -192,7 +192,7 @@ func TestApplySleepScheduleNoStatusChange(t *testing.T) { ss, mocks, ctx := testFixtures(t) worker := persistence.Worker{ - Model: persistence.Model{ID: 5}, + ID: 5, UUID: "74997de4-c530-4913-b89f-c489f14f7634", Status: api.WorkerStatusAsleep, } diff --git a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go index 1a084f17..ef004fba 100644 --- a/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go +++ b/internal/manager/task_state_machine/mocks/interfaces_mock.gen.go @@ -11,6 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" zerolog "github.com/rs/zerolog" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -79,7 +80,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchJobsInStatus(arg0 interface{} } // FetchTasksOfWorkerInStatus mocks base method. -func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { +func (m *MockPersistenceService) FetchTasksOfWorkerInStatus(arg0 context.Context, arg1 *sqlc.Worker, arg2 api.TaskStatus) ([]*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatus", arg0, arg1, arg2) ret0, _ := ret[0].([]*persistence.Task) @@ -94,7 +95,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTasksOfWorkerInStatus(arg0, a } // FetchTasksOfWorkerInStatusOfJob mocks base method. -func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *persistence.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) { +func (m *MockPersistenceService) FetchTasksOfWorkerInStatusOfJob(arg0 context.Context, arg1 *sqlc.Worker, arg2 api.TaskStatus, arg3 *persistence.Job) ([]*persistence.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTasksOfWorkerInStatusOfJob", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*persistence.Task) diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index bff1eb90..dd51fb4f 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -19,7 +20,7 @@ import ( type PersistenceService interface { FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*persistence.Worker, error) - SaveWorker(ctx context.Context, w *persistence.Worker) error + SaveWorker(ctx context.Context, w *sqlc.Worker) error } var _ PersistenceService = (*persistence.DB)(nil) diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go index af14b46c..509af5bd 100644 --- a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -12,6 +12,7 @@ import ( gomock "github.com/golang/mock/gomock" zerolog "github.com/rs/zerolog" persistence "projects.blender.org/studio/flamenco/internal/manager/persistence" + sqlc "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" api "projects.blender.org/studio/flamenco/pkg/api" ) @@ -54,10 +55,10 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 inte } // FetchTimedOutWorkers mocks base method. -func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*persistence.Worker, error) { +func (m *MockPersistenceService) FetchTimedOutWorkers(arg0 context.Context, arg1 time.Time) ([]*sqlc.Worker, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchTimedOutWorkers", arg0, arg1) - ret0, _ := ret[0].([]*persistence.Worker) + ret0, _ := ret[0].([]*sqlc.Worker) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -69,7 +70,7 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutWorkers(arg0, arg1 in } // SaveWorker mocks base method. -func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *persistence.Worker) error { +func (m *MockPersistenceService) SaveWorker(arg0 context.Context, arg1 *sqlc.Worker) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SaveWorker", arg0, arg1) ret0, _ := ret[0].(error) @@ -106,7 +107,7 @@ func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { } // RequeueActiveTasksOfWorker mocks base method. -func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *persistence.Worker, arg2 string) error { +func (m *MockTaskStateMachine) RequeueActiveTasksOfWorker(arg0 context.Context, arg1 *sqlc.Worker, arg2 string) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequeueActiveTasksOfWorker", arg0, arg1, arg2) ret0, _ := ret[0].(error) diff --git a/internal/manager/timeout_checker/tasks_test.go b/internal/manager/timeout_checker/tasks_test.go index 63291ee5..0c4747c8 100644 --- a/internal/manager/timeout_checker/tasks_test.go +++ b/internal/manager/timeout_checker/tasks_test.go @@ -111,9 +111,9 @@ func TestTaskTimeout(t *testing.T) { job := persistence.Job{UUID: "JOB-UUID"} worker := persistence.Worker{ - UUID: "WORKER-UUID", - Name: "Tester", - Model: persistence.Model{ID: 47}, + UUID: "WORKER-UUID", + Name: "Tester", + ID: 47, } taskUnassigned := persistence.Task{ UUID: "TASK-UUID-UNASSIGNED", @@ -124,13 +124,13 @@ func TestTaskTimeout(t *testing.T) { UUID: "TASK-UUID-UNKNOWN", Job: &job, LastTouchedAt: lastTime, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), } taskAssigned := persistence.Task{ UUID: "TASK-UUID-ASSIGNED", Job: &job, LastTouchedAt: lastTime, - WorkerID: &worker.ID, + WorkerID: ptr(uint(worker.ID)), Worker: &worker, } diff --git a/internal/manager/timeout_checker/workers.go b/internal/manager/timeout_checker/workers.go index 918e4604..d3b32afe 100644 --- a/internal/manager/timeout_checker/workers.go +++ b/internal/manager/timeout_checker/workers.go @@ -6,7 +6,7 @@ import ( "context" "github.com/rs/zerolog/log" - "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -37,11 +37,11 @@ func (ttc *TimeoutChecker) checkWorkers(ctx context.Context) { } // timeoutTask marks a task as 'failed' due to a timeout. -func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistence.Worker) { +func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *sqlc.Worker) { logger := log.With(). Str("worker", worker.UUID). Str("name", worker.Name). - Str("lastSeenAt", worker.LastSeenAt.String()). + Str("lastSeenAt", worker.LastSeenAt.Time.String()). Logger() logger.Warn().Msg("TimeoutChecker: worker timed out") @@ -63,9 +63,13 @@ func (ttc *TimeoutChecker) timeoutWorker(ctx context.Context, worker *persistenc ttc.broadcaster.BroadcastWorkerUpdate(api.EventWorkerUpdate{ Id: worker.UUID, Name: worker.Name, - PreviousStatus: &prevStatus, + PreviousStatus: ptr(api.WorkerStatus(prevStatus)), Status: api.WorkerStatusError, - Updated: worker.UpdatedAt, + Updated: worker.UpdatedAt.Time, Version: worker.Software, }) } + +func ptr[T any](value T) *T { + return &value +} diff --git a/internal/manager/timeout_checker/workers_test.go b/internal/manager/timeout_checker/workers_test.go index b2202c0e..79f43512 100644 --- a/internal/manager/timeout_checker/workers_test.go +++ b/internal/manager/timeout_checker/workers_test.go @@ -3,6 +3,7 @@ package timeout_checker // SPDX-License-Identifier: GPL-3.0-or-later import ( + "database/sql" "testing" "time" @@ -30,8 +31,8 @@ func TestWorkerTimeout(t *testing.T) { worker := persistence.Worker{ UUID: "WORKER-UUID", Name: "Tester", - Model: persistence.Model{ID: 47}, - LastSeenAt: lastSeenAt, + ID: 47, + LastSeenAt: sql.NullTime{Time: lastSeenAt, Valid: true}, Status: api.WorkerStatusAsleep, StatusRequested: api.WorkerStatusAwake, } @@ -58,7 +59,7 @@ func TestWorkerTimeout(t *testing.T) { Name: worker.Name, PreviousStatus: &prevStatus, Status: api.WorkerStatusError, - Updated: persistedWorker.UpdatedAt, + Updated: persistedWorker.UpdatedAt.Time, Version: persistedWorker.Software, }) diff --git a/sqlc.yaml b/sqlc.yaml index 9025df8b..332ed912 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -11,6 +11,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -28,6 +36,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -45,6 +61,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs" @@ -62,6 +86,14 @@ sql: go_type: import: "encoding/json" type: "RawMessage" + - column: jobs.status + go_type: { type: "JobStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: tasks.status + go_type: { type: "TaskStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } + - column: workers.status_requested + go_type: { type: "WorkerStatus", import: "projects.blender.org/studio/flamenco/pkg/api" } rename: uuid: "UUID" uuids: "UUIDs"