Store tasks & commands in the database

This commit is contained in:
Sybren A. Stüvel 2022-01-25 18:25:26 +01:00
parent a4247f7a35
commit 30518ca3af
4 changed files with 150 additions and 33 deletions

View File

@ -71,22 +71,46 @@ func openDB(ctx context.Context, uri string) (*DB, error) {
} }
func (db *DB) StoreJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error { func (db *DB) StoreJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
return db.gormDB.Transaction(func(tx *gorm.DB) error {
// TODO: separate conversion of struct types from storing things in the database.
dbJob := Job{
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
Priority: int8(authoredJob.Priority),
Settings: JobSettings(authoredJob.Settings),
Metadata: StringStringMap(authoredJob.Metadata),
}
dbJob := Job{ if err := db.gormDB.Create(&dbJob).Error; err != nil {
UUID: authoredJob.JobID, return fmt.Errorf("error storing job: %v", err)
Name: authoredJob.Name, }
JobType: authoredJob.JobType,
Priority: int8(authoredJob.Priority),
Settings: JobSettings(authoredJob.Settings),
Metadata: StringStringMap(authoredJob.Metadata),
}
tx := db.gormDB.Create(&dbJob) for _, authoredTask := range authoredJob.Tasks {
if tx.Error != nil { var commands []Command
return fmt.Errorf("error storing job: %v", tx.Error) for _, authoredCommand := range authoredTask.Commands {
} commands = append(commands, Command{
Type: authoredCommand.Type,
Parameters: authoredCommand.Parameters,
})
}
return nil dbTask := Task{
Name: authoredTask.Name,
Type: authoredTask.Type,
Job: &dbJob,
Priority: authoredTask.Priority,
Status: string(api.TaskStatusProcessing), // TODO: is this the right place to set the default status?
// TODO: store dependencies
Commands: commands,
}
if err := db.gormDB.Create(&dbTask).Error; err != nil {
return fmt.Errorf("error storing task: %v", err)
}
}
return nil
})
} }
func (db *DB) FetchJob(ctx context.Context, jobID string) (*api.Job, error) { func (db *DB) FetchJob(ctx context.Context, jobID string) (*api.Job, error) {
@ -109,8 +133,8 @@ func (db *DB) FetchJob(ctx context.Context, jobID string) (*api.Job, error) {
Status: api.JobStatus(dbJob.Status), Status: api.JobStatus(dbJob.Status),
} }
apiJob.Settings.AdditionalProperties = dbJob.Settings apiJob.Settings = &api.JobSettings{AdditionalProperties: dbJob.Settings}
apiJob.Metadata.AdditionalProperties = dbJob.Metadata apiJob.Metadata = &api.JobMetadata{AdditionalProperties: dbJob.Metadata}
return &apiJob, nil return &apiJob, nil
} }

View File

@ -25,7 +25,8 @@ import (
) )
func (db *DB) migrate() error { func (db *DB) migrate() error {
if err := db.gormDB.AutoMigrate(&Job{}); err != nil { err := db.gormDB.AutoMigrate(&Job{}, &Task{})
if err != nil {
return fmt.Errorf("failed to automigrate database: %v", err) return fmt.Errorf("failed to automigrate database: %v", err)
} }
return nil return nil

View File

@ -67,21 +67,78 @@ func TestStoreAuthoredJob(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() defer cancel()
err := db.StoreJob(ctx, job_compilers.AuthoredJob{ task1 := job_compilers.AuthoredTask{
Name: "render-1-3",
Type: "blender",
Commands: []job_compilers.AuthoredCommand{
{
Type: "blender-render",
Parameters: StringStringMap{
"cmd": "{blender}",
"filepath": "/path/to/file.blend",
"format": "PNG",
"render_output": "/path/to/output/######.png",
"frames": "1-3",
}},
},
}
task2 := task1
task2.Name = "render-4-6"
task2.Commands[0].Parameters["frames"] = "4-6"
task3 := job_compilers.AuthoredTask{
Name: "preview-video",
Type: "ffmpeg",
Commands: []job_compilers.AuthoredCommand{
{
Type: "merge-frames-to-video",
Parameters: StringStringMap{
"images": "/path/to/output/######.png",
"output": "/path/to/output/preview.mkv",
"ffmpegParams": "-c:v hevc -crf 31",
}},
},
Dependencies: []*job_compilers.AuthoredTask{&task1, &task2},
}
job := job_compilers.AuthoredJob{
JobID: "263fd47e-b9f8-4637-b726-fd7e47ecfdae", JobID: "263fd47e-b9f8-4637-b726-fd7e47ecfdae",
Name: "Test job", Name: "Test job",
Priority: 50, Priority: 50,
Settings: job_compilers.JobSettings{ Settings: job_compilers.JobSettings{
"frames": "1-20", "frames": "1-6",
"chunk_size": 3, "chunk_size": 3.0, // The roundtrip to JSON in PostgreSQL can make this a float.
}, },
Metadata: job_compilers.JobMetadata{ Metadata: job_compilers.JobMetadata{
"author": "Sybren", "author": "Sybren",
"project": "Sprite Fright", "project": "Sprite Fright",
}, },
}) Tasks: []job_compilers.AuthoredTask{task1, task2, task3},
}
assert.Nil(t, err) err := db.StoreJob(ctx, job)
assert.NoError(t, err)
// TODO: fetch the job to see it was stored well. fetchedJob, err := db.FetchJob(ctx, job.JobID)
assert.NoError(t, err)
assert.NotNil(t, fetchedJob)
// Test contents of fetched job
assert.Equal(t, job.JobID, fetchedJob.Id)
assert.Equal(t, job.Name, fetchedJob.Name)
assert.Equal(t, job.JobType, fetchedJob.Type)
assert.Equal(t, job.Priority, fetchedJob.Priority)
assert.EqualValues(t, map[string]interface{}(job.Settings), fetchedJob.Settings.AdditionalProperties)
assert.EqualValues(t, map[string]string(job.Metadata), fetchedJob.Metadata.AdditionalProperties)
// Fetch tasks of job.
var dbJob Job
tx := db.gormDB.Where(&Job{UUID: job.JobID}).Find(&dbJob)
assert.NoError(t, tx.Error)
var tasks []Task
tx = db.gormDB.Where("job_id = ?", dbJob.ID).Find(&tasks)
assert.NoError(t, tx.Error)
assert.Len(t, tasks, 3)
} }

View File

@ -30,18 +30,55 @@ import (
type Job struct { type Job struct {
gorm.Model gorm.Model
UUID string `gorm:"type:char(36)"` UUID string `gorm:"type:char(36);not null;unique;index"`
Name string `gorm:"type:varchar(64)"` Name string `gorm:"type:varchar(64);not null"`
JobType string `gorm:"type:varchar(32)"` JobType string `gorm:"type:varchar(32);not null"`
Priority int8 `gorm:"type:smallint"` Priority int8 `gorm:"type:smallint;not null"`
Status string `gorm:"type:varchar(32)"` // See JobStatusXxxx consts in openapi_types.gen.go Status string `gorm:"type:varchar(32);not null"` // See JobStatusXxxx consts in openapi_types.gen.go
Settings JobSettings `gorm:"type:jsonb"` Settings JobSettings `gorm:"type:jsonb"`
Metadata JobMetadata `gorm:"type:jsonb"` Metadata StringStringMap `gorm:"type:jsonb"`
} }
type JobSettings map[string]interface{} type JobSettings map[string]interface{}
type StringStringMap map[string]string
type Task struct {
gorm.Model
Name string `gorm:"type:varchar(64);not null"`
Type string `gorm:"type:varchar(32);not null"`
JobID uint `gorm:"not null"`
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE;not null"`
Priority int `gorm:"type:smallint;not null"`
Status string `gorm:"type:varchar(16);not null"`
// TODO: include info about which worker is/was working on this.
// Dependencies are tasks that need to be completed before this one can run.
Dependencies []*Task `gorm:"many2many:task_dependencies;"`
Commands Commands `gorm:"type:jsonb"`
}
type Commands []Command
type Command struct {
Type string `json:"type"`
Parameters StringStringMap `json:"parameters"`
}
func (c Commands) Value() (driver.Value, error) {
return json.Marshal(c)
}
func (c *Commands) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}
return json.Unmarshal(b, &c)
}
func (js JobSettings) Value() (driver.Value, error) { func (js JobSettings) Value() (driver.Value, error) {
return json.Marshal(js) return json.Marshal(js)
@ -54,12 +91,10 @@ func (js *JobSettings) Scan(value interface{}) error {
return json.Unmarshal(b, &js) return json.Unmarshal(b, &js)
} }
type JobMetadata map[string]string func (js StringStringMap) Value() (driver.Value, error) {
func (js JobMetadata) Value() (driver.Value, error) {
return json.Marshal(js) return json.Marshal(js)
} }
func (js *JobMetadata) Scan(value interface{}) error { func (js *StringStringMap) Scan(value interface{}) error {
b, ok := value.([]byte) b, ok := value.([]byte)
if !ok { if !ok {
return errors.New("type assertion to []byte failed") return errors.New("type assertion to []byte failed")