diff --git a/cmd/flamenco-manager-poc/main.go b/cmd/flamenco-manager-poc/main.go index 30ef90d7..28083f68 100644 --- a/cmd/flamenco-manager-poc/main.go +++ b/cmd/flamenco-manager-poc/main.go @@ -52,12 +52,12 @@ func main() { // Open the database. dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) defer dbCtxCancel() - _, err := persistence.OpenDB(dbCtx) + persist, err := persistence.OpenDB(dbCtx) if err != nil { log.Fatal().Err(err).Msg("error opening database") } - // TODO: load port number from the configuration. + // TODO: load port number from the configuration in the database. // TODO: enable TLS via Let's Encrypt. listen := ":8080" _, port, _ := net.SplitHostPort(listen) @@ -68,7 +68,7 @@ func main() { if err != nil { log.Fatal().Err(err).Msg("error loading job compilers") } - flamenco := api_impl.NewFlamenco(compiler) + flamenco := api_impl.NewFlamenco(compiler, persist) e := buildWebService(flamenco) // Start the web server. diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index f31cc559..fc20f2ff 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -22,24 +22,35 @@ package api_impl * ***** END GPL LICENSE BLOCK ***** */ import ( + "context" + "github.com/labstack/echo/v4" + "gitlab.com/blender/flamenco-goja-test/internal/manager/job_compilers" "gitlab.com/blender/flamenco-goja-test/pkg/api" ) type Flamenco struct { jobCompiler JobCompiler + persist JobPersistenceService +} + +type JobPersistenceService interface { + // StoreJob stores a job in the persistence layer. + StoreJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error } type JobCompiler interface { ListJobTypes() api.AvailableJobTypes + Compile(ctx context.Context, job api.SubmittedJob) (*job_compilers.AuthoredJob, error) } var _ api.ServerInterface = (*Flamenco)(nil) // NewFlamenco creates a new Flamenco service, using the given JobCompiler. -func NewFlamenco(jc JobCompiler) *Flamenco { +func NewFlamenco(jc JobCompiler, jps JobPersistenceService) *Flamenco { return &Flamenco{ jobCompiler: jc, + persist: jps, } } diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go new file mode 100644 index 00000000..af5f1b07 --- /dev/null +++ b/internal/manager/api_impl/jobs.go @@ -0,0 +1,77 @@ +package api_impl + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "fmt" + "net/http" + + "github.com/labstack/echo/v4" + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-goja-test/pkg/api" +) + +func (f *Flamenco) GetJobTypes(e echo.Context) error { + if f.jobCompiler == nil { + log.Error().Msg("Flamenco is running without job compiler") + return sendAPIError(e, http.StatusInternalServerError, "no job types available") + } + + jobTypes := f.jobCompiler.ListJobTypes() + return e.JSON(http.StatusOK, &jobTypes) +} + +func (f *Flamenco) SubmitJob(e echo.Context) error { + // TODO: move this into some middleware. + logger := log.With(). + Str("ip", e.RealIP()). + Logger() + + var job api.SubmitJobJSONRequestBody + if err := e.Bind(&job); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + logger = logger.With(). + Str("type", job.Type). + Str("name", job.Name). + Logger() + logger.Info().Msg("new Flamenco job received") + + ctx := e.Request().Context() + submittedJob := api.SubmittedJob(job) + authoredJob, err := f.jobCompiler.Compile(ctx, submittedJob) + if err != nil { + logger.Warn().Err(err).Msg("error compiling job") + // TODO: make this a more specific error object for this API call. + return sendAPIError(e, http.StatusBadRequest, fmt.Sprintf("error compiling job: %v", err)) + } + + logger = logger.With().Str("job_id", authoredJob.JobID).Logger() + + if err := f.persist.StoreJob(ctx, *authoredJob); err != nil { + logger.Error().Err(err).Msg("error persisting job in database") + return sendAPIError(e, http.StatusInternalServerError, "error persisting job in database") + } + + return e.JSON(http.StatusOK, authoredJob) +} diff --git a/internal/manager/job_compilers/author.go b/internal/manager/job_compilers/author.go index e88c5727..c6a0ca61 100644 --- a/internal/manager/job_compilers/author.go +++ b/internal/manager/job_compilers/author.go @@ -25,6 +25,7 @@ import ( "github.com/dop251/goja" "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-goja-test/pkg/api" ) // Author allows scripts to author tasks and commands. @@ -36,7 +37,8 @@ type AuthoredJob struct { JobID string Name string JobType string - Priority int8 + Priority int + Status api.JobStatus Created time.Time diff --git a/internal/manager/job_compilers/job_compilers.go b/internal/manager/job_compilers/job_compilers.go index 6fd3bf7f..0ce38fdb 100644 --- a/internal/manager/job_compilers/job_compilers.go +++ b/internal/manager/job_compilers/job_compilers.go @@ -23,6 +23,7 @@ package job_compilers * ***** END GPL LICENSE BLOCK ***** */ import ( + "context" "errors" "time" @@ -53,6 +54,9 @@ type VM struct { compiler Compiler // Program loaded into this VM. } +// jobCompileFunc is a function that fills job.Tasks. +type jobCompileFunc func(job *AuthoredJob) error + func Load() (*Service, error) { compiler := Service{ compilers: map[string]Compiler{}, @@ -81,6 +85,52 @@ func Load() (*Service, error) { return &compiler, nil } +func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJob, error) { + vm, err := s.compilerForJobType(sj.Type) + if err != nil { + return nil, err + } + + // Create an AuthoredJob from this SubmittedJob. + aj := AuthoredJob{ + JobID: uuid.New().String(), // Ignore the submitted ID. + Name: sj.Name, + JobType: sj.Type, + Priority: sj.Priority, + Status: api.JobStatusUnderConstruction, + + Settings: make(JobSettings), + Metadata: make(JobMetadata), + } + if sj.Settings != nil { + for key, value := range *sj.Settings { + aj.Settings[key] = value + } + } + if sj.Metadata != nil { + for key, value := range *sj.Metadata { + // TODO: make sure OpenAPI understands these keys can only be strings. + aj.Metadata[key] = value.(string) + } + } + + compiler, err := vm.getCompileJob() + if err != nil { + return nil, err + } + if err := compiler(&aj); err != nil { + return nil, err + } + + log.Info(). + Int("num_tasks", len(aj.Tasks)). + Str("name", aj.Name). + Str("jobtype", aj.JobType). + Msg("job compiled") + + return &aj, nil +} + func (s *Service) Run(jobTypeName string) error { vm, err := s.compilerForJobType(jobTypeName) if err != nil { @@ -155,10 +205,10 @@ func (s *Service) ListJobTypes() api.AvailableJobTypes { return api.AvailableJobTypes{JobTypes: jobTypes} } -func (vm *VM) getCompileJob() (func(job *AuthoredJob) error, error) { +func (vm *VM) getCompileJob() (jobCompileFunc, error) { compileJob, isCallable := goja.AssertFunction(vm.runtime.Get("compileJob")) if !isCallable { - // TODO: construct a more elaborate Error object that contains this info, instead of logging here. + // TODO: construct a more elaborate Error type that contains this info, instead of logging here. log.Error(). Str("jobType", vm.compiler.jobType). Str("script", vm.compiler.filename). @@ -178,7 +228,7 @@ func (vm *VM) getJobTypeInfo() (*api.AvailableJobType, error) { var ajt api.AvailableJobType if err := vm.runtime.ExportTo(jtValue, &ajt); err != nil { - // TODO: construct a more elaborate Error object that contains this info, instead of logging here. + // TODO: construct a more elaborate Error type that contains this info, instead of logging here. log.Error(). Err(err). Str("jobType", vm.compiler.jobType). diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index e2d03ad6..bd5de3d2 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -28,6 +28,8 @@ import ( "github.com/rs/zerolog/log" _ "modernc.org/sqlite" + + "gitlab.com/blender/flamenco-goja-test/internal/manager/job_compilers" ) // TODO : have this configurable from the CLI. @@ -40,8 +42,11 @@ type DB struct { func OpenDB(ctx context.Context) (*DB, error) { log.Info().Str("uri", dbURI).Msg("opening database") + return openDB(ctx, dbURI) +} - sqldb, err := sql.Open("sqlite", dbURI) +func openDB(ctx context.Context, uri string) (*DB, error) { + sqldb, err := sql.Open("sqlite", uri) if err != nil { return nil, fmt.Errorf("unable to open database: %w", err) } @@ -59,3 +64,41 @@ func OpenDB(ctx context.Context) (*DB, error) { return &db, err } + +func (db *DB) StoreJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error { + tx, err := db.sqldb.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.ExecContext(ctx, + `INSERT INTO jobs (uuid, name, jobType, priority) VALUES (?, ?, ?, ?)`, + authoredJob.JobID, authoredJob.Name, authoredJob.JobType, authoredJob.Priority, + ) + if err != nil { + return err + } + + for key, value := range authoredJob.Settings { + _, err := tx.ExecContext(ctx, + `INSERT INTO job_settings (job_id, key, value) VALUES (?, ?, ?)`, + authoredJob.JobID, key, value, + ) + if err != nil { + return err + } + } + + for key, value := range authoredJob.Metadata { + _, err := tx.ExecContext(ctx, + `INSERT INTO job_metadata (job_id, key, value) VALUES (?, ?, ?)`, + authoredJob.JobID, key, value, + ) + if err != nil { + return err + } + } + + return tx.Commit() +} diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index 3f1de0f6..421d104a 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -53,7 +53,7 @@ func (db *DB) migrate() error { } err = m.Up() - if err != nil { + if err != nil && err != migrate.ErrNoChange { return fmt.Errorf("cannot migrate database: %w", err) } return nil diff --git a/internal/manager/persistence/db_test.go b/internal/manager/persistence/db_test.go new file mode 100644 index 00000000..b25733d2 --- /dev/null +++ b/internal/manager/persistence/db_test.go @@ -0,0 +1,74 @@ +// Package persistence provides the database interface for Flamenco Manager. +package persistence + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "gitlab.com/blender/flamenco-goja-test/internal/manager/job_compilers" + "golang.org/x/net/context" + _ "modernc.org/sqlite" +) + +const testURI = "testing.sqlite" + +func createTestDB(t *testing.T) (*DB, func()) { + // Creating a new database should be fast. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + db, err := openDB(ctx, testURI) + assert.Nil(t, err) + + return db, func() { + os.Remove(testURI) + } +} + +func TestStoreAuthoredJob(t *testing.T) { + db, cleanup := createTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := db.StoreJob(ctx, job_compilers.AuthoredJob{ + JobID: "263fd47e-b9f8-4637-b726-fd7e47ecfdae", + Name: "Test job", + Priority: 50, + Settings: job_compilers.JobSettings{ + "frames": "1-20", + "chunk_size": 3, + }, + Metadata: job_compilers.JobMetadata{ + "author": "Sybren", + "project": "Sprite Fright", + }, + }) + + assert.Nil(t, err) + + // TODO: fetch the job to see it was stored well. +} diff --git a/internal/manager/persistence/migrations/000001_create_jobs_table.down.sql b/internal/manager/persistence/migrations/000001_create_jobs_table.down.sql new file mode 100644 index 00000000..6bffc015 --- /dev/null +++ b/internal/manager/persistence/migrations/000001_create_jobs_table.down.sql @@ -0,0 +1 @@ +DROP TABLE jobs; diff --git a/internal/manager/persistence/migrations/000001_create_jobs_table.up.sql b/internal/manager/persistence/migrations/000001_create_jobs_table.up.sql new file mode 100644 index 00000000..919c0999 --- /dev/null +++ b/internal/manager/persistence/migrations/000001_create_jobs_table.up.sql @@ -0,0 +1,24 @@ +CREATE TABLE jobs ( + uuid CHAR(36) PRIMARY KEY NOT NULL, + name VARCHAR(255) NOT NULL, + jobType VARCHAR(255) NOT NULL, + priority INT NOT NULL, + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL +); + +CREATE TABLE job_settings ( + job_id CHAR(36), + key VARCHAR(255), + value TEXT, + FOREIGN KEY(job_id) REFERENCES jobs(uuid) +); +CREATE UNIQUE INDEX job_settings_index ON job_settings(job_id, key); + +CREATE TABLE job_metadata ( + job_id CHAR(36), + key VARCHAR(255), + value TEXT, + FOREIGN KEY(job_id) REFERENCES jobs(uuid) +); +CREATE UNIQUE INDEX job_metadata_index ON job_metadata(job_id, key); diff --git a/internal/manager/persistence/migrations/000001_dummy_initial.down.sql b/internal/manager/persistence/migrations/000001_dummy_initial.down.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/internal/manager/persistence/migrations/000001_dummy_initial.up.sql b/internal/manager/persistence/migrations/000001_dummy_initial.up.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/internal/manager/api_impl/job_compiler.go b/internal/manager/persistence/types.go similarity index 66% rename from internal/manager/api_impl/job_compiler.go rename to internal/manager/persistence/types.go index caaf87e5..2c0be7a1 100644 --- a/internal/manager/api_impl/job_compiler.go +++ b/internal/manager/persistence/types.go @@ -1,4 +1,4 @@ -package api_impl +package persistence /* ***** BEGIN GPL LICENSE BLOCK ***** * @@ -20,19 +20,25 @@ package api_impl * * ***** END GPL LICENSE BLOCK ***** */ -import ( - "net/http" +import "time" - "github.com/labstack/echo/v4" - "github.com/rs/zerolog/log" -) - -func (f *Flamenco) GetJobTypes(e echo.Context) error { - if f.jobCompiler == nil { - log.Error().Msg("Flamenco is running without job compiler") - return sendAPIError(e, http.StatusInternalServerError, "no job types available") - } - - jobTypes := f.jobCompiler.ListJobTypes() - return e.JSON(http.StatusOK, &jobTypes) +type Job struct { + ID string // UUID + Name string + JobType string + Priority int8 + Created time.Time + Updated time.Time +} + +type JobSetting struct { + JobID string + Key string + Value string +} + +type JobMetadata struct { + JobID string + Key string + Value string } diff --git a/pkg/api/flamenco-manager.yaml b/pkg/api/flamenco-manager.yaml index efb40c0a..0c007696 100644 --- a/pkg/api/flamenco-manager.yaml +++ b/pkg/api/flamenco-manager.yaml @@ -73,12 +73,37 @@ paths: application/json: schema: {$ref: "#/components/schemas/AvailableJobTypes"} + /api/jobs: + summary: Job submission endpoint. + post: + operationId: submitJob + summary: Submit a new job for Flamenco Manager to execute. + tags: [jobs] + # TODO: Security! + requestBody: + description: Job to submit + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/SubmittedJob" + responses: + "200": + description: Job was succesfully compiled into individual tasks. + content: + application/json: + schema: {$ref: "#/components/schemas/SubmittedJob"} + default: + description: Error message + content: + application/json: + schema: {$ref: "#/components/schemas/Error"} tags: - - name: worker - description: API for Flamenco Workers to communicate with Flamenco Manager. - name: jobs description: Blablabla for users to get job metadata balbla + - name: worker + description: API for Flamenco Workers to communicate with Flamenco Manager. components: schemas: @@ -96,7 +121,7 @@ components: RegisteredWorker: type: object properties: - _id: {type: string} + id: {type: string} nickname: {type: string} address: {type: string} status: {type: string} @@ -106,13 +131,13 @@ components: supported_task_types: type: array items: {type: string} - required: [_id, nickname, address, status, platform, current_task, last_activity, software, supported_task_types] + required: [id, nickname, address, status, platform, current_task, last_activity, software, supported_task_types] AssignedTask: type: object description: AssignedTask is a task as it is received by the Worker. properties: - _id: {type: string} + id: {type: string} job: {type: string} user: {type: string} name: {type: string} @@ -124,7 +149,7 @@ components: commands: type: array items: {$ref: "#/components/schemas/Command"} - required: [_id, job, user, name, status, priority, job_priority, job_type, task_type, commands] + required: [id, job, user, name, status, priority, job_priority, job_type, task_type, commands] JobStatus: type: string @@ -213,6 +238,41 @@ components: `HASHED_FILE_PATH` is a directory path + `"/######"` appended. enum: ["file_path", "dir_path", "file_name", "hashed_file_path"] + SubmittedJob: + type: object + description: Job definition submitted to Flamenco. + properties: + "id": {type: string} + "name": {type: string} + "type": {type: string} + "status": {$ref: "#/components/schemas/JobStatus"} + "priority": {type: integer, default: 50} + "settings": + type: object + "metadata": + type: object + required: [name, type, priority] + example: + type: "simple-blender-render" + name: 3Д рендеринг + priority: 50 + settings: + blender_cmd: "{blender}" + filepath: "/render/sf/jobs/scene123.blend" + render_output: "/render/sf/frames/scene123" + frames: "1-10" + chunk_size: 3 + fps: 24 + extract_audio: true + images_or_video: "images" + format: "PNG" + output_file_extension: ".png" + metadata: + "user.name": "Sybren Stüvel" + "user.email": "sybren@blender.org" + project: "Sprite Fright" + + Error: type: object required: [code, message] @@ -226,15 +286,23 @@ components: properties: message: {type: string} + Configuration: + type: object + description: Flamenco Manager configuration + properties: + "_meta": + type: object + properties: + "schema_version": {type: integer} + required: [schema_version] + "settings": + description: Key-value pairs for settings. + type: object + additionalProperties: true + required: [_meta, settings] securitySchemes: worker_auth: description: Username is the worker ID, password is the secret given at worker registration. type: http scheme: basic - user_auth: - description: > - OAuth2 authentication flow for authenticating users. Currently not - implemented, so now more used as a "todo" than actual authentication. - type: http - scheme: basic diff --git a/pkg/api/openapi_client.gen.go b/pkg/api/openapi_client.gen.go index eb465cac..646b936d 100644 --- a/pkg/api/openapi_client.gen.go +++ b/pkg/api/openapi_client.gen.go @@ -88,6 +88,11 @@ func WithRequestEditorFn(fn RequestEditorFn) ClientOption { // The interface specification for the client above. type ClientInterface interface { + // SubmitJob request with any body + SubmitJobWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + SubmitJob(ctx context.Context, body SubmitJobJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + // GetJobTypes request GetJobTypes(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) @@ -100,6 +105,30 @@ type ClientInterface interface { ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) } +func (c *Client) SubmitJobWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewSubmitJobRequestWithBody(c.Server, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) SubmitJob(ctx context.Context, body SubmitJobJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewSubmitJobRequest(c.Server, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + func (c *Client) GetJobTypes(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { req, err := NewGetJobTypesRequest(c.Server) if err != nil { @@ -148,6 +177,46 @@ func (c *Client) ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn return c.Client.Do(req) } +// NewSubmitJobRequest calls the generic SubmitJob builder with application/json body +func NewSubmitJobRequest(server string, body SubmitJobJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewSubmitJobRequestWithBody(server, "application/json", bodyReader) +} + +// NewSubmitJobRequestWithBody generates requests for SubmitJob with any type of body +func NewSubmitJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/api/jobs") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + // NewGetJobTypesRequest generates requests for GetJobTypes func NewGetJobTypesRequest(server string) (*http.Request, error) { var err error @@ -285,6 +354,11 @@ func WithBaseURL(baseURL string) ClientOption { // ClientWithResponsesInterface is the interface specification for the client with responses above. type ClientWithResponsesInterface interface { + // SubmitJob request with any body + SubmitJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SubmitJobResponse, error) + + SubmitJobWithResponse(ctx context.Context, body SubmitJobJSONRequestBody, reqEditors ...RequestEditorFn) (*SubmitJobResponse, error) + // GetJobTypes request GetJobTypesWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetJobTypesResponse, error) @@ -297,6 +371,29 @@ type ClientWithResponsesInterface interface { ScheduleTaskWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ScheduleTaskResponse, error) } +type SubmitJobResponse struct { + Body []byte + HTTPResponse *http.Response + JSON200 *SubmittedJob + JSONDefault *Error +} + +// Status returns HTTPResponse.Status +func (r SubmitJobResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r SubmitJobResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + type GetJobTypesResponse struct { Body []byte HTTPResponse *http.Response @@ -365,6 +462,23 @@ func (r ScheduleTaskResponse) StatusCode() int { return 0 } +// SubmitJobWithBodyWithResponse request with arbitrary body returning *SubmitJobResponse +func (c *ClientWithResponses) SubmitJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SubmitJobResponse, error) { + rsp, err := c.SubmitJobWithBody(ctx, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseSubmitJobResponse(rsp) +} + +func (c *ClientWithResponses) SubmitJobWithResponse(ctx context.Context, body SubmitJobJSONRequestBody, reqEditors ...RequestEditorFn) (*SubmitJobResponse, error) { + rsp, err := c.SubmitJob(ctx, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseSubmitJobResponse(rsp) +} + // GetJobTypesWithResponse request returning *GetJobTypesResponse func (c *ClientWithResponses) GetJobTypesWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetJobTypesResponse, error) { rsp, err := c.GetJobTypes(ctx, reqEditors...) @@ -400,6 +514,39 @@ func (c *ClientWithResponses) ScheduleTaskWithResponse(ctx context.Context, reqE return ParseScheduleTaskResponse(rsp) } +// ParseSubmitJobResponse parses an HTTP response from a SubmitJobWithResponse call +func ParseSubmitJobResponse(rsp *http.Response) (*SubmitJobResponse, error) { + bodyBytes, err := ioutil.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &SubmitJobResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest SubmittedJob + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && true: + var dest Error + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSONDefault = &dest + + } + + return response, nil +} + // ParseGetJobTypesResponse parses an HTTP response from a GetJobTypesWithResponse call func ParseGetJobTypesResponse(rsp *http.Response) (*GetJobTypesResponse, error) { bodyBytes, err := ioutil.ReadAll(rsp.Body) diff --git a/pkg/api/openapi_server.gen.go b/pkg/api/openapi_server.gen.go index 66a58be4..e7bcb5da 100644 --- a/pkg/api/openapi_server.gen.go +++ b/pkg/api/openapi_server.gen.go @@ -9,6 +9,9 @@ import ( // ServerInterface represents all server handlers. type ServerInterface interface { + // Submit a new job for Flamenco Manager to execute. + // (POST /api/jobs) + SubmitJob(ctx echo.Context) error // Get list of job types and their parameters. // (GET /api/jobs/types) GetJobTypes(ctx echo.Context) error @@ -25,6 +28,15 @@ type ServerInterfaceWrapper struct { Handler ServerInterface } +// SubmitJob converts echo context to params. +func (w *ServerInterfaceWrapper) SubmitJob(ctx echo.Context) error { + var err error + + // Invoke the callback with all the unmarshalled arguments + err = w.Handler.SubmitJob(ctx) + return err +} + // GetJobTypes converts echo context to params. func (w *ServerInterfaceWrapper) GetJobTypes(ctx echo.Context) error { var err error @@ -82,6 +94,7 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL Handler: si, } + router.POST(baseURL+"/api/jobs", wrapper.SubmitJob) router.GET(baseURL+"/api/jobs/types", wrapper.GetJobTypes) router.POST(baseURL+"/api/worker/register-worker", wrapper.RegisterWorker) router.POST(baseURL+"/api/worker/task", wrapper.ScheduleTask) diff --git a/pkg/api/openapi_spec.gen.go b/pkg/api/openapi_spec.gen.go index 3eb4d7ee..f7801054 100644 --- a/pkg/api/openapi_spec.gen.go +++ b/pkg/api/openapi_spec.gen.go @@ -18,39 +18,46 @@ import ( // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/7xZ32/bOBL+VwbaA24Xp9i5Zp/ylm7baxa9bbDJog9NkFDi2GJCkSo5imsU/t8PQ1Ky", - "bMlJe9feYtHK4o8ZfvPNzEf1S1baurEGDfns9EvmywprER7PvFdLg/JK+Af+LdGXTjWkrMlOd0ZBeRBA", - "/CQ8KOLfDktUjyihWANVCB+se0A3y/KscbZBRwqDlVsl+S9aN5idZp6cMstsk7NbtTAyzFGEdXj4m8NF", - "dpr9NN86PU8ez3+LC3ht2kw4J9b8+94WkzbubXHbOGWdovVggjKES3TdjPh2YrkR9fTA03t6EtQ+exzG", - "9TLO5BMJ/3DYkdajmxjY5JnDT61yKLPTjwHpiEVakU7QezRwfA+bARBDXwZRuulRt8U9lsRunT0KpUWh", - "8XdbXCIROzXi0aUyS43g4zjYBQj43RbAu/kxXcrKqjI+7u7zoUIDS/WIJgetakWBdY9CK8l/tuiBLL/z", - "CGmTGbw3eg2tZx9hpaiCCF0wzrZ74o0g36eYxIVoNY39uqoQ0mD0A3xlVyY5AxwIWLHvEgldrUywXynf", - "QTLj7VEqYi/j/snUQmiP+RgHqtDx/kJruwJeur8niAXxnArh3hZQCQ8FogHfFrUiQjmDD7bVElTd6DVI", - "1BiXaQ34Wfm4ofAPHhbWxa3vbZGDMJIz39aN0jxH0ezaZD1chbUaheETPeB6DNa5RENqodClfXti5FC3", - "nqBAaI361MZwKdMfoYvYKFDbDPgG5FRdo1SCUK/BIfMZRDAjcaGM4gU5UzUcnE3mwR/bUnzVCEeqbLVw", - "fRQPwODbokvrp6rBRCpdppU9Gb95h6u0/FF5tc8tcu1TADGHdxmVYvHXeUxhBqtjk4OftXpAEPBSo5Ho", - "QEh5ZM0vM7hE4u3uQkDuYiLE/iEMcM10RujeBlWC2HSrpfl7IEOfS2hkyCU/DfReKWTypUlfWbgut3Ha", - "q19tccQjkQ6RjF3M4bfWOTSk12C50ohu38DuQa3xM7h7e3b59vWr2zfn717fXpxdvb2LXVUqhyVZt4ZG", - "UAX/gLvrbP5T+O86uwPRNAypjMdG09Z8voXSeMvzszyTynWP4XWq+ZXwFcrb7cybieQ5RJpxlUsIDE4/", - "yNhYYIWH81cXsZqvw7GZNIkSM/jDgkFPKBmYtqTWoYefQ4H1OUhVsinhFPpfQDgE3zaNdbR/9OR8zh33", - "5AUfWltBWR648Owhp0/X9aOtzahqlId/CyOW6GLlUxRSX9RcyiealxYF6m+TEgnMr5dBU0131K/20iFR", - "Iro3sPlcbjBaE634nfLUkSGw+zBuY4w6ofHfnfhqpyIeOO7WxNQBOx05OlYaAIeNQ88ugAAf5UvSQaES", - "fcayJXxO935VxPecmw7bk+F67ZwN2nBPRlkZrC+sqwVFfRqyZSxXa/ReLPF5fRn23M6f8uZPXCpP6FBG", - "WMaOHboOCCkd+mktpoWnW1GSetzV3IP0UuXDYbWuBTEO09GwC1oJdyBUvZIfD3WMv+3l8i6jn1GUk9q9", - "P8YWj6F4786RZ2XsO8F2tg/Q4FAH/JwK3SWWLd8FDhDqq1nyFD0Gd57TL31FD46Hy4YwJWqU8d7RaKTw", - "vBAqvvzUYhse+HxH/eu47IidCP0lLdl50YjWxwdnS/Sc1ZO9ItI20tiJWBj2ofgfyIalQ/p/8ClZ2iHN", - "pIkB6cYRiy4HWlxyMY4usQy7FS1V4xL6/qyl6gXwIAv9MiAIC76msCIavjfLpOcGIspYCjcSrNEQyhy8", - "BWNXUFsXblKSNYaA64ystNcZ60UDoqRW6D2bUTOEDhJ0ovCq3Fa/iqhhCFch2gfO8pdncVojyzQu9HEy", - "nL/KoRHer6yT3VBEO95OQVA31Q1oNHvWHUZbmYWN5duQKGnbR7I3WtRoSgtXKDiYrdNppT+dzxdpdKbs", - "PN5Whyf5M4ryN8LVUMe+DGcX51w4VInG48DOvy7ePZ6M9l+tVrOlaWfWLedpjZ8vG310MjueoZlVVOvA", - "SUV6x9tkLsuzR3Q+uvPP2fHsmGfbBo1oVHaanYRXnKZUBY7NRaPm97bw8z4TljFzOBMDoueS3UXqdQpn", - "gG8s+8YTXxwfd1CiCUtF0+jEj/m9j6kdRca3ShAfw7X3xaxX/70wiinU1rVw6+gt6JF4CrqiQuX2tCUJ", - "FglBzvjsZmejrake6fvtN5VNHvGLNJy71JSPVtuebP0EmF33Tr07VhT09NLK9XeDcqLCTmAZZ7Hc6rzP", - "hgWOb6+bHxjvkY6ZcNGwtNLQ+bD3kei7uBF78YTt1uDnBksW3JjmDOnRuQ8CDK5SORowKr24mVgUQ8IE", - "3a70+4yi9MV4mkbcKmSr8SqKkx+XlMPv1xMghS/XW8Ee8uLF8a/jSv+HTd+6du/v4R7TyftNnv16fPLd", - "fN9VWxPOX6CrleeKCa/QKJQ73Tg7/fhlt3t9vNncDKP5viChTCIA7SLxHBMCcD5F0QEa2VhlaJZccFzK", - "gwexScwzNp12HP0zwsV5ALOvUxHQ8LGWL1WtYQDT19l+0uD2mPpScnST7xt4yQHj/4OZICl47yVSKIk1", - "kpCCBBRCF1psNwxVdXOz+U8AAAD//61bqX4hGQAA", + "H4sIAAAAAAAC/7xZ3W4buRV+FYJboLvoSHLi9EZXTTZ/DtKssfIiF4khHw2PNLQ55ITkSFEDA/sQfZN2", + "gV50r/oC3jcqDskZjaSR7bSbLBbBeIbk+fvOd86hPvHclJXRqL3j40/c5QWWEB4fOycXGsUZuCv6W6DL", + "ray8NJqPt74y6RgwT0/gmPT0t8Uc5RIFm62ZL5C9NfYK7ZBnvLKmQuslBim5KUvQIjxLj2V4+IPFOR/z", + "b0Yb5UZJs9H3cQO/zrhfV8jHHKyFNf0tBW1Ob523Ui/o9aWZHXo/raw0Vvp1Z4HUHhdomxXxbc92DWX/", + "h9vPdB58faeV5NZJXEmGgrs6rEjt0PZ8uM64xQ+1tCj4+B05J7oibUgGtAp19N5xTccPXVWyTezO21iY", + "2SXmnrR6vASpYKbwlZlN0HvSaQ9FE6kXCpmL35mZM2CvzIzRaa4HLIWReXzcPudtgZot5BJ1xpQspQ+Y", + "W4KSgv6t0TFv6J1Dlg4Zsh+0WrPakY5sJX3BoueCcJLdwnHP47vAEziHWvl9vc4KZOlj1IO5wqx0UoZR", + "INiKdBfo0ZZSB/mFdI1LhnQ8CulJy3h+EjUH5TDb94Mv0NL5oJRZMdq6eyaDuac1BbJLM2MFODZD1MzV", + "s1J6j2LI3ppaCSbLSq2ZQIVxm1IMP0oXDwR35djc2Hj0pZllDLSgvDdlJRWtkX74XvPWXTNjFIImi65w", + "ve+sE4Hay7lEm85tgZGxsnaezZDVWn6oY7ikbk1oIrYXqE0CfIbnZFmikOBRrZlFwjODIEbgXGpJGzKC", + "ajCcRGZBH1P7+KoC62VeK7BtFA+4wdWzJqtvI4OeVJqknS0YP/uEs7R9KZ3cxZa39W0OIgxvIyrF4qeT", + "mMLkrAZNln2r5BUyYE8UaoGWgRADo78bsgl6Ou4iBOQiJkKsHqAZUabVoFoZvgBPomsl9B8DGNpcQi1C", + "Lrl+R+8wIYEvLboncU02cdrhr3o2oC8RDhGMTczZ97W1qL1aM0NMA825Ad0drnFDdvHy8eTls6fT5yev", + "n01PH5+9vIg1VUiLuTd2zSrwBfsTu3jPR9+E/97zCwZVRS4V0WzUdUn2zaXCKa3nGRfSNo/hdeL8AlyB", + "YrpZed6TPIdAs89yyQMd6zsZGwkWHDt5ehrZfB3MJtAkSAzZG8M0Oo+CHFPnvrbo2LeBYF3GhMxJFFiJ", + "7jsGFpmrq8pYv2t6Uj6jgnv8kIxWBjzPAhbuNLLfuqYebWTGnkY69lfQsEAbmU/6kPpQEpX3FC8FM1Sf", + "10kkZ96/Oeorunv1aicdEiSieh2Zd+UGeaunFL+WzjdgCOg+7Ld9HzWNxv9m8dkWIx4wdyOiz8Cmu9wz", + "K31gFiuLjlRgwFxsX1IfFJjoI+a1x7u63ntFfEe5/rDdGq5n1prQGu723CJInxtbgo/taciW/W61ROdg", + "gXe3l+HMzfo+bQiTbfPbZCzkXi5DMwk6R4Ui9pWVQp+edWQEafRgDjKuaB8qqF14+FBjHR7A5gUNHu1j", + "JIR4/IBUDjyTDtl6EZ7jKTXR0qArnGd8BaGZGsyNHRB1ul5G+REX0nm0KGL09/0PQlh0/a3lgRlGgfPT", + "4KvtwaJDIjK/OjySKPAU7X7MmblfgT0AyDZi+5+avJ62Q8F23t7RN/cNKK0VWeul7oTSmJHxPBbXIJrv", + "+qdj0wE1+/A5wbymgedA1tw7FW7LgUnTY7+KQ+l+qdk0mZuGnJjluYISdW6ITPAjUIZErTwI8JD0DXLG", + "fFJZ6ZE9t3JR+DTxDbEEqUjt9cyi/sss1V5jF82KiB8+CQvYxP/n30tUmwrFj2/+zn77+eaXm19v/nnz", + "y28/3/zr5tebf3RHx/Gfj7ZJLEmZ5qXgY/4p/XlNESxqfTV18m/Ix8dkk7eQ+ynUQpqmBaUsC/3JmI9s", + "2Dly89GlmVEBQI0PHh4Pw5GUzg2bnb55QX9Wjo8fPsr4nEqy42P+YPDgiFqDEhbopsZOl1KgIb4Lb3jG", + "Te2r2se2CD961C7GZVgFDokaTOOqbZWikFapDtgdTVE4SIYP4pZ4TdBF14G874Z3D0z3uoFom3qKTM91", + "xOGCc9+7ig2vd6aR21MkpXi6R2jV7cuYzk3IZ5SNtkC0jE6MsCkg9ykHbW2prMnRUbHvJfxI85H2LcRU", + "3iWP/4OdMbfovwYBJ0lbNNsrokPT+xGLKgcinRBAokqr4KMp1JTLu7T3k6NJr0SaeahriovZydOMVeDc", + "yljRfIo6xqseBr5ZajvOJ4IM0AwzIDiZbzqbwvuKX5OOUs9N7IW0h9xvmjLeEC07QyAX1FalnW48Gs0b", + "GpZmFK9+upb8GCfc52BLVsYmlz0+PaECJXPUDjtyXpy+Xh7vnb9arYYLXRMrj9IeN1pUanA8PBqiHha+", + "DIzspVdb2iZxPONLtIm3HgyPhke02lSooZJE4eEVgdsXITIjqGRg1ABa44IrCLrBmSciTrml9K/C3WFK", + "kCdGrBv3oQ57oKqUzMOu0aWLSRAp4i4C2SqKITo9E5hJ5ZB3QUtVIqDYVYY8RZIeHh19Vc1WQCNOnqOb", + "10qtWbz/QsGk9oZJLeRSihpUvDIb7twX/i5qxo6lR7/wgTUNScjNuizBrtuoMmAaV2FamxvbdhnNiNaZ", + "acIFG1ClCEOU4+dbx71q7nwcgY+hFpWR2gd7W4yNWo5aYA/QXqBvB8svGNX9KbbHde2izSS748AX6Jna", + "m3bDIFigtDuXAbe4biOqdf/l5hK88V+kupFN48VgtZkuerO2mUPSFPJlUren9vX4Mq4iLDXaf9Us3pvI", + "elTU1D0q1ujwVbO01vixwpwafUxruvBo1E+pumri2SAqvTjv2RRDQgDd7HS7iPLpB74D5J8XKGqFZ3HQ", + "+nJJ2f25scdJ4YfGLhtdZ/zh0aP9buKNST9ObF+4houn5j7mOuOPjo5/vzKxNTn2KH+KtiHGp6gliq0+", + "iY/f7XRI786vz7vR/GHmQeoEAL/tibuQEBznUhRtl5iDCpbahaBBbERGnESnE3d9+4T8Sf8Hl4Yrd9Jk", + "gT4wVjOtsBmomQLeDo6B9K6zvR+RT0+2q06MTzgzN2VZa4pH+nVutzQNN8cnu6/Pr/8bAAD//4nKLbvQ", + "HgAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/pkg/api/openapi_types.gen.go b/pkg/api/openapi_types.gen.go index 0226dd4f..35e30607 100644 --- a/pkg/api/openapi_types.gen.go +++ b/pkg/api/openapi_types.gen.go @@ -29,6 +29,37 @@ const ( AvailableJobSettingTypeString AvailableJobSettingType = "string" ) +// Defines values for JobStatus. +const ( + JobStatusActive JobStatus = "active" + + JobStatusArchived JobStatus = "archived" + + JobStatusArchiving JobStatus = "archiving" + + JobStatusCancelRequested JobStatus = "cancel-requested" + + JobStatusCanceled JobStatus = "canceled" + + JobStatusCompleted JobStatus = "completed" + + JobStatusConstructionFailed JobStatus = "construction-failed" + + JobStatusFailRequested JobStatus = "fail-requested" + + JobStatusFailed JobStatus = "failed" + + JobStatusPaused JobStatus = "paused" + + JobStatusQueued JobStatus = "queued" + + JobStatusRequeued JobStatus = "requeued" + + JobStatusUnderConstruction JobStatus = "under-construction" + + JobStatusWaitingForFiles JobStatus = "waiting-for-files" +) + // Defines values for TaskStatus. const ( TaskStatusActive TaskStatus = "active" @@ -54,8 +85,8 @@ const ( // AssignedTask is a task as it is received by the Worker. type AssignedTask struct { - Id string `json:"_id"` Commands []Command `json:"commands"` + Id string `json:"id"` Job string `json:"job"` JobPriority int `json:"job_priority"` JobType string `json:"job_type"` @@ -123,10 +154,13 @@ type Error struct { Message string `json:"message"` } +// JobStatus defines model for JobStatus. +type JobStatus string + // RegisteredWorker defines model for RegisteredWorker. type RegisteredWorker struct { - Id string `json:"_id"` Address string `json:"address"` + Id string `json:"id"` LastActivity string `json:"last_activity"` Nickname string `json:"nickname"` Platform string `json:"platform"` @@ -140,6 +174,17 @@ type SecurityError struct { Message string `json:"message"` } +// Job definition submitted to Flamenco. +type SubmittedJob struct { + Id *string `json:"id,omitempty"` + Metadata *map[string]interface{} `json:"metadata,omitempty"` + Name string `json:"name"` + Priority int `json:"priority"` + Settings *map[string]interface{} `json:"settings,omitempty"` + Status *JobStatus `json:"status,omitempty"` + Type string `json:"type"` +} + // TaskStatus defines model for TaskStatus. type TaskStatus string @@ -151,8 +196,14 @@ type WorkerRegistration struct { SupportedTaskTypes []string `json:"supported_task_types"` } +// SubmitJobJSONBody defines parameters for SubmitJob. +type SubmitJobJSONBody SubmittedJob + // RegisterWorkerJSONBody defines parameters for RegisterWorker. type RegisterWorkerJSONBody WorkerRegistration +// SubmitJobJSONRequestBody defines body for SubmitJob for application/json ContentType. +type SubmitJobJSONRequestBody SubmitJobJSONBody + // RegisterWorkerJSONRequestBody defines body for RegisterWorker for application/json ContentType. type RegisterWorkerJSONRequestBody RegisterWorkerJSONBody