diff --git a/cmd/flamenco-manager-poc/main.go b/cmd/flamenco-manager-poc/main.go
index 23890d79..30048b90 100644
--- a/cmd/flamenco-manager-poc/main.go
+++ b/cmd/flamenco-manager-poc/main.go
@@ -89,6 +89,7 @@ func buildWebService(flamenco api.ServerInterface) *echo.Echo {
// Ensure panics when serving a web request won't bring down the server.
e.Use(middleware.Recover())
+ e.Use(api_impl.MiddleWareRequestLogger)
// Load the API definition and enable validation & authentication checks.
swagger, err := api.GetSwagger()
diff --git a/cmd/flamenco-worker-poc/config.go b/cmd/flamenco-worker-poc/config.go
new file mode 100644
index 00000000..6d3547ad
--- /dev/null
+++ b/cmd/flamenco-worker-poc/config.go
@@ -0,0 +1,81 @@
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/internal/worker"
+)
+
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
+const (
+ credentialsFilename = "flamenco-worker-credentials.yaml"
+ configFilename = "flamenco-worker.yaml"
+)
+
+func loadConfig(configWrangler worker.FileConfigWrangler) (worker.WorkerConfig, error) {
+ logger := log.With().Str("filename", configFilename).Logger()
+
+ var cfg worker.WorkerConfig
+
+ err := configWrangler.LoadConfig(configFilename, &cfg)
+
+ // If the configuration file doesn't exist, write the defaults & retry loading them.
+ if os.IsNotExist(err) {
+ logger.Info().Msg("writing default configuration file")
+ cfg = configWrangler.DefaultConfig()
+ err = configWrangler.WriteConfig(configFilename, "Configuration", cfg)
+ if err != nil {
+ return cfg, fmt.Errorf("error writing default config: %w", err)
+ }
+ err = configWrangler.LoadConfig(configFilename, &cfg)
+ }
+ if err != nil {
+ return cfg, fmt.Errorf("error loading config from %s: %w", configFilename, err)
+ }
+
+ // Validate the manager URL.
+ if cfg.Manager != "" {
+ _, err := worker.ParseURL(cfg.Manager)
+ if err != nil {
+ return cfg, fmt.Errorf("error parsing manager URL %s: %w", cfg.Manager, err)
+ }
+ logger.Debug().Str("url", cfg.Manager).Msg("parsed manager URL")
+ }
+
+ return cfg, nil
+}
+
+func loadCredentials(configWrangler worker.FileConfigWrangler) (worker.WorkerCredentials, error) {
+ logger := log.With().Str("filename", configFilename).Logger()
+ logger.Info().Msg("loading credentials")
+
+ var creds worker.WorkerCredentials
+
+ err := configWrangler.LoadConfig(credentialsFilename, &creds)
+ if err != nil {
+ return worker.WorkerCredentials{}, err
+ }
+
+ return creds, nil
+}
diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go
index 33bb7af5..c8344a34 100644
--- a/cmd/flamenco-worker-poc/main.go
+++ b/cmd/flamenco-worker-poc/main.go
@@ -23,8 +23,10 @@ package main
import (
"context"
"fmt"
- "net/http"
- "net/url"
+ "os"
+ "os/signal"
+ "runtime"
+ "syscall"
"time"
"github.com/mattn/go-colorable"
@@ -33,8 +35,11 @@ import (
"gitlab.com/blender/flamenco-ng-poc/internal/appinfo"
"gitlab.com/blender/flamenco-ng-poc/internal/worker"
- "gitlab.com/blender/flamenco-ng-poc/internal/worker/ssdp"
- "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+var (
+ w *worker.Worker
+ shutdownComplete chan struct{}
)
func main() {
@@ -47,70 +52,65 @@ func main() {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
- log.Info().Str("version", appinfo.ApplicationVersion).Msgf("starting %v Worker", appinfo.ApplicationName)
+ log.Info().
+ Str("version", appinfo.ApplicationVersion).
+ Str("OS", runtime.GOOS).
+ Str("ARCH", runtime.GOARCH).
+ Int("pid", os.Getpid()).
+ Msgf("starting %v Worker", appinfo.ApplicationName)
- // configWrangler := worker.NewConfigWrangler()
- managerFinder := ssdp.NewManagerFinder(cliArgs.managerURL)
- // taskRunner := struct{}{}
- findManager(managerFinder)
+ configWrangler := worker.NewConfigWrangler()
- // basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth("MY_USER", "MY_PASS")
- // if err != nil {
- // log.Panic().Err(err).Msg("unable to create basic authr")
- // }
+ startupCtx, sctxCancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+ client, startupState := registerOrSignOn(startupCtx, configWrangler)
+ sctxCancelFunc()
- // flamenco, err := api.NewClientWithResponses(
- // "http://localhost:8080/",
- // api.WithRequestEditorFn(basicAuthProvider.Intercept),
- // api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
- // req.Header.Set("User-Agent", appinfo.UserAgent())
- // return nil
- // }),
- // )
- // if err != nil {
- // log.Fatal().Err(err).Msg("error creating client")
- // }
+ shutdownComplete = make(chan struct{})
- // w := worker.NewWorker(flamenco, configWrangler, managerFinder, taskRunner)
- // ctx := context.Background()
- // registerWorker(ctx, flamenco)
- // obtainTask(ctx, flamenco)
+ taskRunner := struct{}{}
+ w = worker.NewWorker(client, taskRunner)
+
+ // Handle Ctrl+C
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+ signal.Notify(c, syscall.SIGTERM)
+ go func() {
+ for signum := range c {
+ // Run the shutdown sequence in a goroutine, so that multiple Ctrl+C presses can be handled in parallel.
+ go shutdown(signum)
+ }
+ }()
+
+ workerCtx := context.Background()
+ w.Start(workerCtx, startupState)
+
+ <-shutdownComplete
+
+ log.Debug().Msg("process shutting down")
}
-func obtainTask(ctx context.Context, flamenco *api.ClientWithResponses) {
- resp, err := flamenco.ScheduleTaskWithResponse(ctx)
- if err != nil {
- log.Fatal().Err(err).Msg("error obtaining task")
- }
- switch {
- case resp.JSON200 != nil:
- log.Info().
- Interface("task", resp.JSON200).
- Msg("obtained task")
- case resp.JSON403 != nil:
- log.Fatal().
- Int("code", resp.StatusCode()).
- Str("error", string(resp.JSON403.Message)).
- Msg("access denied")
- case resp.StatusCode() == http.StatusNoContent:
- log.Info().Msg("no task available")
- default:
- log.Fatal().
- Int("code", resp.StatusCode()).
- Str("error", string(resp.Body)).
- Msg("unable to obtain task")
- }
-}
+func shutdown(signum os.Signal) {
+ done := make(chan struct{})
+ go func() {
+ log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.")
+
+ if w != nil {
+ shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancelFunc()
+ w.SignOff(shutdownCtx)
+ w.Close()
+ }
+ close(done)
+ }()
-func findManager(managerFinder worker.ManagerFinder) *url.URL {
- finder := managerFinder.FindFlamencoManager()
select {
- case manager := <-finder:
- log.Info().Str("manager", manager.String()).Msg("found Manager")
- return manager
- case <-time.After(10 * time.Second):
- log.Fatal().Msg("unable to autodetect Flamenco Manager via UPnP/SSDP; configure the URL explicitly")
+ case <-done:
+ log.Debug().Msg("shutdown OK")
+ case <-time.After(20 * time.Second):
+ log.Error().Msg("shutdown forced, stopping process.")
+ os.Exit(-2)
}
- return nil
+ log.Warn().Msg("shutdown complete, stopping process.")
+ close(shutdownComplete)
}
diff --git a/cmd/flamenco-worker-poc/registration.go b/cmd/flamenco-worker-poc/registration.go
new file mode 100644
index 00000000..3572c0c8
--- /dev/null
+++ b/cmd/flamenco-worker-poc/registration.go
@@ -0,0 +1,188 @@
+package main
+
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/hex"
+ "errors"
+ "net/http"
+ "os"
+ "runtime"
+
+ "github.com/deepmap/oapi-codegen/pkg/securityprovider"
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/internal/appinfo"
+ "gitlab.com/blender/flamenco-ng-poc/internal/worker"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+var errSignOnFailure = errors.New("unable to sign on at Manager")
+
+func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrangler) (
+ client api.ClientWithResponsesInterface, startupState api.WorkerStatus,
+) {
+ // Load configuration
+ cfg, err := loadConfig(configWrangler)
+ if err != nil {
+ log.Fatal().Err(err).Msg("loading configuration")
+ }
+
+ if cfg.Manager == "" {
+ log.Fatal().Msg("no manager configured")
+ }
+
+ // Load credentials
+ creds, err := loadCredentials(configWrangler)
+ if err == nil {
+ // Credentials can be loaded just fine, try to sign on with them.
+ client = authenticatedClient(cfg, creds)
+ startupState, err = signOn(ctx, cfg, client)
+ if err == nil {
+ // Sign on is fine!
+ return
+ }
+ }
+
+ // Either there were no credentials, or existing ones weren't accepted, just register as new worker.
+ client = authenticatedClient(cfg, worker.WorkerCredentials{})
+ creds = register(ctx, cfg, client)
+
+ // store ID and secretKey in config file when registration is complete.
+ err = configWrangler.WriteConfig(credentialsFilename, "Credentials", creds)
+ if err != nil {
+ log.Fatal().Err(err).Str("file", credentialsFilename).
+ Msg("unable to write credentials configuration file")
+ }
+
+ // Sign-on should work now.
+ client = authenticatedClient(cfg, creds)
+ startupState, err = signOn(ctx, cfg, client)
+ if err != nil {
+ log.Fatal().Err(err).Str("manager", cfg.Manager).Msg("unable to sign on after registering")
+ }
+
+ return
+}
+
+// (Re-)register ourselves at the Manager.
+// Logs a fatal error if unsuccesful.
+func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) worker.WorkerCredentials {
+ // Construct our new password.
+ secret := make([]byte, 32)
+ if _, err := rand.Read(secret); err != nil {
+ log.Fatal().Err(err).Msg("unable to generate secret key")
+ }
+ secretKey := hex.EncodeToString(secret)
+
+ req := api.RegisterWorkerJSONRequestBody{
+ Nickname: mustHostname(),
+ Platform: runtime.GOOS,
+ Secret: secretKey,
+ SupportedTaskTypes: cfg.TaskTypes,
+ }
+ resp, err := client.RegisterWorkerWithResponse(ctx, req)
+ if err != nil {
+ log.Fatal().Err(err).Msg("error registering at Manager")
+ }
+ switch {
+ case resp.JSON200 != nil:
+ log.Info().
+ Int("code", resp.StatusCode()).
+ Interface("resp", resp.JSON200).
+ Msg("registered at Manager")
+ default:
+ log.Fatal().
+ Int("code", resp.StatusCode()).
+ Interface("resp", resp.JSONDefault).
+ Msg("unable to register at Manager")
+ }
+
+ return worker.WorkerCredentials{
+ WorkerID: resp.JSON200.Uuid,
+ Secret: secretKey,
+ }
+}
+
+// signOn tells the Manager we're alive and returns the status the Manager tells us to go to.
+func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) {
+ logger := log.With().Str("manager", cfg.Manager).Logger()
+ logger.Info().Msg("signing on at Manager")
+
+ req := api.SignOnJSONRequestBody{
+ Nickname: mustHostname(),
+ SupportedTaskTypes: cfg.TaskTypes,
+ }
+ resp, err := client.SignOnWithResponse(ctx, req)
+ if err != nil {
+ logger.Warn().Err(err).Msg("unable to send sign-on request")
+ return "", errSignOnFailure
+ }
+ switch {
+ case resp.JSON200 != nil:
+ log.Info().
+ Int("code", resp.StatusCode()).
+ Interface("resp", resp.JSON200).
+ Msg("signed on at Manager")
+ default:
+ log.Warn().
+ Int("code", resp.StatusCode()).
+ Interface("resp", resp.JSONDefault).
+ Msg("unable to sign on at Manager")
+ return "", errSignOnFailure
+ }
+
+ startupState := resp.JSON200.StatusRequested
+ log.Info().Str("startup_state", string(startupState)).Msg("manager accepted sign-on")
+ return startupState, nil
+}
+
+// mustHostname either the hostname or logs a fatal error.
+func mustHostname() string {
+ hostname, err := os.Hostname()
+ if err != nil {
+ log.Fatal().Err(err).Msg("error getting hostname")
+ }
+ return hostname
+}
+
+// authenticatedClient constructs a Flamenco client with the given credentials.
+func authenticatedClient(cfg worker.WorkerConfig, creds worker.WorkerCredentials) api.ClientWithResponsesInterface {
+ basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(creds.WorkerID, creds.Secret)
+ if err != nil {
+ log.Panic().Err(err).Msg("unable to create basic auth provider")
+ }
+
+ flamenco, err := api.NewClientWithResponses(
+ cfg.Manager,
+ api.WithRequestEditorFn(basicAuthProvider.Intercept),
+ api.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
+ req.Header.Set("User-Agent", appinfo.UserAgent())
+ return nil
+ }),
+ )
+ if err != nil {
+ log.Fatal().Err(err).Msg("error creating client")
+ }
+
+ return flamenco
+}
diff --git a/flamenco-worker-credentials.yaml b/flamenco-worker-credentials.yaml
new file mode 100644
index 00000000..f829954f
--- /dev/null
+++ b/flamenco-worker-credentials.yaml
@@ -0,0 +1,9 @@
+# Credentials file for Flamenco Worker.
+# For an explanation of the fields, refer to flamenco-worker-example.yaml
+#
+# NOTE: this file can be overwritten by Flamenco Worker.
+#
+# This file was written on 2022-01-31 14:49:14 +01:00
+
+worker_id: 9b41b767-74de-4cac-9604-f3521821d68d
+worker_secret: 9b4d6b672181d29dfc65ceb59ff7aba3aab3e5bd9cac5d50342eb3a7217b0085
diff --git a/flamenco-worker.yaml b/flamenco-worker.yaml
new file mode 100644
index 00000000..68be88d9
--- /dev/null
+++ b/flamenco-worker.yaml
@@ -0,0 +1,14 @@
+# Configuration file for Flamenco Worker.
+# For an explanation of the fields, refer to flamenco-worker-example.yaml
+#
+# NOTE: this file can be overwritten by Flamenco Worker.
+#
+# This file was written on 2022-01-31 14:45:36 +01:00
+
+manager_url: "http://localhost:8080/"
+task_types:
+- sleep
+- blender-render
+- file-management
+- exr-merge
+- debug
diff --git a/go.mod b/go.mod
index 40bcb70d..b79c3e43 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,6 @@ require (
github.com/shopspring/decimal v1.2.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/ziflex/lecho/v3 v3.1.0
- gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/postgres v1.0.8
diff --git a/go.sum b/go.sum
index 782dd666..d0949987 100644
--- a/go.sum
+++ b/go.sum
@@ -227,8 +227,6 @@ github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
github.com/ziflex/lecho/v3 v3.1.0 h1:65bSzSc0yw7EEhi44lMnkOI877ZzbE7tGDWfYCQXZwI=
github.com/ziflex/lecho/v3 v3.1.0/go.mod h1:dwQ6xCAKmSBHhwZ6XmiAiDptD7iklVkW7xQYGUncX0Q=
-gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76 h1:ASbeHgntCaY+Q/qRUX1y6T12WncACelKVRUFGjyIOVM=
-gitlab.com/blender-institute/gossdp v0.0.0-20181214124559-074ccf115d76/go.mod h1:+j3oHEe07Rw8lFbVhESVy83XVW51AndFrjbUMb2JI4k=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
diff --git a/internal/manager/api_impl/logging.go b/internal/manager/api_impl/logging.go
new file mode 100644
index 00000000..e6d2e246
--- /dev/null
+++ b/internal/manager/api_impl/logging.go
@@ -0,0 +1,41 @@
+package api_impl
+
+import (
+ "context"
+
+ "github.com/labstack/echo/v4"
+ "github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
+)
+
+type loggerContextKey string
+
+const (
+ loggerKey = loggerContextKey("logger")
+)
+
+// MiddleWareRequestLogger is Echo middleware that puts a Zerolog logger in the request context, for endpoints to use.
+func MiddleWareRequestLogger(next echo.HandlerFunc) echo.HandlerFunc {
+ return func(c echo.Context) error {
+ remoteIP := c.RealIP()
+ logger := log.With().Str("remoteAddr", remoteIP).Logger()
+ ctx := context.WithValue(c.Request().Context(), loggerKey, logger)
+ c.SetRequest(c.Request().WithContext(ctx))
+
+ if err := next(c); err != nil {
+ c.Error(err)
+ }
+ return nil
+ }
+}
+
+func requestLogger(e echo.Context) zerolog.Logger {
+ ctx := e.Request().Context()
+ logger, ok := ctx.Value(loggerKey).(zerolog.Logger)
+ if ok {
+ return logger
+ }
+
+ log.Error().Msg("no logger found in request context, returning default logger")
+ return log.With().Logger()
+}
diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go
index bb7a445e..82e22dda 100644
--- a/internal/manager/api_impl/workers.go
+++ b/internal/manager/api_impl/workers.go
@@ -26,7 +26,6 @@ import (
"github.com/google/uuid"
"github.com/labstack/echo/v4"
- "github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/manager/persistence"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
@@ -34,8 +33,7 @@ import (
// RegisterWorker registers a new worker and stores it in the database.
func (f *Flamenco) RegisterWorker(e echo.Context) error {
- remoteIP := e.RealIP()
- logger := log.With().Str("ip", remoteIP).Logger()
+ logger := requestLogger(e)
var req api.RegisterWorkerJSONBody
err := e.Bind(&req)
@@ -53,7 +51,7 @@ func (f *Flamenco) RegisterWorker(e echo.Context) error {
Name: req.Nickname,
Secret: req.Secret,
Platform: req.Platform,
- Address: remoteIP,
+ Address: e.RealIP(),
SupportedTaskTypes: strings.Join(req.SupportedTaskTypes, ","),
}
if err := f.persist.CreateWorker(e.Request().Context(), &dbWorker); err != nil {
@@ -74,8 +72,7 @@ func (f *Flamenco) RegisterWorker(e echo.Context) error {
}
func (f *Flamenco) SignOn(e echo.Context) error {
- remoteIP := e.RealIP()
- logger := log.With().Str("ip", remoteIP).Logger()
+ logger := requestLogger(e)
var req api.SignOnJSONBody
err := e.Bind(&req)
@@ -92,6 +89,44 @@ func (f *Flamenco) SignOn(e echo.Context) error {
})
}
+func (f *Flamenco) SignOff(e echo.Context) error {
+ logger := requestLogger(e)
+
+ var req api.SignOnJSONBody
+ err := e.Bind(&req)
+ if err != nil {
+ logger.Warn().Err(err).Msg("bad request received")
+ return sendAPIError(e, http.StatusBadRequest, "invalid format")
+ }
+
+ logger.Info().Str("nickname", req.Nickname).Msg("worker signing off")
+
+ // TODO: store status in DB.
+ return e.String(http.StatusNoContent, "")
+}
+
+// (GET /api/worker/state)
+func (f *Flamenco) WorkerState(e echo.Context) error {
+ // TODO: look up proper status in DB.
+ return e.String(http.StatusNoContent, "")
+}
+
+// Worker changed state. This could be as acknowledgement of a Manager-requested state change, or in response to worker-local signals.
+// (POST /api/worker/state-changed)
+func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
+ logger := requestLogger(e)
+
+ var req api.WorkerStateChangedJSONRequestBody
+ err := e.Bind(&req)
+ if err != nil {
+ logger.Warn().Err(err).Msg("bad request received")
+ return sendAPIError(e, http.StatusBadRequest, "invalid format")
+ }
+
+ logger.Info().Str("newStatus", string(req.Status)).Msg("worker changed status")
+ return e.String(http.StatusNoContent, "")
+}
+
func (f *Flamenco) ScheduleTask(e echo.Context) error {
return e.JSON(http.StatusOK, &api.AssignedTask{
Uuid: uuid.New().String(),
diff --git a/internal/worker/config.go b/internal/worker/config.go
index 2e77212e..8a5b321a 100644
--- a/internal/worker/config.go
+++ b/internal/worker/config.go
@@ -43,23 +43,16 @@ type WorkerConfig struct {
TaskTypes []string `yaml:"task_types"`
}
-type workerCredentials struct {
+type WorkerCredentials struct {
WorkerID string `yaml:"worker_id"`
Secret string `yaml:"worker_secret"`
}
-// ConfigWrangler makes it simple to load and write configuration files.
-type ConfigWrangler interface {
- DefaultConfig() WorkerConfig
- WriteConfig(filename string, filetype string, config interface{}) error
- LoadConfig(filename string, config interface{}) error
-}
-
// FileConfigWrangler is the default config wrangler that actually reads & writes files.
type FileConfigWrangler struct{}
-// NewConfigWrangler returns a new ConfigWrangler instance of the default type FileConfigWrangler.
-func NewConfigWrangler() ConfigWrangler {
+// NewConfigWrangler returns ConfigWrangler that reads files.
+func NewConfigWrangler() FileConfigWrangler {
return FileConfigWrangler{}
}
diff --git a/internal/worker/registration.go b/internal/worker/registration.go
deleted file mode 100644
index 07c21eee..00000000
--- a/internal/worker/registration.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package worker
-
-/* ***** BEGIN GPL LICENSE BLOCK *****
- *
- * Original Code Copyright (C) 2022 Blender Foundation.
- *
- * This file is part of Flamenco.
- *
- * Flamenco is free software: you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later
- * version.
- *
- * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * Flamenco. If not, see .
- *
- * ***** END GPL LICENSE BLOCK ***** */
-
-import (
- "context"
- "crypto/rand"
- "encoding/hex"
- "os"
- "runtime"
-
- "github.com/rs/zerolog/log"
- "gitlab.com/blender/flamenco-ng-poc/pkg/api"
-)
-
-// (Re-)register ourselves at the Manager.
-func (w *Worker) register(ctx context.Context) {
- // Construct our new password.
- secret := make([]byte, 32)
- if _, err := rand.Read(secret); err != nil {
- log.Fatal().Err(err).Msg("unable to generate secret key")
- }
- secretKey := hex.EncodeToString(secret)
-
- // TODO: load taskTypes from config file.
- taskTypes := []string{"unknown", "sleep", "blender-render", "debug", "ffmpeg"}
-
- req := api.RegisterWorkerJSONRequestBody{
- Nickname: mustHostname(),
- Platform: runtime.GOOS,
- Secret: secretKey,
- SupportedTaskTypes: taskTypes,
- }
- resp, err := w.client.RegisterWorkerWithResponse(ctx, req)
- if err != nil {
- log.Fatal().Err(err).Msg("error registering at Manager")
- }
- switch {
- case resp.JSON200 != nil:
- log.Info().
- Int("code", resp.StatusCode()).
- Interface("resp", resp.JSON200).
- Msg("registered at Manager")
- default:
- log.Fatal().
- Int("code", resp.StatusCode()).
- Interface("resp", resp.JSONDefault).
- Msg("unable to register at Manager")
- }
-
- // store ID and secretKey in config file when registration is complete.
- err = w.configWrangler.WriteConfig(credentialsFilename, "Credentials", workerCredentials{
- WorkerID: resp.JSON200.Uuid,
- Secret: secretKey,
- })
- if err != nil {
- log.Fatal().Err(err).Str("file", credentialsFilename).
- Msg("unable to write credentials configuration file")
- }
-}
-
-func (w *Worker) reregister(ctx context.Context) {
- w.register(ctx)
- w.loadConfig()
-}
-
-// signOn tells the Manager we're alive and returns the status the Manager tells us to go to.
-// Failure to sign on is fatal.
-func (w *Worker) signOn(ctx context.Context) api.WorkerStatus {
- logger := log.With().Str("manager", w.manager.String()).Logger()
- logger.Info().Msg("signing on at Manager")
-
- if w.creds == nil {
- logger.Fatal().Msg("no credentials, unable to sign on")
- }
-
- // TODO: load taskTypes from config file.
- taskTypes := []string{"unknown", "sleep", "blender-render", "debug", "ffmpeg"}
-
- req := api.SignOnJSONRequestBody{
- Nickname: mustHostname(),
- SupportedTaskTypes: taskTypes,
- }
- resp, err := w.client.SignOnWithResponse(ctx, req)
- if err != nil {
- log.Fatal().Err(err).Msg("error registering at Manager")
- }
- switch {
- case resp.JSON200 != nil:
- log.Info().
- Int("code", resp.StatusCode()).
- Interface("resp", resp.JSON200).
- Msg("signed on at Manager")
- default:
- log.Fatal().
- Int("code", resp.StatusCode()).
- Interface("resp", resp.JSONDefault).
- Msg("unable to sign on at Manager")
- }
-
- startupState := resp.JSON200.StatusRequested
- log.Info().Str("startup_state", string(startupState)).Msg("manager accepted sign-on")
- return startupState
-}
-
-// mustHostname either the hostname or logs a fatal error.
-func mustHostname() string {
- hostname, err := os.Hostname()
- if err != nil {
- log.Fatal().Err(err).Msg("error getting hostname")
- }
- return hostname
-}
diff --git a/internal/worker/ssdp/client.go b/internal/worker/ssdp/client.go
deleted file mode 100644
index ed1b3ebc..00000000
--- a/internal/worker/ssdp/client.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package ssdp
-
-/* ***** BEGIN GPL LICENSE BLOCK *****
- *
- * Original Code Copyright (C) 2022 Blender Foundation.
- *
- * This file is part of Flamenco.
- *
- * Flamenco is free software: you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later
- * version.
- *
- * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * Flamenco. If not, see .
- *
- * ***** END GPL LICENSE BLOCK ***** */
-
-import (
- "net/url"
-
- "github.com/rs/zerolog/log"
- "gitlab.com/blender-institute/gossdp"
-)
-
-// Finder is a uses UPnP/SSDP to find a Flamenco Manager on the local network.
-type Finder struct {
- overrideURL *url.URL
-}
-
-type ssdpClient struct {
- response chan interface{}
-}
-
-// NewManagerFinder returns a default SSDP/UPnP based finder.
-func NewManagerFinder(managerURL *url.URL) Finder {
- return Finder{
- overrideURL: managerURL,
- }
-}
-
-func (b *ssdpClient) NotifyAlive(message gossdp.AliveMessage) {
- log.Info().Interface("message", message).Msg("UPnP/SSDP NotifyAlive")
-}
-func (b *ssdpClient) NotifyBye(message gossdp.ByeMessage) {
- log.Info().Interface("message", message).Msg("UPnP/SSDP NotifyBye")
-}
-func (b *ssdpClient) Response(message gossdp.ResponseMessage) {
- log.Debug().Interface("message", message).Msg("UPnP/SSDP response")
- url, err := url.Parse(message.Location)
- if err != nil {
- b.response <- err
- return
- }
- b.response <- url
-}
-
-// FindFlamencoManager tries to find a Manager, sending its URL to the returned channel.
-func (f Finder) FindFlamencoManager() <-chan *url.URL {
- reporter := make(chan *url.URL)
-
- go func() {
- defer close(reporter)
-
- if f.overrideURL != nil {
- log.Debug().Str("url", f.overrideURL.String()).Msg("Using configured Flamenco Manager URL")
- reporter <- f.overrideURL
- return
- }
-
- log.Info().Msg("finding Flamenco Manager via UPnP/SSDP")
- b := ssdpClient{make(chan interface{})}
-
- client, err := gossdp.NewSsdpClientWithLogger(&b, ZeroLogWrapper{})
- if err != nil {
- log.Fatal().Err(err).Msg("Unable to create UPnP/SSDP client")
- return
- }
-
- log.Debug().Msg("Starting UPnP/SSDP client")
- go client.Start()
- defer client.Stop()
-
- if err := client.ListenFor("urn:flamenco:manager:0"); err != nil {
- log.Error().Err(err).Msg("unable to find Manager")
- return
- }
-
- log.Debug().Msg("Waiting for UPnP/SSDP answer")
- urlOrErr := <-b.response
- switch v := urlOrErr.(type) {
- case *url.URL:
- reporter <- v
- case error:
- log.Fatal().Err(v).Msg("Error waiting for UPnP/SSDP response from Manager")
- }
- }()
-
- return reporter
-}
diff --git a/internal/worker/ssdp/zerolog.go b/internal/worker/ssdp/zerolog.go
deleted file mode 100644
index 8ea9b865..00000000
--- a/internal/worker/ssdp/zerolog.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package ssdp
-
-/* ***** BEGIN GPL LICENSE BLOCK *****
- *
- * Original Code Copyright (C) 2022 Blender Foundation.
- *
- * This file is part of Flamenco.
- *
- * Flamenco is free software: you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later
- * version.
- *
- * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * Flamenco. If not, see .
- *
- * ***** END GPL LICENSE BLOCK ***** */
-import (
- "fmt"
-
- "github.com/rs/zerolog/log"
- "gitlab.com/blender-institute/gossdp"
-)
-
-var _ gossdp.LoggerInterface = ZeroLogWrapper{}
-
-type ZeroLogWrapper struct{}
-
-func (l ZeroLogWrapper) Debugf(msg string, args ...interface{}) {
- log.Debug().Msg(fmt.Sprintf(msg, args...))
-}
-func (l ZeroLogWrapper) Infof(msg string, args ...interface{}) {
- log.Info().Msg(fmt.Sprintf(msg, args...))
-}
-func (l ZeroLogWrapper) Warnf(msg string, args ...interface{}) {
- log.Warn().Msg(fmt.Sprintf(msg, args...))
-}
-func (l ZeroLogWrapper) Errorf(msg string, args ...interface{}) {
- log.Error().Msg(fmt.Sprintf(msg, args...))
-}
diff --git a/internal/worker/state_asleep.go b/internal/worker/state_asleep.go
new file mode 100644
index 00000000..23584b61
--- /dev/null
+++ b/internal/worker/state_asleep.go
@@ -0,0 +1,66 @@
+package worker
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+const durationSleepCheck = 3 * time.Second
+
+func (w *Worker) gotoStateAsleep(ctx context.Context) {
+ w.stateMutex.Lock()
+ defer w.stateMutex.Unlock()
+
+ w.state = api.WorkerStatusAsleep
+ w.doneWg.Add(2)
+ go w.ackStateChange(ctx, w.state)
+ go w.runStateAsleep(ctx)
+}
+
+func (w *Worker) runStateAsleep(ctx context.Context) {
+ defer w.doneWg.Done()
+ logger := log.With().Str("status", string(w.state)).Logger()
+ logger.Info().Msg("sleeping")
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Debug().Msg("state fetching interrupted by context cancellation")
+ return
+ case <-w.doneChan:
+ logger.Debug().Msg("state fetching interrupted by shutdown")
+ return
+ case <-time.After(durationSleepCheck):
+ }
+ if !w.isState(api.WorkerStatusAwake) {
+ logger.Debug().Msg("state fetching interrupted by state change")
+ return
+ }
+
+ resp, err := w.client.WorkerStateWithResponse(ctx)
+ if err != nil {
+ log.Error().Err(err).Msg("error checking upstream state changes")
+ }
+ switch {
+ case resp.JSON200 != nil:
+ log.Info().
+ Str("requestedStatus", string(resp.JSON200.StatusRequested)).
+ Msg("Manager requests status change")
+ w.changeState(ctx, resp.JSON200.StatusRequested)
+ return
+ case resp.StatusCode() == http.StatusNoContent:
+ log.Debug().Msg("we can keep sleeping")
+ continue
+ default:
+ log.Warn().
+ Int("code", resp.StatusCode()).
+ Str("error", string(resp.Body)).
+ Msg("unable to obtain requested state for unknown reason")
+ continue
+ }
+ }
+}
diff --git a/internal/worker/state_awake.go b/internal/worker/state_awake.go
new file mode 100644
index 00000000..fc6cc063
--- /dev/null
+++ b/internal/worker/state_awake.go
@@ -0,0 +1,106 @@
+package worker
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "time"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+const (
+ // How long to wait to fetch another task...
+ durationNoTask = 5 * time.Second // ... if there is no task now.
+ durationFetchFailed = 10 * time.Second // ... if fetching failed somehow.
+)
+
+var (
+ errUnknownTaskRequestStatus = errors.New("unknown task request status")
+ errReregistrationRequired = errors.New("re-registration is required")
+)
+
+func (w *Worker) gotoStateAwake(ctx context.Context) {
+ w.stateMutex.Lock()
+ defer w.stateMutex.Unlock()
+
+ w.state = api.WorkerStatusAwake
+
+ w.doneWg.Add(2)
+ go w.ackStateChange(ctx, w.state)
+ go w.runStateAwake(ctx)
+}
+
+func (w *Worker) runStateAwake(ctx context.Context) {
+ defer w.doneWg.Done()
+ task := w.fetchTask(ctx)
+ if task == nil {
+ return
+ }
+
+ // TODO: actually execute the task
+ log.Error().Interface("task", *task).Msg("task execution not implemented yet")
+}
+
+// fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained.
+// Returns nil when a task could not be obtained and the period loop was cancelled.
+func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
+ logger := log.With().Str("status", string(w.state)).Logger()
+ logger.Info().Msg("fetching tasks")
+
+ // Initially don't wait at all.
+ var wait time.Duration
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Debug().Msg("task fetching interrupted by context cancellation")
+ return nil
+ case <-w.doneChan:
+ logger.Debug().Msg("task fetching interrupted by shutdown")
+ return nil
+ case <-time.After(wait):
+ }
+ if !w.isState(api.WorkerStatusAwake) {
+ logger.Debug().Msg("task fetching interrupted by state change")
+ return nil
+ }
+
+ resp, err := w.client.ScheduleTaskWithResponse(ctx)
+ if err != nil {
+ log.Error().Err(err).Msg("error obtaining task")
+ }
+ switch {
+ case resp.JSON200 != nil:
+ log.Info().
+ Interface("task", resp.JSON200).
+ Msg("obtained task")
+ return resp.JSON200
+ case resp.JSON423 != nil:
+ log.Info().
+ Str("requestedStatus", string(resp.JSON423.StatusRequested)).
+ Msg("Manager requests status change")
+ w.changeState(ctx, resp.JSON423.StatusRequested)
+ return nil
+ case resp.JSON403 != nil:
+ log.Error().
+ Int("code", resp.StatusCode()).
+ Str("error", string(resp.JSON403.Message)).
+ Msg("access denied")
+ wait = durationFetchFailed
+ continue
+ case resp.StatusCode() == http.StatusNoContent:
+ log.Info().Msg("no task available")
+ wait = durationNoTask
+ continue
+ default:
+ log.Warn().
+ Int("code", resp.StatusCode()).
+ Str("error", string(resp.Body)).
+ Msg("unable to obtain task for unknown reason")
+ wait = durationFetchFailed
+ continue
+ }
+ }
+}
diff --git a/internal/worker/state_shutdown.go b/internal/worker/state_shutdown.go
new file mode 100644
index 00000000..5bbb8343
--- /dev/null
+++ b/internal/worker/state_shutdown.go
@@ -0,0 +1,49 @@
+package worker
+
+import (
+ "context"
+ "os"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+func (w *Worker) gotoStateShutdown(context.Context) {
+ w.stateMutex.Lock()
+ defer w.stateMutex.Unlock()
+
+ w.state = api.WorkerStatusShutdown
+
+ logger := log.With().Int("pid", os.Getpid()).Logger()
+ proc, err := os.FindProcess(os.Getpid())
+ if err != nil {
+ logger.Fatal().Err(err).Msg("unable to find our own process for clean shutdown")
+ }
+
+ logger.Warn().Msg("sending our own process an interrupt signal")
+ err = proc.Signal(os.Interrupt)
+ if err != nil {
+ logger.Fatal().Err(err).Msg("unable to find send interrupt signal to our own process")
+ }
+}
+
+// SignOff forces the worker in shutdown state and acknlowedges this to the Manager.
+// Does NOT actually peform a shutdown; is intended to be called while shutdown is in progress.
+func (w *Worker) SignOff(ctx context.Context) {
+ w.stateMutex.Lock()
+ w.state = api.WorkerStatusShutdown
+ logger := log.With().Str("state", string(w.state)).Logger()
+ w.stateMutex.Unlock()
+
+ logger.Info().Msg("signing off at Manager")
+
+ resp, err := w.client.SignOffWithResponse(ctx)
+ if err != nil {
+ logger.Error().Err(err).Msg("unable to sign off at Manager")
+ return
+ }
+ if resp.JSONDefault != nil {
+ logger.Error().Interface("error", resp.JSONDefault).Msg("error received when signing off at Manager")
+ return
+ }
+}
diff --git a/internal/worker/statemachine.go b/internal/worker/statemachine.go
new file mode 100644
index 00000000..db34cdef
--- /dev/null
+++ b/internal/worker/statemachine.go
@@ -0,0 +1,70 @@
+package worker
+
+import (
+ "context"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+func (w *Worker) setupStateMachine() {
+ w.stateStarters[api.WorkerStatusAsleep] = w.gotoStateAsleep
+ w.stateStarters[api.WorkerStatusAwake] = w.gotoStateAwake
+ w.stateStarters[api.WorkerStatusShutdown] = w.gotoStateShutdown
+}
+
+// Called whenever the Flamenco Manager has a change in current status for us.
+func (w *Worker) changeState(ctx context.Context, newState api.WorkerStatus) {
+ w.stateMutex.Lock()
+ logger := log.With().
+ Str("newState", string(newState)).
+ Str("curState", string(w.state)).
+ Logger()
+ w.stateMutex.Unlock()
+
+ logger.Info().Msg("state change")
+
+ starter, ok := w.stateStarters[newState]
+ if !ok {
+ logger.Warn().Interface("available", w.stateStarters).Msg("no state starter for this state, going to sleep instead")
+ starter = w.gotoStateAsleep
+ }
+ starter(ctx)
+}
+
+// Confirm that we're now in a certain state.
+//
+// This ACK can be given without a request from the server, for example to support
+// state changes originating from UNIX signals.
+//
+// The state is passed as string so that this function can run independently of
+// the current w.state (for thread-safety)
+func (w *Worker) ackStateChange(ctx context.Context, state api.WorkerStatus) {
+ defer w.doneWg.Done()
+
+ req := api.WorkerStateChangedJSONRequestBody{Status: state}
+
+ logger := log.With().Str("state", string(state)).Logger()
+ logger.Debug().Msg("notifying Manager of our state")
+
+ resp, err := w.client.WorkerStateChangedWithResponse(ctx, req)
+ if err != nil {
+ logger.Warn().Err(err).Msg("unable to notify Manager of status change")
+ return
+ }
+
+ // The 'default' response is for error cases.
+ if resp.JSONDefault != nil {
+ logger.Warn().
+ Str("httpCode", resp.HTTPResponse.Status).
+ Interface("error", resp.JSONDefault).
+ Msg("error sending status change to Manager")
+ return
+ }
+}
+
+func (w *Worker) isState(state api.WorkerStatus) bool {
+ w.stateMutex.Lock()
+ defer w.stateMutex.Unlock()
+ return w.state == state
+}
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 7c649ccd..1d0fc8e5 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -3,21 +3,12 @@ package worker
import (
"context"
"errors"
- "net/url"
- "os"
"sync"
- "time"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/pkg/api"
)
-const (
- requestRetry = 5 * time.Second
- credentialsFilename = "flamenco-worker-credentials.yaml"
- configFilename = "flamenco-worker.yaml"
-)
-
var (
errRequestAborted = errors.New("request to Manager aborted")
)
@@ -27,32 +18,22 @@ type Worker struct {
doneChan chan struct{}
doneWg *sync.WaitGroup
- manager *url.URL
- client api.ClientWithResponsesInterface
- creds *workerCredentials
+ client api.ClientWithResponsesInterface
state api.WorkerStatus
- stateStarters map[string]func() // gotoStateXXX functions
+ stateStarters map[api.WorkerStatus]StateStarter // gotoStateXXX functions
stateMutex *sync.Mutex
taskRunner TaskRunner
-
- configWrangler ConfigWrangler
- config WorkerConfig
- managerFinder ManagerFinder
}
-type ManagerFinder interface {
- FindFlamencoManager() <-chan *url.URL
-}
+type StateStarter func(context.Context)
type TaskRunner interface{}
// NewWorker constructs and returns a new Worker.
func NewWorker(
flamenco api.ClientWithResponsesInterface,
- configWrangler ConfigWrangler,
- managerFinder ManagerFinder,
taskRunner TaskRunner,
) *Worker {
@@ -63,77 +44,18 @@ func NewWorker(
client: flamenco,
state: api.WorkerStatusStarting,
- stateStarters: make(map[string]func()),
+ stateStarters: make(map[api.WorkerStatus]StateStarter),
stateMutex: new(sync.Mutex),
- // taskRunner: taskRunner,
-
- configWrangler: configWrangler,
- managerFinder: managerFinder,
+ taskRunner: taskRunner,
}
- // worker.setupStateMachine()
- worker.loadConfig()
+ worker.setupStateMachine()
return worker
}
-func (w *Worker) start(ctx context.Context, register bool) {
- w.doneWg.Add(1)
- defer w.doneWg.Done()
-
- w.loadCredentials()
-
- if w.creds == nil || register {
- w.register(ctx)
- }
-
- startState := w.signOn(ctx)
- log.Error().Str("state", string(startState)).Msg("here the road ends, nothing else is implemented")
- // w.changeState(startState)
-}
-
-func (w *Worker) loadCredentials() {
- log.Debug().Msg("loading credentials")
-
- w.creds = &workerCredentials{}
- err := w.configWrangler.LoadConfig(credentialsFilename, w.creds)
- if err != nil {
- log.Warn().Err(err).Str("file", credentialsFilename).
- Msg("unable to load credentials configuration file")
- w.creds = nil
- return
- }
-}
-
-func (w *Worker) loadConfig() {
- logger := log.With().Str("filename", configFilename).Logger()
- err := w.configWrangler.LoadConfig(configFilename, &w.config)
- if os.IsNotExist(err) {
- logger.Info().Msg("writing default configuration file")
- w.config = w.configWrangler.DefaultConfig()
- w.saveConfig()
- err = w.configWrangler.LoadConfig(configFilename, &w.config)
- }
- if err != nil {
- logger.Fatal().Err(err).Msg("unable to load config file")
- }
-
- if w.config.Manager != "" {
- w.manager, err = ParseURL(w.config.Manager)
- if err != nil {
- logger.Fatal().Err(err).Str("url", w.config.Manager).
- Msg("unable to parse manager URL")
- }
- logger.Debug().Str("url", w.config.Manager).Msg("parsed manager URL")
- }
-
-}
-
-func (w *Worker) saveConfig() {
- err := w.configWrangler.WriteConfig(configFilename, "Configuration", w.config)
- if err != nil {
- log.Warn().Err(err).Str("filename", configFilename).
- Msg("unable to write configuration file")
- }
+// Start starts the worker by sending it to the given state.
+func (w *Worker) Start(ctx context.Context, state api.WorkerStatus) {
+ w.changeState(ctx, state)
}
// Close gracefully shuts down the Worker.
diff --git a/pkg/api/flamenco-manager.yaml b/pkg/api/flamenco-manager.yaml
index ee61ca9d..2366d460 100644
--- a/pkg/api/flamenco-manager.yaml
+++ b/pkg/api/flamenco-manager.yaml
@@ -67,6 +67,69 @@ paths:
schema:
$ref: '#/components/schemas/Error'
+ /api/worker/sign-off:
+ summary: Called by Workers to let the Manager know they're going offline.
+ post:
+ summary: Mark the worker as offline
+ operationId: signOff
+ security: [{worker_auth: []}]
+ tags: [worker]
+ responses:
+ "204":
+ description: normal response
+ default:
+ description: unexpected error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Error'
+
+ /api/worker/state:
+ summary: Called by Workers to check whether there is any state change requested.
+ get:
+ operationId: workerState
+ security: [{worker_auth: []}]
+ tags: [worker]
+ responses:
+ "204":
+ description: no state change requested
+ "200":
+ description: state change requested
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/WorkerStateChange"
+ default:
+ description: unexpected error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Error'
+
+ /api/worker/state-changed:
+ summary: Called by Workers to let the Manager know they've changed status.
+ post:
+ summary: Worker changed state. This could be as acknowledgement of a Manager-requested state change, or in response to worker-local signals.
+ operationId: workerStateChanged
+ security: [{worker_auth: []}]
+ tags: [worker]
+ requestBody:
+ description: New worker state
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/WorkerStateChanged"
+ responses:
+ "204":
+ description: normal response
+ default:
+ description: unexpected error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Error'
+
/api/worker/task:
summary: Task scheduler endpoint.
post:
@@ -87,6 +150,12 @@ paths:
content:
application/json:
schema: {$ref: "#/components/schemas/SecurityError"}
+ "423":
+ description: Worker cannot obtain new tasks, but must go to another state.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/WorkerStateChange"
/api/jobs/types:
summary: Available Flamenco job types.
@@ -180,7 +249,7 @@ components:
WorkerStatus:
type: string
- enum: [starting, awake, asleep, error, shutting-down, testing]
+ enum: [starting, awake, asleep, error, shutdown, testing, offline]
WorkerSignOn:
type: object
@@ -197,6 +266,12 @@ components:
status_requested: {$ref: "#/components/schemas/WorkerStatus"}
required: [status_requested]
+ WorkerStateChanged:
+ type: object
+ properties:
+ status: {$ref: "#/components/schemas/WorkerStatus"}
+ required: [status]
+
AssignedTask:
type: object
description: AssignedTask is a task as it is received by the Worker.
diff --git a/pkg/api/openapi_client.gen.go b/pkg/api/openapi_client.gen.go
index c458d94d..fcc03763 100644
--- a/pkg/api/openapi_client.gen.go
+++ b/pkg/api/openapi_client.gen.go
@@ -106,11 +106,22 @@ type ClientInterface interface {
RegisterWorker(ctx context.Context, body RegisterWorkerJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
+ // SignOff request
+ SignOff(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error)
+
// SignOn request with any body
SignOnWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error)
SignOn(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
+ // WorkerState request
+ WorkerState(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error)
+
+ // WorkerStateChanged request with any body
+ WorkerStateChangedWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error)
+
+ WorkerStateChanged(ctx context.Context, body WorkerStateChangedJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
+
// ScheduleTask request
ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error)
}
@@ -187,6 +198,18 @@ func (c *Client) RegisterWorker(ctx context.Context, body RegisterWorkerJSONRequ
return c.Client.Do(req)
}
+func (c *Client) SignOff(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) {
+ req, err := NewSignOffRequest(c.Server)
+ if err != nil {
+ return nil, err
+ }
+ req = req.WithContext(ctx)
+ if err := c.applyEditors(ctx, req, reqEditors); err != nil {
+ return nil, err
+ }
+ return c.Client.Do(req)
+}
+
func (c *Client) SignOnWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) {
req, err := NewSignOnRequestWithBody(c.Server, contentType, body)
if err != nil {
@@ -211,6 +234,42 @@ func (c *Client) SignOn(ctx context.Context, body SignOnJSONRequestBody, reqEdit
return c.Client.Do(req)
}
+func (c *Client) WorkerState(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) {
+ req, err := NewWorkerStateRequest(c.Server)
+ if err != nil {
+ return nil, err
+ }
+ req = req.WithContext(ctx)
+ if err := c.applyEditors(ctx, req, reqEditors); err != nil {
+ return nil, err
+ }
+ return c.Client.Do(req)
+}
+
+func (c *Client) WorkerStateChangedWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) {
+ req, err := NewWorkerStateChangedRequestWithBody(c.Server, contentType, body)
+ if err != nil {
+ return nil, err
+ }
+ req = req.WithContext(ctx)
+ if err := c.applyEditors(ctx, req, reqEditors); err != nil {
+ return nil, err
+ }
+ return c.Client.Do(req)
+}
+
+func (c *Client) WorkerStateChanged(ctx context.Context, body WorkerStateChangedJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) {
+ req, err := NewWorkerStateChangedRequest(c.Server, body)
+ if err != nil {
+ return nil, err
+ }
+ req = req.WithContext(ctx)
+ if err := c.applyEditors(ctx, req, reqEditors); err != nil {
+ return nil, err
+ }
+ return c.Client.Do(req)
+}
+
func (c *Client) ScheduleTask(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) {
req, err := NewScheduleTaskRequest(c.Server)
if err != nil {
@@ -364,6 +423,33 @@ func NewRegisterWorkerRequestWithBody(server string, contentType string, body io
return req, nil
}
+// NewSignOffRequest generates requests for SignOff
+func NewSignOffRequest(server string) (*http.Request, error) {
+ var err error
+
+ serverURL, err := url.Parse(server)
+ if err != nil {
+ return nil, err
+ }
+
+ operationPath := fmt.Sprintf("/api/worker/sign-off")
+ if operationPath[0] == '/' {
+ operationPath = "." + operationPath
+ }
+
+ queryURL, err := serverURL.Parse(operationPath)
+ if err != nil {
+ return nil, err
+ }
+
+ req, err := http.NewRequest("POST", queryURL.String(), nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return req, nil
+}
+
// NewSignOnRequest calls the generic SignOn builder with application/json body
func NewSignOnRequest(server string, body SignOnJSONRequestBody) (*http.Request, error) {
var bodyReader io.Reader
@@ -404,6 +490,73 @@ func NewSignOnRequestWithBody(server string, contentType string, body io.Reader)
return req, nil
}
+// NewWorkerStateRequest generates requests for WorkerState
+func NewWorkerStateRequest(server string) (*http.Request, error) {
+ var err error
+
+ serverURL, err := url.Parse(server)
+ if err != nil {
+ return nil, err
+ }
+
+ operationPath := fmt.Sprintf("/api/worker/state")
+ if operationPath[0] == '/' {
+ operationPath = "." + operationPath
+ }
+
+ queryURL, err := serverURL.Parse(operationPath)
+ if err != nil {
+ return nil, err
+ }
+
+ req, err := http.NewRequest("GET", queryURL.String(), nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return req, nil
+}
+
+// NewWorkerStateChangedRequest calls the generic WorkerStateChanged builder with application/json body
+func NewWorkerStateChangedRequest(server string, body WorkerStateChangedJSONRequestBody) (*http.Request, error) {
+ var bodyReader io.Reader
+ buf, err := json.Marshal(body)
+ if err != nil {
+ return nil, err
+ }
+ bodyReader = bytes.NewReader(buf)
+ return NewWorkerStateChangedRequestWithBody(server, "application/json", bodyReader)
+}
+
+// NewWorkerStateChangedRequestWithBody generates requests for WorkerStateChanged with any type of body
+func NewWorkerStateChangedRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) {
+ var err error
+
+ serverURL, err := url.Parse(server)
+ if err != nil {
+ return nil, err
+ }
+
+ operationPath := fmt.Sprintf("/api/worker/state-changed")
+ if operationPath[0] == '/' {
+ operationPath = "." + operationPath
+ }
+
+ queryURL, err := serverURL.Parse(operationPath)
+ if err != nil {
+ return nil, err
+ }
+
+ req, err := http.NewRequest("POST", queryURL.String(), body)
+ if err != nil {
+ return nil, err
+ }
+
+ req.Header.Add("Content-Type", contentType)
+
+ return req, nil
+}
+
// NewScheduleTaskRequest generates requests for ScheduleTask
func NewScheduleTaskRequest(server string) (*http.Request, error) {
var err error
@@ -490,11 +643,22 @@ type ClientWithResponsesInterface interface {
RegisterWorkerWithResponse(ctx context.Context, body RegisterWorkerJSONRequestBody, reqEditors ...RequestEditorFn) (*RegisterWorkerResponse, error)
+ // SignOff request
+ SignOffWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*SignOffResponse, error)
+
// SignOn request with any body
SignOnWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SignOnResponse, error)
SignOnWithResponse(ctx context.Context, body SignOnJSONRequestBody, reqEditors ...RequestEditorFn) (*SignOnResponse, error)
+ // WorkerState request
+ WorkerStateWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*WorkerStateResponse, error)
+
+ // WorkerStateChanged request with any body
+ WorkerStateChangedWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*WorkerStateChangedResponse, error)
+
+ WorkerStateChangedWithResponse(ctx context.Context, body WorkerStateChangedJSONRequestBody, reqEditors ...RequestEditorFn) (*WorkerStateChangedResponse, error)
+
// ScheduleTask request
ScheduleTaskWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ScheduleTaskResponse, error)
}
@@ -589,6 +753,28 @@ func (r RegisterWorkerResponse) StatusCode() int {
return 0
}
+type SignOffResponse struct {
+ Body []byte
+ HTTPResponse *http.Response
+ JSONDefault *Error
+}
+
+// Status returns HTTPResponse.Status
+func (r SignOffResponse) Status() string {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.Status
+ }
+ return http.StatusText(0)
+}
+
+// StatusCode returns HTTPResponse.StatusCode
+func (r SignOffResponse) StatusCode() int {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.StatusCode
+ }
+ return 0
+}
+
type SignOnResponse struct {
Body []byte
HTTPResponse *http.Response
@@ -612,11 +798,57 @@ func (r SignOnResponse) StatusCode() int {
return 0
}
+type WorkerStateResponse struct {
+ Body []byte
+ HTTPResponse *http.Response
+ JSON200 *WorkerStateChange
+ JSONDefault *Error
+}
+
+// Status returns HTTPResponse.Status
+func (r WorkerStateResponse) Status() string {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.Status
+ }
+ return http.StatusText(0)
+}
+
+// StatusCode returns HTTPResponse.StatusCode
+func (r WorkerStateResponse) StatusCode() int {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.StatusCode
+ }
+ return 0
+}
+
+type WorkerStateChangedResponse struct {
+ Body []byte
+ HTTPResponse *http.Response
+ JSONDefault *Error
+}
+
+// Status returns HTTPResponse.Status
+func (r WorkerStateChangedResponse) Status() string {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.Status
+ }
+ return http.StatusText(0)
+}
+
+// StatusCode returns HTTPResponse.StatusCode
+func (r WorkerStateChangedResponse) StatusCode() int {
+ if r.HTTPResponse != nil {
+ return r.HTTPResponse.StatusCode
+ }
+ return 0
+}
+
type ScheduleTaskResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *AssignedTask
JSON403 *SecurityError
+ JSON423 *WorkerStateChange
}
// Status returns HTTPResponse.Status
@@ -687,6 +919,15 @@ func (c *ClientWithResponses) RegisterWorkerWithResponse(ctx context.Context, bo
return ParseRegisterWorkerResponse(rsp)
}
+// SignOffWithResponse request returning *SignOffResponse
+func (c *ClientWithResponses) SignOffWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*SignOffResponse, error) {
+ rsp, err := c.SignOff(ctx, reqEditors...)
+ if err != nil {
+ return nil, err
+ }
+ return ParseSignOffResponse(rsp)
+}
+
// SignOnWithBodyWithResponse request with arbitrary body returning *SignOnResponse
func (c *ClientWithResponses) SignOnWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*SignOnResponse, error) {
rsp, err := c.SignOnWithBody(ctx, contentType, body, reqEditors...)
@@ -704,6 +945,32 @@ func (c *ClientWithResponses) SignOnWithResponse(ctx context.Context, body SignO
return ParseSignOnResponse(rsp)
}
+// WorkerStateWithResponse request returning *WorkerStateResponse
+func (c *ClientWithResponses) WorkerStateWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*WorkerStateResponse, error) {
+ rsp, err := c.WorkerState(ctx, reqEditors...)
+ if err != nil {
+ return nil, err
+ }
+ return ParseWorkerStateResponse(rsp)
+}
+
+// WorkerStateChangedWithBodyWithResponse request with arbitrary body returning *WorkerStateChangedResponse
+func (c *ClientWithResponses) WorkerStateChangedWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*WorkerStateChangedResponse, error) {
+ rsp, err := c.WorkerStateChangedWithBody(ctx, contentType, body, reqEditors...)
+ if err != nil {
+ return nil, err
+ }
+ return ParseWorkerStateChangedResponse(rsp)
+}
+
+func (c *ClientWithResponses) WorkerStateChangedWithResponse(ctx context.Context, body WorkerStateChangedJSONRequestBody, reqEditors ...RequestEditorFn) (*WorkerStateChangedResponse, error) {
+ rsp, err := c.WorkerStateChanged(ctx, body, reqEditors...)
+ if err != nil {
+ return nil, err
+ }
+ return ParseWorkerStateChangedResponse(rsp)
+}
+
// ScheduleTaskWithResponse request returning *ScheduleTaskResponse
func (c *ClientWithResponses) ScheduleTaskWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ScheduleTaskResponse, error) {
rsp, err := c.ScheduleTask(ctx, reqEditors...)
@@ -831,6 +1098,32 @@ func ParseRegisterWorkerResponse(rsp *http.Response) (*RegisterWorkerResponse, e
return response, nil
}
+// ParseSignOffResponse parses an HTTP response from a SignOffWithResponse call
+func ParseSignOffResponse(rsp *http.Response) (*SignOffResponse, error) {
+ bodyBytes, err := ioutil.ReadAll(rsp.Body)
+ defer func() { _ = rsp.Body.Close() }()
+ if err != nil {
+ return nil, err
+ }
+
+ response := &SignOffResponse{
+ Body: bodyBytes,
+ HTTPResponse: rsp,
+ }
+
+ switch {
+ case strings.Contains(rsp.Header.Get("Content-Type"), "json") && true:
+ var dest Error
+ if err := json.Unmarshal(bodyBytes, &dest); err != nil {
+ return nil, err
+ }
+ response.JSONDefault = &dest
+
+ }
+
+ return response, nil
+}
+
// ParseSignOnResponse parses an HTTP response from a SignOnWithResponse call
func ParseSignOnResponse(rsp *http.Response) (*SignOnResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
@@ -864,6 +1157,65 @@ func ParseSignOnResponse(rsp *http.Response) (*SignOnResponse, error) {
return response, nil
}
+// ParseWorkerStateResponse parses an HTTP response from a WorkerStateWithResponse call
+func ParseWorkerStateResponse(rsp *http.Response) (*WorkerStateResponse, error) {
+ bodyBytes, err := ioutil.ReadAll(rsp.Body)
+ defer func() { _ = rsp.Body.Close() }()
+ if err != nil {
+ return nil, err
+ }
+
+ response := &WorkerStateResponse{
+ Body: bodyBytes,
+ HTTPResponse: rsp,
+ }
+
+ switch {
+ case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
+ var dest WorkerStateChange
+ if err := json.Unmarshal(bodyBytes, &dest); err != nil {
+ return nil, err
+ }
+ response.JSON200 = &dest
+
+ case strings.Contains(rsp.Header.Get("Content-Type"), "json") && true:
+ var dest Error
+ if err := json.Unmarshal(bodyBytes, &dest); err != nil {
+ return nil, err
+ }
+ response.JSONDefault = &dest
+
+ }
+
+ return response, nil
+}
+
+// ParseWorkerStateChangedResponse parses an HTTP response from a WorkerStateChangedWithResponse call
+func ParseWorkerStateChangedResponse(rsp *http.Response) (*WorkerStateChangedResponse, error) {
+ bodyBytes, err := ioutil.ReadAll(rsp.Body)
+ defer func() { _ = rsp.Body.Close() }()
+ if err != nil {
+ return nil, err
+ }
+
+ response := &WorkerStateChangedResponse{
+ Body: bodyBytes,
+ HTTPResponse: rsp,
+ }
+
+ switch {
+ case strings.Contains(rsp.Header.Get("Content-Type"), "json") && true:
+ var dest Error
+ if err := json.Unmarshal(bodyBytes, &dest); err != nil {
+ return nil, err
+ }
+ response.JSONDefault = &dest
+
+ }
+
+ return response, nil
+}
+
// ParseScheduleTaskResponse parses an HTTP response from a ScheduleTaskWithResponse call
func ParseScheduleTaskResponse(rsp *http.Response) (*ScheduleTaskResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
@@ -892,6 +1244,13 @@ func ParseScheduleTaskResponse(rsp *http.Response) (*ScheduleTaskResponse, error
}
response.JSON403 = &dest
+ case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 423:
+ var dest WorkerStateChange
+ if err := json.Unmarshal(bodyBytes, &dest); err != nil {
+ return nil, err
+ }
+ response.JSON423 = &dest
+
}
return response, nil
diff --git a/pkg/api/openapi_server.gen.go b/pkg/api/openapi_server.gen.go
index d8606dd4..1e3e0c94 100644
--- a/pkg/api/openapi_server.gen.go
+++ b/pkg/api/openapi_server.gen.go
@@ -25,9 +25,18 @@ type ServerInterface interface {
// Register a new worker
// (POST /api/worker/register-worker)
RegisterWorker(ctx echo.Context) error
+ // Mark the worker as offline
+ // (POST /api/worker/sign-off)
+ SignOff(ctx echo.Context) error
// Authenticate & sign in the worker.
// (POST /api/worker/sign-on)
SignOn(ctx echo.Context) error
+
+ // (GET /api/worker/state)
+ WorkerState(ctx echo.Context) error
+ // Worker changed state. This could be as acknowledgement of a Manager-requested state change, or in response to worker-local signals.
+ // (POST /api/worker/state-changed)
+ WorkerStateChanged(ctx echo.Context) error
// Obtain a new task to execute
// (POST /api/worker/task)
ScheduleTask(ctx echo.Context) error
@@ -81,6 +90,17 @@ func (w *ServerInterfaceWrapper) RegisterWorker(ctx echo.Context) error {
return err
}
+// SignOff converts echo context to params.
+func (w *ServerInterfaceWrapper) SignOff(ctx echo.Context) error {
+ var err error
+
+ ctx.Set(Worker_authScopes, []string{""})
+
+ // Invoke the callback with all the unmarshalled arguments
+ err = w.Handler.SignOff(ctx)
+ return err
+}
+
// SignOn converts echo context to params.
func (w *ServerInterfaceWrapper) SignOn(ctx echo.Context) error {
var err error
@@ -92,6 +112,28 @@ func (w *ServerInterfaceWrapper) SignOn(ctx echo.Context) error {
return err
}
+// WorkerState converts echo context to params.
+func (w *ServerInterfaceWrapper) WorkerState(ctx echo.Context) error {
+ var err error
+
+ ctx.Set(Worker_authScopes, []string{""})
+
+ // Invoke the callback with all the unmarshalled arguments
+ err = w.Handler.WorkerState(ctx)
+ return err
+}
+
+// WorkerStateChanged converts echo context to params.
+func (w *ServerInterfaceWrapper) WorkerStateChanged(ctx echo.Context) error {
+ var err error
+
+ ctx.Set(Worker_authScopes, []string{""})
+
+ // Invoke the callback with all the unmarshalled arguments
+ err = w.Handler.WorkerStateChanged(ctx)
+ return err
+}
+
// ScheduleTask converts echo context to params.
func (w *ServerInterfaceWrapper) ScheduleTask(ctx echo.Context) error {
var err error
@@ -135,7 +177,10 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL
router.GET(baseURL+"/api/jobs/types", wrapper.GetJobTypes)
router.GET(baseURL+"/api/jobs/:job_id", wrapper.FetchJob)
router.POST(baseURL+"/api/worker/register-worker", wrapper.RegisterWorker)
+ router.POST(baseURL+"/api/worker/sign-off", wrapper.SignOff)
router.POST(baseURL+"/api/worker/sign-on", wrapper.SignOn)
+ router.GET(baseURL+"/api/worker/state", wrapper.WorkerState)
+ router.POST(baseURL+"/api/worker/state-changed", wrapper.WorkerStateChanged)
router.POST(baseURL+"/api/worker/task", wrapper.ScheduleTask)
}
diff --git a/pkg/api/openapi_spec.gen.go b/pkg/api/openapi_spec.gen.go
index d1e47dc7..4d1ccc8b 100644
--- a/pkg/api/openapi_spec.gen.go
+++ b/pkg/api/openapi_spec.gen.go
@@ -18,53 +18,57 @@ import (
// Base64 encoded, gzipped, json marshaled Swagger object
var swaggerSpec = []string{
- "H4sIAAAAAAAC/9Ra3W4ct/V/FWLyB5LgP7srW2kv9qqOHTsykljIKshFLKzODs/uUOKQE5Kj9dYQkIfo",
- "m7QBetFc9QWUNyoOyfnc0VdjGWgQGKMheXg+fudz9n2S6aLUCpWzyfx9YrMcC/CPz6wVG4X8BOwF/c3R",
- "ZkaUTmiVzHurTFgGzNETWCYc/W0wQ3GJnK12zOXIftTmAs00SZPS6BKNE+hvyXRRgOL+WTgs/MP/GVwn",
- "8+STWcvcLHI2ex4OJFdp4nYlJvMEjIEd/X2uV3Q6vrbOCLWJ75elEdoIt+tsEMrhBk29I7wdOa6gGF+4",
- "naZ14Ko7xSH9LcJOkgjsxc2MVBbN+EIlOC2stSnAJfPwIh1uvEoTgz9XwiBP5j/Vm0hrkXaUteG9I+JA",
- "ix2VdblOW3ueNtfr1Tlmjvh8dglCwkria71aoHPE1R6yFkJtJDIb1pleM2Cv9YoRNTsCoFyLLDz26fyY",
- "o2IbcYkqZVIUwnkcXoIUnP6t0DKn6Z1FFolM2Rsld6yyxCPbCpezoDt/Od3dQHTPBkMwclxDJd0+Xyc5",
- "srgY+GA211sVmWFkCLYl3jk6NIVQ/v5c2FolUyKPXDjiMtCPV61BWkz39eByNEQfpNRbRkeHNBmsHe3J",
- "kZ3rFcvBshWiYrZaFcI55FP2o64kZ6Io5Y5xlBiOScnwnbCBINgLy9baBNLnepUyUJxigS5KIWmPcNO3",
- "qoXmSmuJoEiiC9ztK+uIo3JiLdBEug0wUlZU1rEVskqJn6tgLqEaEWqL7RmqdYEHaE4UBXIBDuWOGSQ8",
- "M/DXcFwLJehASlD1gtOVqedHVy68KsE4kVUSTGPFG9Rgq1UdAG6LGyOutIgnGzA+mMJJPH4prBhiy5nq",
- "NgURhvuIirb44Si4MCmrRpNhn0lxgQzYlxIVR8OA84lWn0/ZAh2RO/MGOQuOEDIKKEbR1SiQzR0uB0dX",
- "V5KrTz0YGl9Cxb0v2XFFD2IhgS9uumfgWrR2GsSvajWhlQCHAMba5ux5ZQwqJ3dMU6SBmq5HdyfW2Ck7",
- "+/rZ4uuvXixfHn3z1fL42cnXZyHPcmEwc9rsWAkuZ//Pzt4ms0/8f2+TMwZlSSrlQWxUVUHyrYXEJe1P",
- "0oQLUz/61zHm52Bz5Mt25+mI89wEmv0oFzXQkb7jsSHAgmVHL45DNN95sQk0ERJT9p1mCq1DToqpMlcZ",
- "tOwzH2BtyrjI6CowAu3nDAwyW5WlNm4oemQ+pdx8+JSElhpcknos3CnkuHR1PmrvDHWOsOxbULBBEyKf",
- "cN71oaBQPpK8JKxQPqzoiMq8f8E0lnT38tXAHSIkAnudO+/yDdLWSCr+RlhXg8Gj+2a97euoLjT+O4lP",
- "ehHxBnHbK8YErCvOPbHiAjNYGrTEAgNmQ/kS6yAfid5hVjm8qxK+l8UHzI2b7VZzfWWM9lXksA7n2Csh",
- "a2/ZL2wLtBY2Y7wO2PE02/1j3LwOJTtI+WadzH+63a6LuhihU1fpnggGweGYnWhBaMWcKNA6KEqKArWg",
- "HBxOaGWsWBAj5H744ehFHdxf++L5jrr7vr0AOWjTClQl/8DSDKzjOa111t7XMHt6dRoM9C064ODAG4pz",
- "X+yAPO7pfk/iQbdoVsIZMDtWRGIx2dkp+1Yb7y6lxHfdSJ+BolxRaCo2fZyoyLfYGUxX0+yMKe2CHurC",
- "8AJ35FX4DohWhLgH2jxZlEY4ZC+N2OQutjtTLEBI4nq3Mqj+soqJR5tNvSP4ZLLwG9jC/ftflyg74aQH",
- "5EXHT8f1FGqo0bMNQOq0BZkTl76jApWRBkJzVUp08VkFZQmtJmsQYUfzUEJl/cPPFVb+AUyWU0fePIas",
- "GMhPCBk+2UYivRf+OVCpSEWT7uVJmmzBdxSTtTYTqh/saFr9HjfCOjTIQwjcD0LAuUE7DigJ1i29Uvod",
- "dydliuzi5l5dgiMnGY+weu22YG4Iv/fy3SBS675Ngls23XE/gd3ZQP6hpr7RRdootdvV18pIkywUpJ7L",
- "ZKjljmZukGgspi8wq4xwuxsyzb3Tx215o5cKRsuztjFrm1jKxi8lFKgyPQgVRSfIPV7YiAuH139jv/9y",
- "/ev1b9f/uP7191+u/3n92/Xfu+OW+Z8O+ok/3rLMCp7Mk/fxzyuyYF6pi6UVf8VkfkgyOQOZW0LFha5D",
- "Djmlr+nnycz4kzO7np3rFQEYFT55ejj1JLup5Pi7V/RnaZP50y/SZE1lrE3myZPJkwMqpwvYoF1qs7wU",
- "HDXVCP5Nkia6cmXlQiuB7xwqG+wyLX3ICRwsw64+S+GShqmOX1hBpppEwSfhSJjC9dHV2vGOXNvktfvO",
- "+JpemIwzMvDrmOuuNF9v7fTqtztDdOY4ZWu4GvONzkjxAfmkyRxNqCffbzPLffJEk3RKozO0lK5HM0EI",
- "liEfGAhOOwwTfyCaY2bQjS/9wag8MEq8qRdQR6/oBOQxi8XkITbqzUM18YEl6uSNe8f7NvXh8xzUBvdF",
- "CJln2WLlQdl0qPUhsduZ6ruBdWBcqHxgCxc+QVqJSEU0+oSVJjavvHtOuN762RH6+eYIlAPafLZbEOtB",
- "2q2/ewkVBdy97sGiIQ0zYX3VGjazoxcpK8HarTa8XgrwCjNsBq7eajp+Q1nMK80Pt8CKrK0QcufK5Ip4",
- "FGqtQ5OnHGSu7TaTOhuyEwRCb2VkPGnns9m6zpVCz/aL+u/D6O4lmIIVoXtnz46PqIoQGSqLnXteHX9z",
- "ebhHf7vdTjeqotQ5i2fsbFPKyeH0YIpqmrsiVNvCyR638bokTS7RxOTyZHowPaDdukQFpaA8619RXHK5",
- "t8wMSuHTnoeotl4VBFSvzCMexneFcKGvixD7UvNdrT5U/gyUpRSZPzU7tyF+BfDeBe1+E3u1p1U/WtKx",
- "Zkm6yKdU7l3Blpo0RTc9PTj4qJxtwTJbZRnadSXljoXBPnImlNNMKC4uBa9Ahm8B08GHkA/CZigrR/jz",
- "C6yuGr1vVkUBZtdYlQFTuPVjKGoaGzjF2VNnWOO/HADlcj8dom64S+51Pcy2BD6GipdaKOflbTA2a4Lx",
- "BkeA9gpdMzF7RKvuj+dGVNdsakd0AwW+Qsfk3hjPT7hyFGYw5bxFde1VjfrP2697Pf29P9erpeBXN6rw",
- "JbosD67a3u/HSIKkikPuGIICsT2PSjt6vKvVOn1EO93idD58983hJfcLDFbhK5O33T1wGw4pHoNoQZzX",
- "ag8ZZmZipz7Zto36aLCsW/rY0D9OxBypFkcUFXaRC9fcf9TguTfcGGFREbwkq3n4qMGxUviuxIyaYIx7",
- "usCo2Y8Rclvbs8ZSfHE6ciiYhOJCe9IOEWXFRk1imT+edkPx+5gIilfcjJ2md/yYwNmvnv8nkBOrXh9s",
- "e/XuT6cUJjvxvnI5KkdMIXtbHRw8/TMjNNTfh7fN55BbsfYcZJwCB4X5H29IDIGvzt8Xyn+Ext2nBplB",
- "4DvaRfTC5zinWRhzx6RVG3w6hKuLv3m6AatZjrySeBJmZo+Xuru/wBqxjP/tVbdmuUqTpwdf7Pcc3+n4",
- "24z+92b/3a3+HHWVJl8cHH64YrI3BBxh/hhNXT69QCWQPxBXb1YOhIrxyvU1cReYvOJstKLplm+eBXNZ",
- "lxGhXZkldHWkONTtl6RP+t+r1P/igDjZoPN1TfO9YwVyJaFXjlj/EWtQiR0f9WvTDtwzXRSVCp7kf5w0",
- "LGCnLfko99Xp1X8CAAD//yqfdC3jJwAA",
+ "H4sIAAAAAAAC/9xa224ct/l/FWLyB5LgP7srW24v9qqOHTsyYlvIKshFLKw4w293KHHICcnRemsIyEP0",
+ "TdoAvWiu+gLKGxUfyTnPaqVactMahjA7PH3H33cYfohSlRdKgrQmmn+ITJpBTt3jU2P4WgI7oeYCfzMw",
+ "qeaF5UpG884o4YZQYvGJGsIt/taQAr8ERpItsRmQH5S+AD2N4qjQqgBtObhTUpXnVDL3zC3k7uH/NKyi",
+ "efTZrCFuFiibPfMLoqs4stsConlEtaZb/H2uElwdXhuruVyH98tCc6W53bYmcGlhDbqa4d+OLJc0Hx+4",
+ "eU9jqS33soPyW/iZyBE1F7sJKQ3o8YGSMxxYKZ1TG839i7g/8SqONPxUcg0smv9YTUKphb0DrzXtLRZ7",
+ "UmyJrE113OjztD5eJeeQWqTz6SXlgiYCXqlkAdYiVQPLWnC5FkCMHydqRSh5pRKCu5kRA8oUT/1jd58f",
+ "MpBkzS9BxkTwnFtnh5dUcIZ/SzDEKnxngIRNpuStFFtSGqSRbLjNiJedOxzPrk10oIO+MTJY0VLYIV0n",
+ "GZAw6OkgJlMbGYghqAiyQdoZWNA5l+78jJtKJFPcHhi3SKXfPxy1osJAPJSDzUDj/lQItSG4tL8noSuL",
+ "czIg5yohGTUkAZDElEnOrQU2JT+oUjDC80JsCQMBfpkQBN5z4zek5sKQldJ+63OVxIRKhlig8oILnMPt",
+ "9J1sTDNRSgCVyNEFbIfCOmIgLV9x0GHf2jBikpfGkgRIKflPpVcXlzULlcYGimpc4A6S43kOjFMLYks0",
+ "oD0T6o5hsOKS44IYTdUxjkfGjh5VWv+qoNrytBRU11rcIQZTJhUA3IQbI660CCtrY7zzDidh+SU3vG9b",
+ "Vpc3CQhtuGtRQRffH3kXRmFV1qTJF4JfAKHkKwGSgSaUsYmSX07JAixud+YUcuYdwUcUKgmiq5ZU1GfY",
+ "jFo8uhRMfu6MofYlkMz5khkXdA8L0fjCpFsC16LRUw+/ymSCI94cvDFWOifPSq1BWrElCpGGVvs6625h",
+ "jZmSs2+eLr75+vnyxdG3Xy+Pn558c+bjLOMaUqv0lhTUZuT/ydm7aPaZ+/cuOiO0KFCkzLMNssyRvxUX",
+ "sMT5URwxrqtH9zpgfkZNBmzZzDwdcZ5dRjNEuSCBFvctj/UASw05en7s0Xzr2EajCSYxJW8UkWAsMBRM",
+ "mdpSgyFfOIA1MWE8xaOo5mC+JFQDMWVRKG37rAfiY4zNh4+RaaGojWJnC3uZHOeuikfNmT7P4Ya8ppKu",
+ "QXvk49a5Ps0RykeCl6AJiLslHUGYt0+YxoLuIF713CGYhCevdeY+30BpjYTib7mxlTE4694tt6GMqkTj",
+ "3+P4pIOIO9htjhhjsMo4B2yFAaKh0GCQBEKJ8elLyIMcEr2HtLSwLxO+lcZ7xI2r7UZ1fa21cllkPw9n",
+ "0EkhK28ZJrY5GEPXY7T2yHF7NvPHqHnlU3YqxNtVNP/xZr0uqmQEV13FAxY0UAtjesIBriSxPAdjaV4g",
+ "ClSMMmphgiNjyQIf2e7774+eV+D+yiXPe/Lu29YC6KB1KVAW7J656WnHUVrJrDmvJvb06tQr6DVYyqil",
+ "TlGMuWSHiuOO7Acc96pFnXCrqd6SPGwWgp2ZktdKO3cpBLxvI31KJcaKXGGy6XCiRN8iZ3SaTNMzIpX1",
+ "cqgSwwvYolfBe4p7BRN3hjaPFoXmFsgLzdeZDeXOFHLKBVK9TTTIPyUh8Ci9rmZ4n4wWbgJZ2H/+4xJE",
+ "C046hrxo+em4nHwONbq2NpAqbNHU8ktXUVGZogR8cVUIsOFZemFxJScryv2M+qGgpXEPP5VQugeq0wwr",
+ "8vrRR0W//QQtwwXbsEnnhXv2u5Qookn78CiONtRVFJOV0hPMH8xoWP0O1txY0MA8BA5BiDKmwYwblKDG",
+ "Lp1QuhV3K2Ty9GJ3rS6oRScZR1i1shuqd8DvrXzXs9S4bx3glnV13A1gewvIjyrqa1nEtVDbVX0ljDhK",
+ "fULqqIz6Um5JZgdHY5i+gLTU3G53RJpbh4+b4kYnFIymZ01h1hSxGI1fCJqDTFUPKvIWyD0cbISBw+u/",
+ "kN9+vv7l+tfrv13/8tvP13+//vX6r+12y/wPB93AH05ZpjmL5tGH8PMKNZiV8mJp+J8hmh8iT1bT1C5p",
+ "ybiqIAed0uX082im3cqZWc3OVYIGDBIePT6cui3boeT4zUv8WZho/vhJHK0wjTXRPHo0eXSA6XRO12CW",
+ "Si8vOQOFOYJ7E8WRKm1RWl9KwHsL0ni9TAsHOZ6CpZ/VJckfUhPV8gvDUVWTwPjEL/FduK51NXrcE2vr",
+ "uHbbHl9dC6NyRhp+LXXtC/PV1FatfrMzBGcOXbaaqjHfaLUU7xBP6shRQz36fhNZbhMn6qBTaJWCwXA9",
+ "Ggk8WPp4oKl32j5MfASaQ6rBjg99JCr3lBJO6gDq6BEtQB7TWAgefC3f3lUS98xRK27cGu+b0AfPMirX",
+ "MGTBR55lYyt3iqZ9qfc3uxVRbBdV90DLHgq6jmgs1dbnXnRDL1yINgIA03hwITOOTFZapjaucQUmzFar",
+ "leASRjzKG70Luguk2rO3cQQsaYm4PyhiDGhUNOHGJc9+Mjl6HpOCGrNRmlVD3sp9K51QW03VLffFYOrk",
+ "5Xps1PC0SVQya4voCmnkcqV8rSktTW1T9EZVUCYnQNGJSi3CSjOfzVZVyOZqNqwtvvMdxBdU5yT3TQTy",
+ "9PgIkxmegjTQOufl8beXh4P9N5vNdC1LjOCzsMbM1oWYHE4PpiCnmc190s+t6FAbjovi6BJ0iHGPpgfT",
+ "A5ytCpC04Bju3SuER5s5zcxowV30dTapjBMFWqYT5hHzXcScW19eBkv/SrFtJT6Qbg0tCsFTt2p2bjyM",
+ "ervdZ9XdWvpqIFXX4VIhdYraRo8ZhfMCUyiUFJ70+ODgk1K2oYaYMk3BrEohtsR/XwBGuLSKcMn4JWcl",
+ "Ff6TxLT3PeZeyPTZ7Qh9boBUyavzzTLPqd7WWiWUSNi4bhjWrrU5hRZYq2fkPmBQTClckwqL8vZ2r6qe",
+ "ukHjIyBZobi0jt/axmZ1TFjDiKG9BFs37h5Qq8Mu4Yjo6klNp7AnwJdgiRh0E12jLQOue83WG0TXHFWL",
+ "/7z5yNiR34dzlSw5u9opwhdg08y7anO+62Zx5Cr02gME+c0GHhW35Liv4jt9QD3d4HQOvrvqcJy7AUIT",
+ "/7HL6e4WdusXSRZANEfKK7H7CDPToWEw2TT9glGwrDoLoa/wMIg5krSOCMrPQheuqP+k4DnosYyQKNG8",
+ "BKlo+KTgWEp4X0CKtTiEOW3DqMgPCLmp9FnZUnhxOrLIqwRxoVlp+hZl+FpO1Gp1Q9zFJHy1GkLhk2EO",
+ "9fsTZEgCHfZ00r8fTxE1Gpm9pvqinfdRQ6r0co+0n1ER2rHewtwtCgHe9asIdiHd12DYfq6BrJW/x+G2",
+ "n46rRO7RiHxQpw5H7HbnuqvwKX15WFf9VzjzrW3waWkzkBaJAvKuPDh4/EeC1lDdHNjUH8ru2SA1ULbF",
+ "Wbif/1BrFfEfQEIeUSl8aK6ojp2ZQEtl0X/aMhylJHXjpCmWr+JdYEZ2r/h9m9TdzSPNIL0gm+r6SgYa",
+ "/BWT7Q4hjNvBJG21FkbBa6QN8aBA1j5oRLxv6tDo+bwFnv1vxb2A50FvXghTcpJxQ1J3vy1x11JoioAh",
+ "gPnE1N9gCljSdDw7thITpRG5KqlU+AJ6IlRKhYM2Ksx949kldLgpzcBUbbjAuyO8phmwUsCJ/wD0cAVg",
+ "+zrxiGLdReJ25bsLqN6ocNGwe3nKXSKp7lZcxdGTg8P7a0l0vmiNEH8MuirCn4PkHjSfPD78tIhfGTeV",
+ "UlmiEku5dNmwk1dMktL6K1hr5W6DSuXgzzvBHR3prd+d1vu3dLfPwp2qTbA73W5bOBL0ZVU++zbdLGqB",
+ "fN8avkILwP/OCNyFP6RkDdbV8/V1g4SKRNBOGW7cHZJeB+L4qNuTaQcNleel9OmKuxvcb9xMm+0D31en",
+ "V/8KAAD//8XqVAxiLwAA",
}
// GetSwagger returns the content of the embedded swagger specification file
diff --git a/pkg/api/openapi_types.gen.go b/pkg/api/openapi_types.gen.go
index 176963df..d0ea72e4 100644
--- a/pkg/api/openapi_types.gen.go
+++ b/pkg/api/openapi_types.gen.go
@@ -97,7 +97,9 @@ const (
WorkerStatusError WorkerStatus = "error"
- WorkerStatusShuttingDown WorkerStatus = "shutting-down"
+ WorkerStatusOffline WorkerStatus = "offline"
+
+ WorkerStatusShutdown WorkerStatus = "shutdown"
WorkerStatusStarting WorkerStatus = "starting"
@@ -253,6 +255,11 @@ type WorkerStateChange struct {
StatusRequested WorkerStatus `json:"status_requested"`
}
+// WorkerStateChanged defines model for WorkerStateChanged.
+type WorkerStateChanged struct {
+ Status WorkerStatus `json:"status"`
+}
+
// WorkerStatus defines model for WorkerStatus.
type WorkerStatus string
@@ -265,6 +272,9 @@ type RegisterWorkerJSONBody WorkerRegistration
// SignOnJSONBody defines parameters for SignOn.
type SignOnJSONBody WorkerSignOn
+// WorkerStateChangedJSONBody defines parameters for WorkerStateChanged.
+type WorkerStateChangedJSONBody WorkerStateChanged
+
// SubmitJobJSONRequestBody defines body for SubmitJob for application/json ContentType.
type SubmitJobJSONRequestBody SubmitJobJSONBody
@@ -274,6 +284,9 @@ type RegisterWorkerJSONRequestBody RegisterWorkerJSONBody
// SignOnJSONRequestBody defines body for SignOn for application/json ContentType.
type SignOnJSONRequestBody SignOnJSONBody
+// WorkerStateChangedJSONRequestBody defines body for WorkerStateChanged for application/json ContentType.
+type WorkerStateChangedJSONRequestBody WorkerStateChangedJSONBody
+
// Getter for additional properties for JobMetadata. Returns the specified
// element and whether it was found
func (a JobMetadata) Get(fieldName string) (value string, found bool) {