From 30518ca3af9c866f9b255c91ebe59b95f4f6d295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 25 Jan 2022 18:25:26 +0100 Subject: [PATCH] Store tasks & commands in the database --- internal/manager/persistence/db.go | 54 ++++++++++----- internal/manager/persistence/db_migration.go | 3 +- internal/manager/persistence/db_test.go | 69 ++++++++++++++++++-- internal/manager/persistence/models.go | 57 ++++++++++++---- 4 files changed, 150 insertions(+), 33 deletions(-) diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index ee7c4baa..35b23971 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -71,22 +71,46 @@ func openDB(ctx context.Context, uri string) (*DB, error) { } func (db *DB) StoreJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error { + return db.gormDB.Transaction(func(tx *gorm.DB) error { + // TODO: separate conversion of struct types from storing things in the database. + dbJob := Job{ + UUID: authoredJob.JobID, + Name: authoredJob.Name, + JobType: authoredJob.JobType, + Priority: int8(authoredJob.Priority), + Settings: JobSettings(authoredJob.Settings), + Metadata: StringStringMap(authoredJob.Metadata), + } - dbJob := Job{ - UUID: authoredJob.JobID, - Name: authoredJob.Name, - JobType: authoredJob.JobType, - Priority: int8(authoredJob.Priority), - Settings: JobSettings(authoredJob.Settings), - Metadata: StringStringMap(authoredJob.Metadata), - } + if err := db.gormDB.Create(&dbJob).Error; err != nil { + return fmt.Errorf("error storing job: %v", err) + } - tx := db.gormDB.Create(&dbJob) - if tx.Error != nil { - return fmt.Errorf("error storing job: %v", tx.Error) - } + for _, authoredTask := range authoredJob.Tasks { + var commands []Command + for _, authoredCommand := range authoredTask.Commands { + commands = append(commands, Command{ + Type: authoredCommand.Type, + Parameters: authoredCommand.Parameters, + }) + } - return nil + dbTask := Task{ + Name: authoredTask.Name, + Type: authoredTask.Type, + Job: &dbJob, + Priority: authoredTask.Priority, + Status: string(api.TaskStatusProcessing), // TODO: is this the right place to set the default status? + // TODO: store dependencies + Commands: commands, + } + if err := db.gormDB.Create(&dbTask).Error; err != nil { + return fmt.Errorf("error storing task: %v", err) + } + } + + return nil + }) } func (db *DB) FetchJob(ctx context.Context, jobID string) (*api.Job, error) { @@ -109,8 +133,8 @@ func (db *DB) FetchJob(ctx context.Context, jobID string) (*api.Job, error) { Status: api.JobStatus(dbJob.Status), } - apiJob.Settings.AdditionalProperties = dbJob.Settings - apiJob.Metadata.AdditionalProperties = dbJob.Metadata + apiJob.Settings = &api.JobSettings{AdditionalProperties: dbJob.Settings} + apiJob.Metadata = &api.JobMetadata{AdditionalProperties: dbJob.Metadata} return &apiJob, nil } diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index 1125508a..68abc8a0 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -25,7 +25,8 @@ import ( ) func (db *DB) migrate() error { - if err := db.gormDB.AutoMigrate(&Job{}); err != nil { + err := db.gormDB.AutoMigrate(&Job{}, &Task{}) + if err != nil { return fmt.Errorf("failed to automigrate database: %v", err) } return nil diff --git a/internal/manager/persistence/db_test.go b/internal/manager/persistence/db_test.go index 064535a1..35098e85 100644 --- a/internal/manager/persistence/db_test.go +++ b/internal/manager/persistence/db_test.go @@ -67,21 +67,78 @@ func TestStoreAuthoredJob(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - err := db.StoreJob(ctx, job_compilers.AuthoredJob{ + task1 := job_compilers.AuthoredTask{ + Name: "render-1-3", + Type: "blender", + Commands: []job_compilers.AuthoredCommand{ + { + Type: "blender-render", + Parameters: StringStringMap{ + "cmd": "{blender}", + "filepath": "/path/to/file.blend", + "format": "PNG", + "render_output": "/path/to/output/######.png", + "frames": "1-3", + }}, + }, + } + + task2 := task1 + task2.Name = "render-4-6" + task2.Commands[0].Parameters["frames"] = "4-6" + + task3 := job_compilers.AuthoredTask{ + Name: "preview-video", + Type: "ffmpeg", + Commands: []job_compilers.AuthoredCommand{ + { + Type: "merge-frames-to-video", + Parameters: StringStringMap{ + "images": "/path/to/output/######.png", + "output": "/path/to/output/preview.mkv", + "ffmpegParams": "-c:v hevc -crf 31", + }}, + }, + Dependencies: []*job_compilers.AuthoredTask{&task1, &task2}, + } + + job := job_compilers.AuthoredJob{ JobID: "263fd47e-b9f8-4637-b726-fd7e47ecfdae", Name: "Test job", Priority: 50, Settings: job_compilers.JobSettings{ - "frames": "1-20", - "chunk_size": 3, + "frames": "1-6", + "chunk_size": 3.0, // The roundtrip to JSON in PostgreSQL can make this a float. }, Metadata: job_compilers.JobMetadata{ "author": "Sybren", "project": "Sprite Fright", }, - }) + Tasks: []job_compilers.AuthoredTask{task1, task2, task3}, + } - assert.Nil(t, err) + err := db.StoreJob(ctx, job) + assert.NoError(t, err) - // TODO: fetch the job to see it was stored well. + fetchedJob, err := db.FetchJob(ctx, job.JobID) + assert.NoError(t, err) + assert.NotNil(t, fetchedJob) + + // Test contents of fetched job + assert.Equal(t, job.JobID, fetchedJob.Id) + assert.Equal(t, job.Name, fetchedJob.Name) + assert.Equal(t, job.JobType, fetchedJob.Type) + assert.Equal(t, job.Priority, fetchedJob.Priority) + assert.EqualValues(t, map[string]interface{}(job.Settings), fetchedJob.Settings.AdditionalProperties) + assert.EqualValues(t, map[string]string(job.Metadata), fetchedJob.Metadata.AdditionalProperties) + + // Fetch tasks of job. + var dbJob Job + tx := db.gormDB.Where(&Job{UUID: job.JobID}).Find(&dbJob) + assert.NoError(t, tx.Error) + var tasks []Task + tx = db.gormDB.Where("job_id = ?", dbJob.ID).Find(&tasks) + assert.NoError(t, tx.Error) + + assert.Len(t, tasks, 3) } diff --git a/internal/manager/persistence/models.go b/internal/manager/persistence/models.go index 28dddcb7..c571e988 100644 --- a/internal/manager/persistence/models.go +++ b/internal/manager/persistence/models.go @@ -30,18 +30,55 @@ import ( type Job struct { gorm.Model - UUID string `gorm:"type:char(36)"` + UUID string `gorm:"type:char(36);not null;unique;index"` - Name string `gorm:"type:varchar(64)"` - JobType string `gorm:"type:varchar(32)"` - Priority int8 `gorm:"type:smallint"` - Status string `gorm:"type:varchar(32)"` // See JobStatusXxxx consts in openapi_types.gen.go + Name string `gorm:"type:varchar(64);not null"` + JobType string `gorm:"type:varchar(32);not null"` + Priority int8 `gorm:"type:smallint;not null"` + Status string `gorm:"type:varchar(32);not null"` // See JobStatusXxxx consts in openapi_types.gen.go - Settings JobSettings `gorm:"type:jsonb"` - Metadata JobMetadata `gorm:"type:jsonb"` + Settings JobSettings `gorm:"type:jsonb"` + Metadata StringStringMap `gorm:"type:jsonb"` } type JobSettings map[string]interface{} +type StringStringMap map[string]string + +type Task struct { + gorm.Model + + Name string `gorm:"type:varchar(64);not null"` + Type string `gorm:"type:varchar(32);not null"` + JobID uint `gorm:"not null"` + Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE;not null"` + Priority int `gorm:"type:smallint;not null"` + Status string `gorm:"type:varchar(16);not null"` + + // TODO: include info about which worker is/was working on this. + + // Dependencies are tasks that need to be completed before this one can run. + Dependencies []*Task `gorm:"many2many:task_dependencies;"` + + Commands Commands `gorm:"type:jsonb"` +} + +type Commands []Command + +type Command struct { + Type string `json:"type"` + Parameters StringStringMap `json:"parameters"` +} + +func (c Commands) Value() (driver.Value, error) { + return json.Marshal(c) +} +func (c *Commands) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + return json.Unmarshal(b, &c) +} func (js JobSettings) Value() (driver.Value, error) { return json.Marshal(js) @@ -54,12 +91,10 @@ func (js *JobSettings) Scan(value interface{}) error { return json.Unmarshal(b, &js) } -type JobMetadata map[string]string - -func (js JobMetadata) Value() (driver.Value, error) { +func (js StringStringMap) Value() (driver.Value, error) { return json.Marshal(js) } -func (js *JobMetadata) Scan(value interface{}) error { +func (js *StringStringMap) Scan(value interface{}) error { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed")