diff --git a/cmd/flamenco-worker-poc/cliargs.go b/cmd/flamenco-worker-poc/cliargs.go new file mode 100644 index 00000000..486f4c4d --- /dev/null +++ b/cmd/flamenco-worker-poc/cliargs.go @@ -0,0 +1,60 @@ +package main + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "errors" + "flag" + "net/url" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/internal/worker" +) + +var errURLWithoutHostName = errors.New("manager URL should contain a host name") + +var cliArgs struct { + version bool + verbose bool + debug bool + managerURL *url.URL + manager string + register bool +} + +func parseCliArgs() { + flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.") + flag.BoolVar(&cliArgs.verbose, "verbose", false, "Enable info-level logging.") + flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.") + + flag.StringVar(&cliArgs.manager, "manager", "", "URL of the Flamenco Manager.") + flag.BoolVar(&cliArgs.register, "register", false, "(Re-)register at the Manager.") + + flag.Parse() + + if cliArgs.manager != "" { + var err error + cliArgs.managerURL, err = worker.ParseURL(cliArgs.manager) + if err != nil { + log.Fatal().Err(err).Msg("invalid manager URL") + } + } +} diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go index 167df5f5..33bb7af5 100644 --- a/cmd/flamenco-worker-poc/main.go +++ b/cmd/flamenco-worker-poc/main.go @@ -22,76 +22,59 @@ package main import ( "context" + "fmt" "net/http" - "os" - "runtime" + "net/url" "time" - "github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/mattn/go-colorable" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "gitlab.com/blender/flamenco-ng-poc/internal/appinfo" + "gitlab.com/blender/flamenco-ng-poc/internal/worker" + "gitlab.com/blender/flamenco-ng-poc/internal/worker/ssdp" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) func main() { + parseCliArgs() + if cliArgs.version { + fmt.Println(appinfo.ApplicationVersion) + return + } + output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} log.Logger = log.Output(output) log.Info().Str("version", appinfo.ApplicationVersion).Msgf("starting %v Worker", appinfo.ApplicationName) - basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth("MY_USER", "MY_PASS") - if err != nil { - log.Panic().Err(err).Msg("unable to create basic authr") - } + // configWrangler := worker.NewConfigWrangler() + managerFinder := ssdp.NewManagerFinder(cliArgs.managerURL) + // taskRunner := struct{}{} + findManager(managerFinder) - flamenco, err := api.NewClientWithResponses( - "http://localhost:8080/", - api.WithRequestEditorFn(basicAuthProvider.Intercept), - api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { - req.Header.Set("User-Agent", appinfo.UserAgent()) - return nil - }), - ) - if err != nil { - log.Fatal().Err(err).Msg("error creating client") - } + // basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth("MY_USER", "MY_PASS") + // if err != nil { + // log.Panic().Err(err).Msg("unable to create basic authr") + // } - ctx := context.Background() - registerWorker(ctx, flamenco) - obtainTask(ctx, flamenco) -} + // flamenco, err := api.NewClientWithResponses( + // "http://localhost:8080/", + // api.WithRequestEditorFn(basicAuthProvider.Intercept), + // api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { + // req.Header.Set("User-Agent", appinfo.UserAgent()) + // return nil + // }), + // ) + // if err != nil { + // log.Fatal().Err(err).Msg("error creating client") + // } -func registerWorker(ctx context.Context, flamenco *api.ClientWithResponses) { - hostname, err := os.Hostname() - if err != nil { - log.Fatal().Err(err).Msg("error getting hostname") - } - - req := api.RegisterWorkerJSONRequestBody{ - Nickname: hostname, - Platform: runtime.GOOS, - Secret: "secret", - SupportedTaskTypes: []string{"sleep", "blender-render", "ffmpeg", "file-management"}, - } - resp, err := flamenco.RegisterWorkerWithResponse(ctx, req) - if err != nil { - log.Fatal().Err(err).Msg("error registering at Manager") - } - switch { - case resp.JSON200 != nil: - log.Info(). - Int("code", resp.StatusCode()). - Interface("resp", resp.JSON200). - Msg("registered at Manager") - default: - log.Fatal(). - Int("code", resp.StatusCode()). - Interface("resp", resp.JSONDefault). - Msg("unable to register at Manager") - } + // w := worker.NewWorker(flamenco, configWrangler, managerFinder, taskRunner) + // ctx := context.Background() + // registerWorker(ctx, flamenco) + // obtainTask(ctx, flamenco) } func obtainTask(ctx context.Context, flamenco *api.ClientWithResponses) { @@ -118,3 +101,16 @@ func obtainTask(ctx context.Context, flamenco *api.ClientWithResponses) { Msg("unable to obtain task") } } + +func findManager(managerFinder worker.ManagerFinder) *url.URL { + finder := managerFinder.FindFlamencoManager() + select { + case manager := <-finder: + log.Info().Str("manager", manager.String()).Msg("found Manager") + return manager + case <-time.After(10 * time.Second): + log.Fatal().Msg("unable to autodetect Flamenco Manager via UPnP/SSDP; configure the URL explicitly") + } + + return nil +} diff --git a/go.mod b/go.mod index c4016387..40bcb70d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,9 @@ require ( github.com/shopspring/decimal v1.2.0 // indirect github.com/stretchr/testify v1.7.0 github.com/ziflex/lecho/v3 v3.1.0 + gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76 golang.org/x/net v0.0.0-20211013171255-e13a2654a71e + gopkg.in/yaml.v2 v2.4.0 gorm.io/driver/postgres v1.0.8 gorm.io/gorm v1.21.4 ) diff --git a/go.sum b/go.sum index d0949987..782dd666 100644 --- a/go.sum +++ b/go.sum @@ -227,6 +227,8 @@ github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= github.com/ziflex/lecho/v3 v3.1.0 h1:65bSzSc0yw7EEhi44lMnkOI877ZzbE7tGDWfYCQXZwI= github.com/ziflex/lecho/v3 v3.1.0/go.mod h1:dwQ6xCAKmSBHhwZ6XmiAiDptD7iklVkW7xQYGUncX0Q= +gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76 h1:ASbeHgntCaY+Q/qRUX1y6T12WncACelKVRUFGjyIOVM= +gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76/go.mod h1:+j3oHEe07Rw8lFbVhESVy83XVW51AndFrjbUMb2JI4k= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index a8603576..bb7a445e 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -35,10 +35,7 @@ import ( // RegisterWorker registers a new worker and stores it in the database. func (f *Flamenco) RegisterWorker(e echo.Context) error { remoteIP := e.RealIP() - - logger := log.With(). - Str("ip", remoteIP). - Logger() + logger := log.With().Str("ip", remoteIP).Logger() var req api.RegisterWorkerJSONBody err := e.Bind(&req) @@ -47,11 +44,14 @@ func (f *Flamenco) RegisterWorker(e echo.Context) error { return sendAPIError(e, http.StatusBadRequest, "invalid format") } + // TODO: validate the request, should at least have non-empty name, secret, and platform. + logger.Info().Str("nickname", req.Nickname).Msg("registering new worker") dbWorker := persistence.Worker{ UUID: uuid.New().String(), Name: req.Nickname, + Secret: req.Secret, Platform: req.Platform, Address: remoteIP, SupportedTaskTypes: strings.Join(req.SupportedTaskTypes, ","), @@ -73,6 +73,25 @@ func (f *Flamenco) RegisterWorker(e echo.Context) error { }) } +func (f *Flamenco) SignOn(e echo.Context) error { + remoteIP := e.RealIP() + logger := log.With().Str("ip", remoteIP).Logger() + + var req api.SignOnJSONBody + err := e.Bind(&req) + if err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + logger.Info().Str("nickname", req.Nickname).Msg("worker signing on") + + return e.JSON(http.StatusOK, &api.WorkerStateChange{ + // TODO: look up proper status in DB. + StatusRequested: api.WorkerStatusAwake, + }) +} + func (f *Flamenco) ScheduleTask(e echo.Context) error { return e.JSON(http.StatusOK, &api.AssignedTask{ Uuid: uuid.New().String(), diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 81029dd1..cb7d1361 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -30,8 +30,9 @@ import ( type Worker struct { gorm.Model - UUID string `gorm:"type:char(36);not null;unique;index"` - Name string `gorm:"type:varchar(64);not null"` + UUID string `gorm:"type:char(36);not null;unique;index"` + Secret string `gorm:"type:varchar(255);not null"` + Name string `gorm:"type:varchar(64);not null"` Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address. LastActivity string `gorm:"type:varchar(255);not null"` diff --git a/internal/worker/config.go b/internal/worker/config.go new file mode 100644 index 00000000..2e77212e --- /dev/null +++ b/internal/worker/config.go @@ -0,0 +1,167 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "errors" + "fmt" + "io" + "net/url" + "os" + "time" + + "github.com/rs/zerolog/log" + yaml "gopkg.in/yaml.v2" +) + +var ( + errURLWithoutHostName = errors.New("manager URL should contain a host name") +) + +// WorkerConfig represents the configuration of a single worker. +// It does not include authentication credentials. +type WorkerConfig struct { + Manager string `yaml:"manager_url"` + TaskTypes []string `yaml:"task_types"` +} + +type workerCredentials struct { + WorkerID string `yaml:"worker_id"` + Secret string `yaml:"worker_secret"` +} + +// ConfigWrangler makes it simple to load and write configuration files. +type ConfigWrangler interface { + DefaultConfig() WorkerConfig + WriteConfig(filename string, filetype string, config interface{}) error + LoadConfig(filename string, config interface{}) error +} + +// FileConfigWrangler is the default config wrangler that actually reads & writes files. +type FileConfigWrangler struct{} + +// NewConfigWrangler returns a new ConfigWrangler instance of the default type FileConfigWrangler. +func NewConfigWrangler() ConfigWrangler { + return FileConfigWrangler{} +} + +// DefaultConfig returns a fairly sane default configuration. +func (fcw FileConfigWrangler) DefaultConfig() WorkerConfig { + return WorkerConfig{ + Manager: "", + TaskTypes: []string{"sleep", "blender-render", "file-management", "exr-merge", "debug"}, + } +} + +// WriteConfig stores a struct as YAML file. +func (fcw FileConfigWrangler) WriteConfig(filename string, filetype string, config interface{}) error { + data, err := yaml.Marshal(config) + if err != nil { + return err + } + + tempFilename := filename + "~" + f, err := os.OpenFile(tempFilename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + fmt.Fprintf(f, "# %s file for Flamenco Worker.\n", filetype) + fmt.Fprintln(f, "# For an explanation of the fields, refer to flamenco-worker-example.yaml") + fmt.Fprintln(f, "#") + fmt.Fprintln(f, "# NOTE: this file can be overwritten by Flamenco Worker.") + fmt.Fprintln(f, "#") + now := time.Now() + fmt.Fprintf(f, "# This file was written on %s\n\n", now.Format("2006-01-02 15:04:05 -07:00")) + + n, err := f.Write(data) + if err != nil { + f.Close() // ignore errors here + return err + } + if n < len(data) { + f.Close() // ignore errors here + return io.ErrShortWrite + } + if err = f.Close(); err != nil { + return err + } + + log.Debug().Str("filename", tempFilename).Msg("config file written") + log.Debug(). + Str("from", tempFilename). + Str("to", filename). + Msg("renaming config file") + if err := os.Rename(tempFilename, filename); err != nil { + return err + } + log.Info().Str("filename", filename).Msg("Saved configuration file") + + return nil +} + +// LoadConfig loads a YAML configuration file into 'config' +func (fcw FileConfigWrangler) LoadConfig(filename string, config interface{}) error { + log.Debug().Str("filename", filename).Msg("loading config file") + f, err := os.OpenFile(filename, os.O_RDONLY, 0) + if err != nil { + return err + } + defer f.Close() + + dec := yaml.NewDecoder(f) + if err = dec.Decode(config); err != nil { + return err + } + + return nil +} + +// ParseURL allows URLs without scheme (assumes HTTP). +func ParseURL(rawURL string) (*url.URL, error) { + var err error + var parsedURL *url.URL + + parsedURL, err = url.Parse(rawURL) + if err != nil { + return nil, err + } + + // url.Parse() is a bit weird when there is no scheme. + if parsedURL.Host == "" && parsedURL.Path != "" { + // This case happens when you just enter a hostname, like manager='thehost' + parsedURL.Host = parsedURL.Path + parsedURL.Path = "/" + } + if parsedURL.Host == "" && parsedURL.Scheme != "" && parsedURL.Opaque != "" { + // This case happens when you just enter a hostname:port, like manager='thehost:8083' + parsedURL.Host = parsedURL.Scheme + ":" + parsedURL.Opaque + parsedURL.Opaque = "" + parsedURL.Scheme = "http" + } + if parsedURL.Scheme == "" { + parsedURL.Scheme = "http" + } + if parsedURL.Host == "" { + return nil, errURLWithoutHostName + } + + return parsedURL, nil +} diff --git a/internal/worker/config_test.go b/internal/worker/config_test.go new file mode 100644 index 00000000..5724cbb8 --- /dev/null +++ b/internal/worker/config_test.go @@ -0,0 +1,39 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseURL(t *testing.T) { + test := func(expected, input string) { + actualURL, err := ParseURL(input) + assert.Nil(t, err) + assert.Equal(t, expected, actualURL.String()) + } + + test("http://jemoeder:1234", "jemoeder:1234") + test("http://jemoeder/", "jemoeder") + test("opjehoofd://jemoeder:4213/xxx", "opjehoofd://jemoeder:4213/xxx") +} diff --git a/internal/worker/registration.go b/internal/worker/registration.go new file mode 100644 index 00000000..07c21eee --- /dev/null +++ b/internal/worker/registration.go @@ -0,0 +1,131 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "crypto/rand" + "encoding/hex" + "os" + "runtime" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +// (Re-)register ourselves at the Manager. +func (w *Worker) register(ctx context.Context) { + // Construct our new password. + secret := make([]byte, 32) + if _, err := rand.Read(secret); err != nil { + log.Fatal().Err(err).Msg("unable to generate secret key") + } + secretKey := hex.EncodeToString(secret) + + // TODO: load taskTypes from config file. + taskTypes := []string{"unknown", "sleep", "blender-render", "debug", "ffmpeg"} + + req := api.RegisterWorkerJSONRequestBody{ + Nickname: mustHostname(), + Platform: runtime.GOOS, + Secret: secretKey, + SupportedTaskTypes: taskTypes, + } + resp, err := w.client.RegisterWorkerWithResponse(ctx, req) + if err != nil { + log.Fatal().Err(err).Msg("error registering at Manager") + } + switch { + case resp.JSON200 != nil: + log.Info(). + Int("code", resp.StatusCode()). + Interface("resp", resp.JSON200). + Msg("registered at Manager") + default: + log.Fatal(). + Int("code", resp.StatusCode()). + Interface("resp", resp.JSONDefault). + Msg("unable to register at Manager") + } + + // store ID and secretKey in config file when registration is complete. + err = w.configWrangler.WriteConfig(credentialsFilename, "Credentials", workerCredentials{ + WorkerID: resp.JSON200.Uuid, + Secret: secretKey, + }) + if err != nil { + log.Fatal().Err(err).Str("file", credentialsFilename). + Msg("unable to write credentials configuration file") + } +} + +func (w *Worker) reregister(ctx context.Context) { + w.register(ctx) + w.loadConfig() +} + +// signOn tells the Manager we're alive and returns the status the Manager tells us to go to. +// Failure to sign on is fatal. +func (w *Worker) signOn(ctx context.Context) api.WorkerStatus { + logger := log.With().Str("manager", w.manager.String()).Logger() + logger.Info().Msg("signing on at Manager") + + if w.creds == nil { + logger.Fatal().Msg("no credentials, unable to sign on") + } + + // TODO: load taskTypes from config file. + taskTypes := []string{"unknown", "sleep", "blender-render", "debug", "ffmpeg"} + + req := api.SignOnJSONRequestBody{ + Nickname: mustHostname(), + SupportedTaskTypes: taskTypes, + } + resp, err := w.client.SignOnWithResponse(ctx, req) + if err != nil { + log.Fatal().Err(err).Msg("error registering at Manager") + } + switch { + case resp.JSON200 != nil: + log.Info(). + Int("code", resp.StatusCode()). + Interface("resp", resp.JSON200). + Msg("signed on at Manager") + default: + log.Fatal(). + Int("code", resp.StatusCode()). + Interface("resp", resp.JSONDefault). + Msg("unable to sign on at Manager") + } + + startupState := resp.JSON200.StatusRequested + log.Info().Str("startup_state", string(startupState)).Msg("manager accepted sign-on") + return startupState +} + +// mustHostname either the hostname or logs a fatal error. +func mustHostname() string { + hostname, err := os.Hostname() + if err != nil { + log.Fatal().Err(err).Msg("error getting hostname") + } + return hostname +} diff --git a/internal/worker/ssdp/client.go b/internal/worker/ssdp/client.go new file mode 100644 index 00000000..ed1b3ebc --- /dev/null +++ b/internal/worker/ssdp/client.go @@ -0,0 +1,104 @@ +package ssdp + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "net/url" + + "github.com/rs/zerolog/log" + "gitlab.com/blender-institute/gossdp" +) + +// Finder is a uses UPnP/SSDP to find a Flamenco Manager on the local network. +type Finder struct { + overrideURL *url.URL +} + +type ssdpClient struct { + response chan interface{} +} + +// NewManagerFinder returns a default SSDP/UPnP based finder. +func NewManagerFinder(managerURL *url.URL) Finder { + return Finder{ + overrideURL: managerURL, + } +} + +func (b *ssdpClient) NotifyAlive(message gossdp.AliveMessage) { + log.Info().Interface("message", message).Msg("UPnP/SSDP NotifyAlive") +} +func (b *ssdpClient) NotifyBye(message gossdp.ByeMessage) { + log.Info().Interface("message", message).Msg("UPnP/SSDP NotifyBye") +} +func (b *ssdpClient) Response(message gossdp.ResponseMessage) { + log.Debug().Interface("message", message).Msg("UPnP/SSDP response") + url, err := url.Parse(message.Location) + if err != nil { + b.response <- err + return + } + b.response <- url +} + +// FindFlamencoManager tries to find a Manager, sending its URL to the returned channel. +func (f Finder) FindFlamencoManager() <-chan *url.URL { + reporter := make(chan *url.URL) + + go func() { + defer close(reporter) + + if f.overrideURL != nil { + log.Debug().Str("url", f.overrideURL.String()).Msg("Using configured Flamenco Manager URL") + reporter <- f.overrideURL + return + } + + log.Info().Msg("finding Flamenco Manager via UPnP/SSDP") + b := ssdpClient{make(chan interface{})} + + client, err := gossdp.NewSsdpClientWithLogger(&b, ZeroLogWrapper{}) + if err != nil { + log.Fatal().Err(err).Msg("Unable to create UPnP/SSDP client") + return + } + + log.Debug().Msg("Starting UPnP/SSDP client") + go client.Start() + defer client.Stop() + + if err := client.ListenFor("urn:flamenco:manager:0"); err != nil { + log.Error().Err(err).Msg("unable to find Manager") + return + } + + log.Debug().Msg("Waiting for UPnP/SSDP answer") + urlOrErr := <-b.response + switch v := urlOrErr.(type) { + case *url.URL: + reporter <- v + case error: + log.Fatal().Err(v).Msg("Error waiting for UPnP/SSDP response from Manager") + } + }() + + return reporter +} diff --git a/internal/worker/ssdp/zerolog.go b/internal/worker/ssdp/zerolog.go new file mode 100644 index 00000000..8ea9b865 --- /dev/null +++ b/internal/worker/ssdp/zerolog.go @@ -0,0 +1,44 @@ +package ssdp + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ +import ( + "fmt" + + "github.com/rs/zerolog/log" + "gitlab.com/blender-institute/gossdp" +) + +var _ gossdp.LoggerInterface = ZeroLogWrapper{} + +type ZeroLogWrapper struct{} + +func (l ZeroLogWrapper) Debugf(msg string, args ...interface{}) { + log.Debug().Msg(fmt.Sprintf(msg, args...)) +} +func (l ZeroLogWrapper) Infof(msg string, args ...interface{}) { + log.Info().Msg(fmt.Sprintf(msg, args...)) +} +func (l ZeroLogWrapper) Warnf(msg string, args ...interface{}) { + log.Warn().Msg(fmt.Sprintf(msg, args...)) +} +func (l ZeroLogWrapper) Errorf(msg string, args ...interface{}) { + log.Error().Msg(fmt.Sprintf(msg, args...)) +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 00000000..7c649ccd --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,145 @@ +package worker + +import ( + "context" + "errors" + "net/url" + "os" + "sync" + "time" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +const ( + requestRetry = 5 * time.Second + credentialsFilename = "flamenco-worker-credentials.yaml" + configFilename = "flamenco-worker.yaml" +) + +var ( + errRequestAborted = errors.New("request to Manager aborted") +) + +// Worker performs regular Flamenco Worker operations. +type Worker struct { + doneChan chan struct{} + doneWg *sync.WaitGroup + + manager *url.URL + client api.ClientWithResponsesInterface + creds *workerCredentials + + state api.WorkerStatus + stateStarters map[string]func() // gotoStateXXX functions + stateMutex *sync.Mutex + + taskRunner TaskRunner + + configWrangler ConfigWrangler + config WorkerConfig + managerFinder ManagerFinder +} + +type ManagerFinder interface { + FindFlamencoManager() <-chan *url.URL +} + +type TaskRunner interface{} + +// NewWorker constructs and returns a new Worker. +func NewWorker( + flamenco api.ClientWithResponsesInterface, + configWrangler ConfigWrangler, + managerFinder ManagerFinder, + taskRunner TaskRunner, +) *Worker { + + worker := &Worker{ + doneChan: make(chan struct{}), + doneWg: new(sync.WaitGroup), + + client: flamenco, + + state: api.WorkerStatusStarting, + stateStarters: make(map[string]func()), + stateMutex: new(sync.Mutex), + + // taskRunner: taskRunner, + + configWrangler: configWrangler, + managerFinder: managerFinder, + } + // worker.setupStateMachine() + worker.loadConfig() + return worker +} + +func (w *Worker) start(ctx context.Context, register bool) { + w.doneWg.Add(1) + defer w.doneWg.Done() + + w.loadCredentials() + + if w.creds == nil || register { + w.register(ctx) + } + + startState := w.signOn(ctx) + log.Error().Str("state", string(startState)).Msg("here the road ends, nothing else is implemented") + // w.changeState(startState) +} + +func (w *Worker) loadCredentials() { + log.Debug().Msg("loading credentials") + + w.creds = &workerCredentials{} + err := w.configWrangler.LoadConfig(credentialsFilename, w.creds) + if err != nil { + log.Warn().Err(err).Str("file", credentialsFilename). + Msg("unable to load credentials configuration file") + w.creds = nil + return + } +} + +func (w *Worker) loadConfig() { + logger := log.With().Str("filename", configFilename).Logger() + err := w.configWrangler.LoadConfig(configFilename, &w.config) + if os.IsNotExist(err) { + logger.Info().Msg("writing default configuration file") + w.config = w.configWrangler.DefaultConfig() + w.saveConfig() + err = w.configWrangler.LoadConfig(configFilename, &w.config) + } + if err != nil { + logger.Fatal().Err(err).Msg("unable to load config file") + } + + if w.config.Manager != "" { + w.manager, err = ParseURL(w.config.Manager) + if err != nil { + logger.Fatal().Err(err).Str("url", w.config.Manager). + Msg("unable to parse manager URL") + } + logger.Debug().Str("url", w.config.Manager).Msg("parsed manager URL") + } + +} + +func (w *Worker) saveConfig() { + err := w.configWrangler.WriteConfig(configFilename, "Configuration", w.config) + if err != nil { + log.Warn().Err(err).Str("filename", configFilename). + Msg("unable to write configuration file") + } +} + +// Close gracefully shuts down the Worker. +func (w *Worker) Close() { + log.Debug().Msg("worker gracefully shutting down") + close(w.doneChan) + w.doneWg.Wait() + log.Debug().Msg("worker shut down") +} diff --git a/pkg/api/flamenco-manager.yaml b/pkg/api/flamenco-manager.yaml index 4819357b..ee61ca9d 100644 --- a/pkg/api/flamenco-manager.yaml +++ b/pkg/api/flamenco-manager.yaml @@ -39,6 +39,34 @@ paths: schema: $ref: '#/components/schemas/Error' + /api/worker/sign-on: + summary: Called by Workers to let the Manager know they're ready to work, and to update their metadata. + post: + summary: Authenticate & sign in the worker. + operationId: signOn + security: [{worker_auth: []}] + tags: [worker] + requestBody: + description: Worker metadata + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/WorkerSignOn" + responses: + "200": + description: normal response + content: + application/json: + schema: + $ref: "#/components/schemas/WorkerStateChange" + default: + description: unexpected error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + /api/worker/task: summary: Task scheduler endpoint. post: @@ -154,6 +182,21 @@ components: type: string enum: [starting, awake, asleep, error, shutting-down, testing] + WorkerSignOn: + type: object + properties: + nickname: {type: string} + supported_task_types: + type: array + items: {type: string} + required: [nickname, supported_task_types] + + WorkerStateChange: + type: object + properties: + status_requested: {$ref: "#/components/schemas/WorkerStatus"} + required: [status_requested] + AssignedTask: type: object description: AssignedTask is a task as it is received by the Worker. diff --git a/pkg/api/openapi_client.gen.go b/pkg/api/openapi_client.gen.go index 3fbf3ebf..c458d94d 100644 --- a/pkg/api/openapi_client.gen.go +++ b/pkg/api/openapi_client.gen.go @@ -106,6 +106,11 @@ type ClientInterface interface { RegisterWorker(ctx context.Context, body RegisterWorkerJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + // SignOn request with any body + SignOnWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + SignOn(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + // ScheduleTask request ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) } @@ -182,6 +187,30 @@ func (c *Client) RegisterWorker(ctx context.Context, body RegisterWorkerJSONRequ return c.Client.Do(req) } +func (c *Client) SignOnWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewSignOnRequestWithBody(c.Server, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) SignOn(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewSignOnRequest(c.Server, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + func (c *Client) ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { req, err := NewScheduleTaskRequest(c.Server) if err != nil { @@ -335,6 +364,46 @@ func NewRegisterWorkerRequestWithBody(server string, contentType string, body io return req, nil } +// NewSignOnRequest calls the generic SignOn builder with application/json body +func NewSignOnRequest(server string, body SignOnJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewSignOnRequestWithBody(server, "application/json", bodyReader) +} + +// NewSignOnRequestWithBody generates requests for SignOn with any type of body +func NewSignOnRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/api/worker/sign-on") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + // NewScheduleTaskRequest generates requests for ScheduleTask func NewScheduleTaskRequest(server string) (*http.Request, error) { var err error @@ -421,6 +490,11 @@ type ClientWithResponsesInterface interface { RegisterWorkerWithResponse(ctx context.Context, body RegisterWorkerJSONRequestBody, reqEditors ...RequestEditorFn) (*RegisterWorkerResponse, error) + // SignOn request with any body + SignOnWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SignOnResponse, error) + + SignOnWithResponse(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*SignOnResponse, error) + // ScheduleTask request ScheduleTaskWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ScheduleTaskResponse, error) } @@ -515,6 +589,29 @@ func (r RegisterWorkerResponse) StatusCode() int { return 0 } +type SignOnResponse struct { + Body []byte + HTTPResponse *http.Response + JSON200 *WorkerStateChange + JSONDefault *Error +} + +// Status returns HTTPResponse.Status +func (r SignOnResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r SignOnResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + type ScheduleTaskResponse struct { Body []byte HTTPResponse *http.Response @@ -590,6 +687,23 @@ func (c *ClientWithResponses) RegisterWorkerWithResponse(ctx context.Context, bo return ParseRegisterWorkerResponse(rsp) } +// SignOnWithBodyWithResponse request with arbitrary body returning *SignOnResponse +func (c *ClientWithResponses) SignOnWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SignOnResponse, error) { + rsp, err := c.SignOnWithBody(ctx, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseSignOnResponse(rsp) +} + +func (c *ClientWithResponses) SignOnWithResponse(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*SignOnResponse, error) { + rsp, err := c.SignOn(ctx, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseSignOnResponse(rsp) +} + // ScheduleTaskWithResponse request returning *ScheduleTaskResponse func (c *ClientWithResponses) ScheduleTaskWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ScheduleTaskResponse, error) { rsp, err := c.ScheduleTask(ctx, reqEditors...) @@ -717,6 +831,39 @@ func ParseRegisterWorkerResponse(rsp *http.Response) (*RegisterWorkerResponse, e return response, nil } +// ParseSignOnResponse parses an HTTP response from a SignOnWithResponse call +func ParseSignOnResponse(rsp *http.Response) (*SignOnResponse, error) { + bodyBytes, err := ioutil.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &SignOnResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest WorkerStateChange + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && true: + var dest Error + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSONDefault = &dest + + } + + return response, nil +} + // ParseScheduleTaskResponse parses an HTTP response from a ScheduleTaskWithResponse call func ParseScheduleTaskResponse(rsp *http.Response) (*ScheduleTaskResponse, error) { bodyBytes, err := ioutil.ReadAll(rsp.Body) diff --git a/pkg/api/openapi_server.gen.go b/pkg/api/openapi_server.gen.go index 02c123e4..d8606dd4 100644 --- a/pkg/api/openapi_server.gen.go +++ b/pkg/api/openapi_server.gen.go @@ -25,6 +25,9 @@ type ServerInterface interface { // Register a new worker // (POST /api/worker/register-worker) RegisterWorker(ctx echo.Context) error + // Authenticate & sign in the worker. + // (POST /api/worker/sign-on) + SignOn(ctx echo.Context) error // Obtain a new task to execute // (POST /api/worker/task) ScheduleTask(ctx echo.Context) error @@ -78,6 +81,17 @@ func (w *ServerInterfaceWrapper) RegisterWorker(ctx echo.Context) error { return err } +// SignOn converts echo context to params. +func (w *ServerInterfaceWrapper) SignOn(ctx echo.Context) error { + var err error + + ctx.Set(Worker_authScopes, []string{""}) + + // Invoke the callback with all the unmarshalled arguments + err = w.Handler.SignOn(ctx) + return err +} + // ScheduleTask converts echo context to params. func (w *ServerInterfaceWrapper) ScheduleTask(ctx echo.Context) error { var err error @@ -121,6 +135,7 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL router.GET(baseURL+"/api/jobs/types", wrapper.GetJobTypes) router.GET(baseURL+"/api/jobs/:job_id", wrapper.FetchJob) router.POST(baseURL+"/api/worker/register-worker", wrapper.RegisterWorker) + router.POST(baseURL+"/api/worker/sign-on", wrapper.SignOn) router.POST(baseURL+"/api/worker/task", wrapper.ScheduleTask) } diff --git a/pkg/api/openapi_spec.gen.go b/pkg/api/openapi_spec.gen.go index d4045b30..d1e47dc7 100644 --- a/pkg/api/openapi_spec.gen.go +++ b/pkg/api/openapi_spec.gen.go @@ -18,50 +18,53 @@ import ( // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/7xa3W4jtxV+FWJSoAk6krzr9EZX3WSzGy/yY0QOcpEY8pnhkYY2h5yQHMvqwkAeom/S", - "BuhFc9UXcN6oOCTnV+OfbbJZLIzRkDw8v9/5kd4muS4rrVA5myzfJjYvsAT/+MJasVXIz8Be0WeONjei", - "ckKrZDlYZcIyYI6ewDLh6LPBHMU1cpbtmSuQfafNFZp5kiaV0RUaJ9DfkuuyBMX9s3BY+oc/Gdwky+SD", - "RcfcInK2+DQcSG7TxO0rTJYJGAN7+nypMzodX1tnhNrG9+vKCG2E2/c2COVwi6bZEd5OHFdQTi88TNM6", - "cPWj4pD+VmEnSQT26n5GaotmeqEWnBY22pTgkmV4kY433qaJwR9rYZAny++bTaS1SDvK2vLeE3GkxZ7K", - "+lynnT3P2+t1dom5Iz5fXIOQkEl8o7MVOkdcHXjWSqitRGbDOtMbBuyNzhhRsxMOVGiRh8chne8KVGwr", - "rlGlTIpSOO+H1yAFp781WuY0vbPIIpE5+1rJPast8ch2whUs6M5fTne3Lnpgg7EzctxALd0hX2cFsrgY", - "+GC20DsVmWFkCLYj3jk6NKVQ/v5C2EYlcyKPXDjiMtCPV21AWkwP9eAKNEQfpNQ7RkfHNBlsHO0pkF3q", - "jBVgWYaomK2zUjiHfM6+07XkTJSV3DOOEsMxKRneCBsIgr2ybKNNIH2ps5SB4oQFuqyEpD3CzX9QnWtm", - "WksERRJd4f5QWScclRMbgSbSbR0jZWVtHcuQ1Ur8WAdzCdWK0FjswFBdCLyD5kRZIhfgUO6ZQfJnBv4a", - "jhuhBB1IyVW94HRl6vnRtQuvKjBO5LUE01rxHjXYOmsA4CHcmAilVTzZOuM7UziLx6+FFWPfcqZ+SEHk", - "w0OPirb49iSEMCmr8SbDPpTiChmwTyQqjoYB5zOtPpqzFToid+ENchECIWQUUIzQ1SiQ7R2uAEdX15Kr", - "P3tnaGMJFfexZKcVPcJCcr646YnAtersNMKvOpvRSnCH4IyNzdmntTGonNwzTUgDDV3v3T2ssXN28fmL", - "1eefvVy/Ovnis/Xpi7PPL0Ke5cJg7rTZswpcwf7CLn5IFh/4fz8kFwyqilTKg9io6pLk2wiJa9qfpAkX", - "pnn0ryPmF2AL5Otu5/lE8NznNIcoFzXQk74XsQFgwbKTl6cBzfdebHKa6BJz9pVmCq1DToqpc1cbtOxD", - "D7A2ZVzkdBUYgfYjBgaZratKGzcWPTKfUm4+fk5CSw0uSb0vPCrktHRNPuruDHWOsOxLULBFE5BPOB/6", - "UBKUTyQvCRnKdys6ojKfXjBNJd2DfDUKh+gSgb3enY/FBmlrIhV/IaxrnMF79/16O9RRU2j8fxKfDRDx", - "HnG7K6YEbCrOA7HiAjNYGbTEAgNmQ/kS6yCPRDeY1w4fq4SfZPERc9Nme9BcnxmjfRU5rsM5DkrIJloO", - "C9sSrYXtFK8jdjzNbv8UN29CyQ5Sfr1Jlt8/bNdVU4zQqdv0QASD4HDKTrQgtGJOlGgdlBWhQCMoB4cz", - "WpkqFsQEuW+/PXnZgPsbXzw/Unc/tRegAG1bgbriv7M0I+t4Thuddfe1zJ7fngcDfYkOODjwhuLcFzsg", - "Twe6P5B41C2aTDgDZs/KSCwmOztnX2rjw6WSeNNH+hwU5YpSU7HpcaKm2GIXMM/m+QVT2gU9NIXhFe4p", - "qvAGiFZ0ce9oy2RVGeGQvTJiW7jY7syxBCGJ631mUP0ti4lHm22zI8RksvIb2Mr99z/XKHtwMnDkVS9O", - "p/UUaqjJs62DNGkLcieufUcFKicNhOaqkujiswrKElrNNiDCjvahgtr6hx9rrP0DmLygjrx9DFkxkJ+R", - "Z/hkG4kMXvjnQKUmFc36lydpsgPfUcw22syofrCTafUb3Arr0CAPEHgIQsC5QTvtUBKsW3ulDDvuXsoU", - "+dX9vboER0EyjbB643Zg7oHfJ8VuEKkL3zbBrdvueJjAHm0gf1NT3+oibZXa7+obZaRJHgpSz2Uy1nJP", - "M/dINIXpK8xrI9z+nkzz5PTxUN4YpILJ8qxrzLomlrLxKwklqlyPoKLsgdz7g424cHz3D/brT3c/3/1y", - "96+7n3/96e7fd7/c/bM/bln+9WiY+OMt67zkyTJ5Gz/ekgWLWl2trfg7JstjkskZyN0aai50AzkUlL6m", - "XyYL408u7GZxqTNyYFT47Pnx3JPsp5LTr17Tx8omy+cfp8mGylibLJNns2dHVE6XsEW71mZ9LThqqhH8", - "myRNdO2q2oVWAm8cKhvsMq885AQO1mHXkKVwSctULy6sIFPNouCzcCRM4Ybe1dnxkVzb5rWnzvjaXpiM", - "MzHw65nrsTTfbO316g8HQwzmOGVruZqKjd5I8R3ySZs5Wqin2O8yy1PyRJt0KqNztJSuJzNBAMuQDwyE", - "oB3DxG9Ac8wNuuml34jKI6PEmwaAOnlFD5CnLDZIHj2bWQfGhTQNO7jyaG4lIlV86NE1TWxRe1+acb3z", - "gw70w7gJvQfVeGhekSsG0Xf+7jXUhA4Hpa5FQ1wzYX2JFTazk5cpq8DanTa8WQq6CANXBq7ZanpGJsj1", - "QeAnMWBF3qWzwrkquSUehdro0JEoB7nrWqOkgW52hkCqro2MJ+1ysdg0wC704rAC/SbMmV6BKVkZWk32", - "4vSEUp7IUVns3fP69Ivr4wP6u91uvlU14fwinrGLbSVnx/OjOap54cpQGgonB9zG65I0uUYTkfDZ/Gh+", - "RLt1hQoqQUnBv6IgcoW3zAIq4THaB4e2XhUUIl6ZJzzMmkrhQhMSA/ETzfeN+lD5M1BVUuT+1OLShmAL", - "YPQYVA07rtsDrfo5iI4JNukHB+UdHy220qQpuun50dEfytkOLLN1nqPd1FLuWZhCI2dCOc2E4uJa8Bpk", - "GFzPR1P734XNUANN8OcXWFPi+NisyxLMvrUqA6Zw52cm1OG07hQHJb3Jgh9zAyUeP8qg1q1P7k0zebXk", - "fAwVr7RQzsvb+tiixcItTjjaa3TteOc9WvVwljShunZTN08aKfA1OiYPZk5+HFOgMKOR3AOq665q1X/Z", - "fRU10N/bS52tBb+9V4Wv0OVFCNXufj/zECRVnMhGCArEDiIq7enxsb7g/D3a6YGg8/A9NIeX3C8wyMJX", - "It52T/DbcEjxCKIlcd6oPWSYhYlt5WzXdZWTYNn0n7H7fD+IOVHaTCgq7KIQbrj/Q8HzoBOfYFGRe0nW", - "8PCHgmOt8KbCnDo2jHv6jtGwHxFy19iz8aX44nziUDAJ4UJ30o49ysVfPNyTc/MCeS3xLHTM7w8L+7+/", - "mFCS/+VFPwncpsnzo48Pi7ivdPxmdvhtk5+6N8Po2zT5+Oj498vOgxHABPOnaJp89BKVQD4oTz0qDgrT", - "788Jzzprfp05ECo6gBtq4jFP8Iqz0Yqmnw89C+a6weVQ/y0SujpSHOv2E9In/fcq9d83EidbdD5RtNPO", - "DGQmYYDv1o+wR6nt9GSY7IN9PM1cl2WtyB7xpwnjimDekY9y357f/i8AAP//twRIw+EjAAA=", + "H4sIAAAAAAAC/9Ra3W4ct/V/FWLyB5LgP7srW2kv9qqOHTsykljIKshFLKzODs/uUOKQE5Kj9dYQkIfo", + "m7QBetFc9QWUNyoOyfnc0VdjGWgQGKMheXg+fudz9n2S6aLUCpWzyfx9YrMcC/CPz6wVG4X8BOwF/c3R", + "ZkaUTmiVzHurTFgGzNETWCYc/W0wQ3GJnK12zOXIftTmAs00SZPS6BKNE+hvyXRRgOL+WTgs/MP/GVwn", + "8+STWcvcLHI2ex4OJFdp4nYlJvMEjIEd/X2uV3Q6vrbOCLWJ75elEdoIt+tsEMrhBk29I7wdOa6gGF+4", + "naZ14Ko7xSH9LcJOkgjsxc2MVBbN+EIlOC2stSnAJfPwIh1uvEoTgz9XwiBP5j/Vm0hrkXaUteG9I+JA", + "ix2VdblOW3ueNtfr1Tlmjvh8dglCwkria71aoHPE1R6yFkJtJDIb1pleM2Cv9YoRNTsCoFyLLDz26fyY", + "o2IbcYkqZVIUwnkcXoIUnP6t0DKn6Z1FFolM2Rsld6yyxCPbCpezoDt/Od3dQHTPBkMwclxDJd0+Xyc5", + "srgY+GA211sVmWFkCLYl3jk6NIVQ/v5c2FolUyKPXDjiMtCPV61BWkz39eByNEQfpNRbRkeHNBmsHe3J", + "kZ3rFcvBshWiYrZaFcI55FP2o64kZ6Io5Y5xlBiOScnwnbCBINgLy9baBNLnepUyUJxigS5KIWmPcNO3", + "qoXmSmuJoEiiC9ztK+uIo3JiLdBEug0wUlZU1rEVskqJn6tgLqEaEWqL7RmqdYEHaE4UBXIBDuWOGSQ8", + "M/DXcFwLJehASlD1gtOVqedHVy68KsE4kVUSTGPFG9Rgq1UdAG6LGyOutIgnGzA+mMJJPH4prBhiy5nq", + "NgURhvuIirb44Si4MCmrRpNhn0lxgQzYlxIVR8OA84lWn0/ZAh2RO/MGOQuOEDIKKEbR1SiQzR0uB0dX", + "V5KrTz0YGl9Cxb0v2XFFD2IhgS9uumfgWrR2GsSvajWhlQCHAMba5ux5ZQwqJ3dMU6SBmq5HdyfW2Ck7", + "+/rZ4uuvXixfHn3z1fL42cnXZyHPcmEwc9rsWAkuZ//Pzt4ms0/8f2+TMwZlSSrlQWxUVUHyrYXEJe1P", + "0oQLUz/61zHm52Bz5Mt25+mI89wEmv0oFzXQkb7jsSHAgmVHL45DNN95sQk0ERJT9p1mCq1DToqpMlcZ", + "tOwzH2BtyrjI6CowAu3nDAwyW5WlNm4oemQ+pdx8+JSElhpcknos3CnkuHR1PmrvDHWOsOxbULBBEyKf", + "cN71oaBQPpK8JKxQPqzoiMq8f8E0lnT38tXAHSIkAnudO+/yDdLWSCr+RlhXg8Gj+2a97euoLjT+O4lP", + "ehHxBnHbK8YErCvOPbHiAjNYGrTEAgNmQ/kS6yAfid5hVjm8qxK+l8UHzI2b7VZzfWWM9lXksA7n2Csh", + "a2/ZL2wLtBY2Y7wO2PE02/1j3LwOJTtI+WadzH+63a6LuhihU1fpnggGweGYnWhBaMWcKNA6KEqKArWg", + "HBxOaGWsWBAj5H744ehFHdxf++L5jrr7vr0AOWjTClQl/8DSDKzjOa111t7XMHt6dRoM9C064ODAG4pz", + "X+yAPO7pfk/iQbdoVsIZMDtWRGIx2dkp+1Yb7y6lxHfdSJ+BolxRaCo2fZyoyLfYGUxX0+yMKe2CHurC", + "8AJ35FX4DohWhLgH2jxZlEY4ZC+N2OQutjtTLEBI4nq3Mqj+soqJR5tNvSP4ZLLwG9jC/ftflyg74aQH", + "5EXHT8f1FGqo0bMNQOq0BZkTl76jApWRBkJzVUp08VkFZQmtJmsQYUfzUEJl/cPPFVb+AUyWU0fePIas", + "GMhPCBk+2UYivRf+OVCpSEWT7uVJmmzBdxSTtTYTqh/saFr9HjfCOjTIQwjcD0LAuUE7DigJ1i29Uvod", + "dydliuzi5l5dgiMnGY+weu22YG4Iv/fy3SBS675Ngls23XE/gd3ZQP6hpr7RRdootdvV18pIkywUpJ7L", + "ZKjljmZukGgspi8wq4xwuxsyzb3Tx215o5cKRsuztjFrm1jKxi8lFKgyPQgVRSfIPV7YiAuH139jv/9y", + "/ev1b9f/uP7191+u/3n92/Xfu+OW+Z8O+ok/3rLMCp7Mk/fxzyuyYF6pi6UVf8VkfkgyOQOZW0LFha5D", + "Djmlr+nnycz4kzO7np3rFQEYFT55ejj1JLup5Pi7V/RnaZP50y/SZE1lrE3myZPJkwMqpwvYoF1qs7wU", + "HDXVCP5Nkia6cmXlQiuB7xwqG+wyLX3ICRwsw64+S+GShqmOX1hBpppEwSfhSJjC9dHV2vGOXNvktfvO", + "+JpemIwzMvDrmOuuNF9v7fTqtztDdOY4ZWu4GvONzkjxAfmkyRxNqCffbzPLffJEk3RKozO0lK5HM0EI", + "liEfGAhOOwwTfyCaY2bQjS/9wag8MEq8qRdQR6/oBOQxi8XkITbqzUM18YEl6uSNe8f7NvXh8xzUBvdF", + "CJln2WLlQdl0qPUhsduZ6ruBdWBcqHxgCxc+QVqJSEU0+oSVJjavvHtOuN762RH6+eYIlAPafLZbEOtB", + "2q2/ewkVBdy97sGiIQ0zYX3VGjazoxcpK8HarTa8XgrwCjNsBq7eajp+Q1nMK80Pt8CKrK0QcufK5Ip4", + "FGqtQ5OnHGSu7TaTOhuyEwRCb2VkPGnns9m6zpVCz/aL+u/D6O4lmIIVoXtnz46PqIoQGSqLnXteHX9z", + "ebhHf7vdTjeqotQ5i2fsbFPKyeH0YIpqmrsiVNvCyR638bokTS7RxOTyZHowPaDdukQFpaA8619RXHK5", + "t8wMSuHTnoeotl4VBFSvzCMexneFcKGvixD7UvNdrT5U/gyUpRSZPzU7tyF+BfDeBe1+E3u1p1U/WtKx", + "Zkm6yKdU7l3Blpo0RTc9PTj4qJxtwTJbZRnadSXljoXBPnImlNNMKC4uBa9Ahm8B08GHkA/CZigrR/jz", + "C6yuGr1vVkUBZtdYlQFTuPVjKGoaGzjF2VNnWOO/HADlcj8dom64S+51Pcy2BD6GipdaKOflbTA2a4Lx", + "BkeA9gpdMzF7RKvuj+dGVNdsakd0AwW+Qsfk3hjPT7hyFGYw5bxFde1VjfrP2697Pf29P9erpeBXN6rw", + "JbosD67a3u/HSIKkikPuGIICsT2PSjt6vKvVOn1EO93idD58983hJfcLDFbhK5O33T1wGw4pHoNoQZzX", + "ag8ZZmZipz7Zto36aLCsW/rY0D9OxBypFkcUFXaRC9fcf9TguTfcGGFREbwkq3n4qMGxUviuxIyaYIx7", + "usCo2Y8Rclvbs8ZSfHE6ciiYhOJCe9IOEWXFRk1imT+edkPx+5gIilfcjJ2md/yYwNmvnv8nkBOrXh9s", + "e/XuT6cUJjvxvnI5KkdMIXtbHRw8/TMjNNTfh7fN55BbsfYcZJwCB4X5H29IDIGvzt8Xyn+Ext2nBplB", + "4DvaRfTC5zinWRhzx6RVG3w6hKuLv3m6AatZjrySeBJmZo+Xuru/wBqxjP/tVbdmuUqTpwdf7Pcc3+n4", + "24z+92b/3a3+HHWVJl8cHH64YrI3BBxh/hhNXT69QCWQPxBXb1YOhIrxyvU1cReYvOJstKLplm+eBXNZ", + "lxGhXZkldHWkONTtl6RP+t+r1P/igDjZoPN1TfO9YwVyJaFXjlj/EWtQiR0f9WvTDtwzXRSVCp7kf5w0", + "LGCnLfko99Xp1X8CAAD//yqfdC3jJwAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/pkg/api/openapi_types.gen.go b/pkg/api/openapi_types.gen.go index 87e22e5e..176963df 100644 --- a/pkg/api/openapi_types.gen.go +++ b/pkg/api/openapi_types.gen.go @@ -242,6 +242,17 @@ type WorkerRegistration struct { SupportedTaskTypes []string `json:"supported_task_types"` } +// WorkerSignOn defines model for WorkerSignOn. +type WorkerSignOn struct { + Nickname string `json:"nickname"` + SupportedTaskTypes []string `json:"supported_task_types"` +} + +// WorkerStateChange defines model for WorkerStateChange. +type WorkerStateChange struct { + StatusRequested WorkerStatus `json:"status_requested"` +} + // WorkerStatus defines model for WorkerStatus. type WorkerStatus string @@ -251,12 +262,18 @@ type SubmitJobJSONBody SubmittedJob // RegisterWorkerJSONBody defines parameters for RegisterWorker. type RegisterWorkerJSONBody WorkerRegistration +// SignOnJSONBody defines parameters for SignOn. +type SignOnJSONBody WorkerSignOn + // SubmitJobJSONRequestBody defines body for SubmitJob for application/json ContentType. type SubmitJobJSONRequestBody SubmitJobJSONBody // RegisterWorkerJSONRequestBody defines body for RegisterWorker for application/json ContentType. type RegisterWorkerJSONRequestBody RegisterWorkerJSONBody +// SignOnJSONRequestBody defines body for SignOn for application/json ContentType. +type SignOnJSONRequestBody SignOnJSONBody + // Getter for additional properties for JobMetadata. Returns the specified // element and whether it was found func (a JobMetadata) Get(fieldName string) (value string, found bool) {