From c1cdff567e282a1be07a8ca181fcef0dd1a396ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Wed, 13 Mar 2024 15:03:19 +0100 Subject: [PATCH] Manager: Convert FetchTask to sqlc This is a bit more work than other queries, as it also breaks apart the fetching of the job and the worker into separate ones. In other words, internally the persistence layer API changes. --- internal/manager/api_impl/interfaces.go | 2 + internal/manager/api_impl/jobs.go | 22 ++-- internal/manager/api_impl/jobs_test.go | 32 +---- .../api_impl/mocks/api_impl_mock.gen.go | 15 +++ internal/manager/persistence/db.go | 4 +- internal/manager/persistence/errors.go | 38 ++++-- internal/manager/persistence/jobs.go | 112 ++++++++++++++++-- .../persistence/jobs_blocklist_test.go | 9 +- internal/manager/persistence/jobs_test.go | 13 ++ internal/manager/persistence/logger.go | 28 +++++ .../manager/persistence/sqlc/query_jobs.sql | 19 +++ .../persistence/sqlc/query_jobs.sql.go | 80 +++++++++++++ .../persistence/sqlc/query_workers.sql | 18 +++ .../persistence/sqlc/query_workers.sql.go | 109 +++++++++++++++++ internal/manager/persistence/test_support.go | 5 +- internal/manager/persistence/workers.go | 78 ++++++++++-- sqlc.yaml | 20 ++++ 17 files changed, 527 insertions(+), 77 deletions(-) create mode 100644 internal/manager/persistence/sqlc/query_workers.sql create mode 100644 internal/manager/persistence/sqlc/query_workers.sql.go diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index ea9bef0f..127c1a02 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -36,6 +36,8 @@ type PersistenceService interface { SaveJobPriority(ctx context.Context, job *persistence.Job) error // FetchTask fetches the given task and the accompanying job. FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) + // FetchTaskJobUUID fetches the UUID of the job this task belongs to. + FetchTaskJobUUID(ctx context.Context, taskID string) (string, error) FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error) SaveTaskActivity(ctx context.Context, t *persistence.Task) error // TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection. diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 9e9918b2..92d2aad9 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -439,7 +439,7 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { return sendAPIError(e, http.StatusBadRequest, "bad task ID") } - dbTask, err := f.persist.FetchTask(ctx, taskID) + jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID) if err != nil { if errors.Is(err, persistence.ErrTaskNotFound) { return sendAPIError(e, http.StatusNotFound, "no such task") @@ -447,9 +447,9 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { logger.Error().Err(err).Msg("error fetching task") return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err) } - logger = logger.With().Str("job", dbTask.Job.UUID).Logger() + logger = logger.With().Str("job", jobUUID).Logger() - size, err := f.logStorage.TaskLogSize(dbTask.Job.UUID, taskID) + size, err := f.logStorage.TaskLogSize(jobUUID, taskID) if err != nil { if errors.Is(err, os.ErrNotExist) { logger.Debug().Msg("task log unavailable, task has no log on disk") @@ -475,11 +475,11 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { taskLogInfo := api.TaskLogInfo{ TaskId: taskID, - JobId: dbTask.Job.UUID, + JobId: jobUUID, Size: int(size), } - fullLogPath := f.logStorage.Filepath(dbTask.Job.UUID, taskID) + fullLogPath := f.logStorage.Filepath(jobUUID, taskID) relPath, err := f.localStorage.RelPath(fullLogPath) if err != nil { logger.Error().Err(err).Msg("task log is outside the manager storage, cannot construct its URL for download") @@ -501,7 +501,7 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error { return sendAPIError(e, http.StatusBadRequest, "bad task ID") } - dbTask, err := f.persist.FetchTask(ctx, taskID) + jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID) if err != nil { if errors.Is(err, persistence.ErrTaskNotFound) { return sendAPIError(e, http.StatusNotFound, "no such task") @@ -509,9 +509,9 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error { logger.Error().Err(err).Msg("error fetching task") return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err) } - logger = logger.With().Str("job", dbTask.Job.UUID).Logger() + logger = logger.With().Str("job", jobUUID).Logger() - tail, err := f.logStorage.Tail(dbTask.Job.UUID, taskID) + tail, err := f.logStorage.Tail(jobUUID, taskID) if err != nil { if errors.Is(err, os.ErrNotExist) { logger.Debug().Msg("task tail unavailable, task has no log on disk") @@ -700,7 +700,11 @@ func taskDBtoAPI(dbTask *persistence.Task) api.Task { Status: dbTask.Status, Activity: dbTask.Activity, Commands: make([]api.Command, len(dbTask.Commands)), - Worker: workerToTaskWorker(dbTask.Worker), + + // TODO: convert this to just store dbTask.WorkerUUID. + Worker: workerToTaskWorker(dbTask.Worker), + + JobId: dbTask.JobUUID, } if dbTask.Job != nil { diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 61b52327..b5134cab 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -753,22 +753,10 @@ func TestFetchTaskLogTail(t *testing.T) { jobID := "18a9b096-d77e-438c-9be2-74397038298b" taskID := "2e020eee-20f8-4e95-8dcf-65f7dfc3ebab" - dbJob := persistence.Job{ - UUID: jobID, - Name: "test job", - Status: api.JobStatusActive, - Settings: persistence.StringInterfaceMap{}, - Metadata: persistence.StringStringMap{}, - } - dbTask := persistence.Task{ - UUID: taskID, - Job: &dbJob, - Name: "test task", - } // The task can be found, but has no on-disk task log. // This should not cause any error, but instead be returned as "no content". - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&dbTask, nil) + mf.persistence.EXPECT().FetchTaskJobUUID(gomock.Any(), taskID).Return(jobID, nil) mf.logStorage.EXPECT().Tail(jobID, taskID). Return("", fmt.Errorf("wrapped error: %w", os.ErrNotExist)) @@ -778,7 +766,7 @@ func TestFetchTaskLogTail(t *testing.T) { assertResponseNoContent(t, echoCtx) // Check that a 204 No Content is also returned when the task log file on disk exists, but is empty. - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&dbTask, nil) + mf.persistence.EXPECT().FetchTaskJobUUID(gomock.Any(), taskID).Return(jobID, nil) mf.logStorage.EXPECT().Tail(jobID, taskID). Return("", fmt.Errorf("wrapped error: %w", os.ErrNotExist)) @@ -796,21 +784,9 @@ func TestFetchTaskLogInfo(t *testing.T) { jobID := "18a9b096-d77e-438c-9be2-74397038298b" taskID := "2e020eee-20f8-4e95-8dcf-65f7dfc3ebab" - dbJob := persistence.Job{ - UUID: jobID, - Name: "test job", - Status: api.JobStatusActive, - Settings: persistence.StringInterfaceMap{}, - Metadata: persistence.StringStringMap{}, - } - dbTask := persistence.Task{ - UUID: taskID, - Job: &dbJob, - Name: "test task", - } mf.persistence.EXPECT(). - FetchTask(gomock.Any(), taskID). - Return(&dbTask, nil). + FetchTaskJobUUID(gomock.Any(), taskID). + Return(jobID, nil). AnyTimes() // The task can be found, but has no on-disk task log. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9f63ca2d..03bb6c96 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -244,6 +244,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTaskFailureList(arg0, arg1 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTaskFailureList", reflect.TypeOf((*MockPersistenceService)(nil).FetchTaskFailureList), arg0, arg1) } +// FetchTaskJobUUID mocks base method. +func (m *MockPersistenceService) FetchTaskJobUUID(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTaskJobUUID", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTaskJobUUID indicates an expected call of FetchTaskJobUUID. +func (mr *MockPersistenceServiceMockRecorder) FetchTaskJobUUID(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTaskJobUUID", reflect.TypeOf((*MockPersistenceService)(nil).FetchTaskJobUUID), arg0, arg1) +} + // FetchWorker mocks base method. func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 1ffe2806..388a4013 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -184,7 +184,9 @@ func (db *DB) queries() (*sqlc.Queries, error) { if err != nil { return nil, fmt.Errorf("could not get low-level database driver: %w", err) } - return sqlc.New(sqldb), nil + + loggingWrapper := LoggingDBConn{sqldb} + return sqlc.New(&loggingWrapper), nil } // now returns the result of `nowFunc()` wrapped in a sql.NullTime. diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index 24eb3dae..816a1383 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -2,6 +2,7 @@ package persistence import ( + "database/sql" "errors" "fmt" @@ -9,6 +10,7 @@ import ( ) var ( + // TODO: let these errors wrap database/sql.ErrNoRows. ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} @@ -63,36 +65,48 @@ func wrapError(errorToWrap error, message string, format ...interface{}) error { // translateGormJobError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormJobError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormJobError(err error) error { + if errors.Is(err, sql.ErrNoRows) { + return ErrTaskNotFound + } + if errors.Is(err, gorm.ErrRecordNotFound) { return ErrJobNotFound } - return gormError + return err } // translateGormTaskError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormTaskError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormTaskError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrTaskNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrTaskNotFound + } + return err } // translateGormWorkerError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormWorkerError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrWorkerNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrWorkerNotFound + } + return err } // translateGormWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerTagError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormWorkerTagError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrWorkerTagNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrWorkerTagNotFound + } + return err } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 8a2f94d3..f28f0802 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -66,12 +66,14 @@ type Task struct { Type string `gorm:"type:varchar(32);default:''"` JobID uint `gorm:"default:0"` Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"` + JobUUID string `gorm:"-"` // Fetched by SQLC, not GORM. Priority int `gorm:"type:smallint;default:50"` Status api.TaskStatus `gorm:"type:varchar(16);default:''"` // Which worker is/was working on this. WorkerID *uint Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:SET NULL"` + WorkerUUID string `gorm:"-"` // Fetched by SQLC, not GORM. LastTouchedAt time.Time `gorm:"index"` // Should contain UTC timestamps. // Dependencies are tasks that need to be completed before this one can run. @@ -454,18 +456,66 @@ func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error { } func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { - dbTask := Task{} - tx := db.gormDB.WithContext(ctx). - // Allow finding the Worker, even after it was deleted. Jobs and Tasks - // don't have soft-deletion. - Unscoped(). - Joins("Job"). - Joins("Worker"). - First(&dbTask, "tasks.uuid = ?", taskUUID) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching task") + queries, err := db.queries() + if err != nil { + return nil, err } - return &dbTask, nil + + taskRow, err := queries.FetchTask(ctx, taskUUID) + if err != nil { + return nil, taskError(err, "fetching task %s", taskUUID) + } + + convertedTask, err := convertSqlcTask(taskRow) + if err != nil { + return nil, err + } + + // TODO: remove this code, and let the caller fetch the job explicitly when needed. + if taskRow.Task.JobID > 0 { + dbJob, err := queries.FetchJobByID(ctx, taskRow.Task.JobID) + if err != nil { + return nil, jobError(err, "fetching job of task %s", taskUUID) + } + + convertedJob, err := convertSqlcJob(dbJob) + if err != nil { + return nil, jobError(err, "converting job of task %s", taskUUID) + } + convertedTask.Job = convertedJob + if convertedTask.JobUUID != convertedJob.UUID { + panic("Conversion to SQLC is incomplete") + } + } + + // TODO: remove this code, and let the caller fetch the Worker explicitly when needed. + if taskRow.WorkerUUID.Valid { + worker, err := queries.FetchWorkerUnconditional(ctx, taskRow.WorkerUUID.String) + if err != nil { + return nil, taskError(err, "fetching worker assigned to task %s", taskUUID) + } + convertedWorker := convertSqlcWorker(worker) + convertedTask.Worker = &convertedWorker + } + + return convertedTask, nil +} + +// FetchTaskJobUUID fetches the job UUID of the given task. +func (db *DB) FetchTaskJobUUID(ctx context.Context, taskUUID string) (string, error) { + queries, err := db.queries() + if err != nil { + return "", err + } + + jobUUID, err := queries.FetchTaskJobUUID(ctx, taskUUID) + if err != nil { + return "", taskError(err, "fetching job UUID of task %s", taskUUID) + } + if !jobUUID.Valid { + return "", PersistenceError{Message: fmt.Sprintf("unable to find job of task %s", taskUUID)} + } + return jobUUID.String, nil } func (db *DB) SaveTask(ctx context.Context, t *Task) error { @@ -791,3 +841,43 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) { return &dbJob, nil } + +// convertSqlcTask converts a FetchTaskRow from the SQLC-generated model to the +// model expected by the rest of the code. This is mostly in place to aid in the +// GORM to SQLC migration. It is intended that eventually the rest of the code +// will use the same SQLC-generated model. +func convertSqlcTask(taskRow sqlc.FetchTaskRow) (*Task, error) { + dbTask := Task{ + Model: Model{ + ID: uint(taskRow.Task.ID), + CreatedAt: taskRow.Task.CreatedAt, + UpdatedAt: taskRow.Task.UpdatedAt.Time, + }, + + UUID: taskRow.Task.UUID, + Name: taskRow.Task.Name, + Type: taskRow.Task.Type, + Priority: int(taskRow.Task.Priority), + Status: api.TaskStatus(taskRow.Task.Status), + LastTouchedAt: taskRow.Task.LastTouchedAt.Time, + Activity: taskRow.Task.Activity, + + JobID: uint(taskRow.Task.JobID), + JobUUID: taskRow.JobUUID.String, + WorkerUUID: taskRow.WorkerUUID.String, + } + + // TODO: convert dependencies? + + if taskRow.Task.WorkerID.Valid { + workerID := uint(taskRow.Task.WorkerID.Int64) + dbTask.WorkerID = &workerID + } + + if err := json.Unmarshal(taskRow.Task.Commands, &dbTask.Commands); err != nil { + return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v", + taskRow.Task.UUID, taskRow.JobUUID.String, err)) + } + + return &dbTask, nil +} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index 436c5f18..cd264f7f 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -238,9 +238,12 @@ func TestCountTaskFailuresOfWorker(t *testing.T) { ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t) defer close() - task0, _ := db.FetchTask(ctx, authoredJob.Tasks[0].UUID) - task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) - task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + task0, err := db.FetchTask(ctx, authoredJob.Tasks[0].UUID) + require.NoError(t, err) + task1, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) + require.NoError(t, err) + task2, err := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + require.NoError(t, err) // Sanity check on the test data. assert.Equal(t, "blender", task0.Type) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 000a6db4..20a0b1d5 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -75,6 +75,19 @@ func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) { assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID) } +func TestFetchTaskJobUUID(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) + defer cancel() + + job := createTestAuthoredJobWithTasks() + err := db.StoreAuthoredJob(ctx, job) + require.NoError(t, err) + + jobUUID, err := db.FetchTaskJobUUID(ctx, job.Tasks[0].UUID) + require.NoError(t, err) + assert.Equal(t, job.JobID, jobUUID) +} + func TestSaveJobStorageInfo(t *testing.T) { // Test that saving job storage info doesn't count as "update". // This is necessary for `cmd/shaman-checkout-id-setter` to do its work quietly. diff --git a/internal/manager/persistence/logger.go b/internal/manager/persistence/logger.go index 2135a006..3e346ef8 100644 --- a/internal/manager/persistence/logger.go +++ b/internal/manager/persistence/logger.go @@ -4,13 +4,16 @@ package persistence import ( "context" + "database/sql" "errors" "fmt" "time" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "gorm.io/gorm" gormlogger "gorm.io/gorm/logger" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // dbLogger implements the behaviour of Gorm's default logger on top of Zerolog. @@ -126,3 +129,28 @@ func (l dbLogger) logger(args ...interface{}) zerolog.Logger { } return logCtx.Logger() } + +// LoggingDBConn wraps a database/sql.DB connection, so that it can be used with +// sqlc and log all the queries. +type LoggingDBConn struct { + wrappedConn sqlc.DBTX +} + +var _ sqlc.DBTX = (*LoggingDBConn)(nil) + +func (ldbc *LoggingDBConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Exec") + return ldbc.wrappedConn.ExecContext(ctx, sql, args...) +} +func (ldbc *LoggingDBConn) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) { + log.Trace().Str("sql", sql).Msg("database: query Prepare") + return ldbc.wrappedConn.PrepareContext(ctx, sql) +} +func (ldbc *LoggingDBConn) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Query") + return ldbc.wrappedConn.QueryContext(ctx, sql, args...) +} +func (ldbc *LoggingDBConn) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query QueryRow") + return ldbc.wrappedConn.QueryRowContext(ctx, sql, args...) +} diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 0f606454..e3108647 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -18,9 +18,15 @@ INSERT INTO jobs ( VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); -- name: FetchJob :one +-- Fetch a job by its UUID. SELECT * FROM jobs WHERE uuid = ? LIMIT 1; +-- name: FetchJobByID :one +-- Fetch a job by its numerical ID. +SELECT * FROM jobs +WHERE id = ? LIMIT 1; + -- name: DeleteJob :exec DELETE FROM jobs WHERE uuid = ?; @@ -55,3 +61,16 @@ UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id; -- name: SaveJobStorageInfo :exec UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id; + +-- name: FetchTask :one +SELECT sqlc.embed(tasks), jobs.UUID as jobUUID, workers.UUID as workerUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.uuid = @uuid; + +-- name: FetchTaskJobUUID :one +SELECT jobs.UUID as jobUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +WHERE tasks.uuid = @uuid; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 26032839..6d1d70f4 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -74,6 +74,7 @@ SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activ WHERE uuid = ? LIMIT 1 ` +// Fetch a job by its UUID. func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { row := q.db.QueryRowContext(ctx, fetchJob, uuid) var i Job @@ -96,6 +97,34 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { return i, err } +const fetchJobByID = `-- name: FetchJobByID :one +SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs +WHERE id = ? LIMIT 1 +` + +// Fetch a job by its numerical ID. +func (q *Queries) FetchJobByID(ctx context.Context, id int64) (Job, error) { + row := q.db.QueryRowContext(ctx, fetchJobByID, id) + var i Job + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.JobType, + &i.Priority, + &i.Status, + &i.Activity, + &i.Settings, + &i.Metadata, + &i.DeleteRequestedAt, + &i.StorageShamanCheckoutID, + &i.WorkerTagID, + ) + return i, err +} + const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many SELECT uuid FROM jobs WHERE updated_at <= ?1 ` @@ -204,6 +233,57 @@ func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]J return items, nil } +const fetchTask = `-- name: FetchTask :one +SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID, workers.UUID as workerUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.uuid = ?1 +` + +type FetchTaskRow struct { + Task Task + JobUUID sql.NullString + WorkerUUID sql.NullString +} + +func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, error) { + row := q.db.QueryRowContext(ctx, fetchTask, uuid) + var i FetchTaskRow + err := row.Scan( + &i.Task.ID, + &i.Task.CreatedAt, + &i.Task.UpdatedAt, + &i.Task.UUID, + &i.Task.Name, + &i.Task.Type, + &i.Task.JobID, + &i.Task.Priority, + &i.Task.Status, + &i.Task.WorkerID, + &i.Task.LastTouchedAt, + &i.Task.Commands, + &i.Task.Activity, + &i.JobUUID, + &i.WorkerUUID, + ) + return i, err +} + +const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one +SELECT jobs.UUID as jobUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +WHERE tasks.uuid = ?1 +` + +func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullString, error) { + row := q.db.QueryRowContext(ctx, fetchTaskJobUUID, uuid) + var jobuuid sql.NullString + err := row.Scan(&jobuuid) + return jobuuid, err +} + const requestJobDeletion = `-- name: RequestJobDeletion :exec UPDATE jobs SET updated_at = ?1, diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql new file mode 100644 index 00000000..a2d6713c --- /dev/null +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -0,0 +1,18 @@ + +-- Worker queries +-- + +-- name: FetchWorker :one +-- FetchWorker only returns the worker if it wasn't soft-deleted. +SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL; + +-- name: FetchWorkerUnconditional :one +-- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker. +SELECT * FROM workers WHERE workers.uuid = @uuid; + +-- name: FetchWorkerTags :many +SELECT worker_tags.* +FROM worker_tags +LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) +LEFT JOIN workers on (m.worker_id = workers.id) +WHERE workers.uuid = @uuid; diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go new file mode 100644 index 00000000..663fa422 --- /dev/null +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -0,0 +1,109 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: query_workers.sql + +package sqlc + +import ( + "context" +) + +const fetchWorker = `-- name: FetchWorker :one + +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is NULL +` + +// Worker queries +// +// FetchWorker only returns the worker if it wasn't soft-deleted. +func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error) { + row := q.db.QueryRowContext(ctx, fetchWorker, uuid) + var i Worker + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ) + return i, err +} + +const fetchWorkerTags = `-- name: FetchWorkerTags :many +SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description +FROM worker_tags +LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) +LEFT JOIN workers on (m.worker_id = workers.id) +WHERE workers.uuid = ?1 +` + +func (q *Queries) FetchWorkerTags(ctx context.Context, uuid string) ([]WorkerTag, error) { + rows, err := q.db.QueryContext(ctx, fetchWorkerTags, uuid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []WorkerTag + for rows.Next() { + var i WorkerTag + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.Description, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchWorkerUnconditional = `-- name: FetchWorkerUnconditional :one +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 +` + +// FetchWorkerUnconditional ignores soft-deletion status and just returns the worker. +func (q *Queries) FetchWorkerUnconditional(ctx context.Context, uuid string) (Worker, error) { + row := q.db.QueryRowContext(ctx, fetchWorkerUnconditional, uuid) + var i Worker + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ) + return i, err +} diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index cba04ade..a0c2a92f 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -15,7 +15,6 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "gorm.io/gorm" - "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -106,7 +105,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe ctx, cancel, db := persistenceTestFixtures(t, testContextTimeout) w := Worker{ - UUID: uuid.New(), + UUID: "557930e7-5b55-469e-a6d7-fc800f3685be", Name: "дрон", Address: "fe80::5054:ff:fede:2ad7", Platform: "linux", @@ -116,7 +115,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe } wc := WorkerTag{ - UUID: uuid.New(), + UUID: "e0e05417-9793-4829-b1d0-d446dd819f3d", Name: "arbejdsklynge", Description: "Worker tag in Danish", } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index a9637d54..c7d87f9e 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" "gorm.io/gorm" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -73,18 +74,30 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { } func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { - w := Worker{} - tx := db.gormDB.WithContext(ctx). - Preload("Tags"). - Find(&w, "uuid = ?", uuid). - Limit(1) - if tx.Error != nil { - return nil, workerError(tx.Error, "fetching worker") + queries, err := db.queries() + if err != nil { + return nil, err } - if w.ID == 0 { - return nil, ErrWorkerNotFound + + worker, err := queries.FetchWorker(ctx, uuid) + if err != nil { + return nil, workerError(err, "fetching worker %s", uuid) } - return &w, nil + + // TODO: remove this code, and let the caller fetch the tags when interested in them. + workerTags, err := queries.FetchWorkerTags(ctx, uuid) + if err != nil { + return nil, workerTagError(err, "fetching tags of worker %s", uuid) + } + + convertedWorker := convertSqlcWorker(worker) + convertedWorker.Tags = make([]*WorkerTag, len(workerTags)) + for index := range workerTags { + convertedTag := convertSqlcWorkerTag(workerTags[index]) + convertedWorker.Tags[index] = &convertedTag + } + + return &convertedWorker, nil } func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { @@ -216,3 +229,48 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e return statusCounts, nil } + +// convertSqlcWorker converts a worker from the SQLC-generated model to the model +// expected by the rest of the code. This is mostly in place to aid in the GORM +// to SQLC migration. It is intended that eventually the rest of the code will +// use the same SQLC-generated model. +func convertSqlcWorker(worker sqlc.Worker) Worker { + return Worker{ + Model: Model{ + ID: uint(worker.ID), + CreatedAt: worker.CreatedAt, + UpdatedAt: worker.UpdatedAt.Time, + }, + DeletedAt: gorm.DeletedAt(worker.DeletedAt), + + UUID: worker.UUID, + Secret: worker.Secret, + Name: worker.Name, + Address: worker.Address, + Platform: worker.Platform, + Software: worker.Software, + Status: api.WorkerStatus(worker.Status), + LastSeenAt: worker.LastSeenAt.Time, + CanRestart: worker.CanRestart != 0, + StatusRequested: api.WorkerStatus(worker.StatusRequested), + LazyStatusRequest: worker.LazyStatusRequest != 0, + SupportedTaskTypes: worker.SupportedTaskTypes, + } +} + +// convertSqlcWorkerTag converts a worker tag from the SQLC-generated model to +// the model expected by the rest of the code. This is mostly in place to aid in +// the GORM to SQLC migration. It is intended that eventually the rest of the +// code will use the same SQLC-generated model. +func convertSqlcWorkerTag(tag sqlc.WorkerTag) WorkerTag { + return WorkerTag{ + Model: Model{ + ID: uint(tag.ID), + CreatedAt: tag.CreatedAt, + UpdatedAt: tag.UpdatedAt.Time, + }, + UUID: tag.UUID, + Name: tag.Name, + Description: tag.Description, + } +} diff --git a/sqlc.yaml b/sqlc.yaml index 887ce133..5fd3fcbc 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -14,3 +14,23 @@ sql: rename: uuid: "UUID" uuids: "UUIDs" + jobuuid: "JobUUID" + taskUUID: "TaskUUID" + workeruuid: "WorkerUUID" + - engine: "sqlite" + schema: "internal/manager/persistence/sqlc/schema.sql" + queries: "internal/manager/persistence/sqlc/query_workers.sql" + gen: + go: + out: "internal/manager/persistence/sqlc" + overrides: + - db_type: "jsonb" + go_type: + import: "encoding/json" + type: "RawMessage" + rename: + uuid: "UUID" + uuids: "UUIDs" + jobuuid: "JobUUID" + taskUUID: "TaskUUID" + workeruuid: "WorkerUUID"