From 3e72391cbf07d77bcfdede49a080e5f5b957ccea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 14 Aug 2023 15:00:32 +0200 Subject: [PATCH] Restartable workers When the worker is started with `-restart-exit-code 47` or has `restart_exit_code: 47` in `flamenco-worker.yaml`, it's marked as 'restartable'. This will enable two worker actions 'Restart (immediately)' and 'Restart (after task is finished)' in the Manager web interface. When a worker is asked to restart, it will exit with exit code `47`. Of course any positive exit code can be used here. --- cmd/flamenco-worker/main.go | 38 +++++++++++++++---- internal/manager/api_impl/worker_mgt.go | 16 ++++++-- internal/manager/api_impl/workers.go | 10 ++++- internal/manager/persistence/workers.go | 1 + internal/manager/webupdates/worker_updates.go | 11 +++--- internal/worker/config.go | 7 +++- internal/worker/registration.go | 2 + internal/worker/state_offline.go | 7 ++++ internal/worker/state_restart.go | 17 +++++++++ internal/worker/statemachine.go | 1 + internal/worker/worker.go | 23 ++++++++--- .../components/workers/WorkerActionsBar.vue | 29 +++++++++++++- .../src/components/workers/WorkerDetails.vue | 5 +++ web/app/src/stores/workers.js | 7 ++++ 14 files changed, 148 insertions(+), 26 deletions(-) create mode 100644 internal/worker/state_restart.go diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index f492be7c..e6ce3e7c 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -47,6 +47,8 @@ var cliArgs struct { manager string register bool + + restartExitCode int } func main() { @@ -84,17 +86,22 @@ func main() { // Load configuration, and override things from the CLI arguments if necessary. configWrangler := worker.NewConfigWrangler() + + // Before the config can be overridden, it has to be loaded. 
+ if _, err := configWrangler.WorkerConfig(); err != nil { + log.Fatal().Err(err).Msg("error loading worker configuration") + } + if cliArgs.managerURL != nil { url := cliArgs.managerURL.String() log.Info().Str("manager", url).Msg("using Manager URL from commandline") - - // Before the config can be overridden, it has to be loaded. - if _, err := configWrangler.WorkerConfig(); err != nil { - log.Fatal().Err(err).Msg("error loading worker configuration") - } - configWrangler.SetManagerURL(url) } + if cliArgs.restartExitCode != 0 { + log.Info().Int("exitCode", cliArgs.restartExitCode). + Msg("will tell Manager this Worker can restart") + configWrangler.SetRestartExitCode(cliArgs.restartExitCode) + } findBlender() findFFmpeg() @@ -163,7 +170,8 @@ func main() { go w.Start(workerCtx, startupState) - if w.WaitForShutdown(workerCtx) { + shutdownReason := w.WaitForShutdown(workerCtx) + if shutdownReason != worker.ReasonContextClosed { go shutdown() } <-shutdownComplete @@ -172,6 +180,8 @@ func main() { wg.Wait() log.Debug().Msg("process shutting down") + config, _ := configWrangler.WorkerConfig() + stopProcess(config, shutdownReason) } func shutdown() { @@ -203,6 +213,17 @@ func shutdown() { close(shutdownComplete) } +func stopProcess(config worker.WorkerConfig, shutdownReason worker.ShutdownReason) { + switch shutdownReason { + case worker.ReasonContextClosed: + os.Exit(1) + case worker.ReasonShutdownReq: + os.Exit(0) + case worker.ReasonRestartReq: + os.Exit(config.RestartExitCode) + } +} + func parseCliArgs() { flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.") flag.BoolVar(&cliArgs.flush, "flush", false, "Flush any buffered task updates to the Manager, then exits.") @@ -216,6 +237,9 @@ func parseCliArgs() { flag.BoolVar(&cliArgs.register, "register", false, "(Re-)register at the Manager.") flag.BoolVar(&cliArgs.findManager, "find-manager", false, "Autodiscover a Manager, then quit.") + flag.IntVar(&cliArgs.restartExitCode, 
"restart-exit-code", 0, + "Mark this Worker as restartable. It will exit with this code to signify it needs to be restarted.") + flag.Parse() if cliArgs.manager != "" { diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index f8a9db0b..8a2307fe 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -159,6 +159,13 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) Str("requested", string(change.Status)). Bool("lazy", change.IsLazy). Logger() + + if change.Status == api.WorkerStatusRestart && !dbWorker.CanRestart { + logger.Error().Msg("worker cannot be restarted, rejecting status change request") + return sendAPIError(e, http.StatusPreconditionFailed, + "worker %q does not know how to restart", workerUUID) + } + logger.Info().Msg("worker status change requested") if dbWorker.Status == change.Status { @@ -380,10 +387,11 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error { func workerSummary(w persistence.Worker) api.WorkerSummary { summary := api.WorkerSummary{ - Id: w.UUID, - Name: w.Name, - Status: w.Status, - Version: w.Software, + Id: w.UUID, + Name: w.Name, + Status: w.Status, + Version: w.Software, + CanRestart: w.CanRestart, } if w.StatusRequested != "" { summary.StatusChange = &api.WorkerStatusChangeRequest{ diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 7e889301..23fe00f7 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -30,6 +30,13 @@ var rememberableWorkerStates = map[api.WorkerStatus]bool{ api.WorkerStatusAwake: true, } +// offlineWorkerStates contains worker statuses that are automatically +// acknowledged on sign-off. +var offlineWorkerStates = map[api.WorkerStatus]bool{ + api.WorkerStatusOffline: true, + api.WorkerStatusRestart: true, +} + // RegisterWorker registers a new worker and stores it in the database. 
func (f *Flamenco) RegisterWorker(e echo.Context) error { logger := requestLogger(e) @@ -137,6 +144,7 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON w.Address = e.RealIP() w.Name = update.Name w.Software = update.SoftwareVersion + w.CanRestart = update.CanRestart != nil && *update.CanRestart // Remove trailing spaces from task types, and convert to lower case. for idx := range update.SupportedTaskTypes { @@ -168,7 +176,7 @@ func (f *Flamenco) SignOff(e echo.Context) error { w := requestWorkerOrPanic(e) prevStatus := w.Status w.Status = api.WorkerStatusOffline - if w.StatusRequested == api.WorkerStatusOffline { + if offlineWorkerStates[w.StatusRequested] { w.StatusChangeClear() } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index aa676485..16491ef2 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -25,6 +25,7 @@ type Worker struct { Software string `gorm:"type:varchar(32);default:''"` Status api.WorkerStatus `gorm:"type:varchar(16);default:''"` LastSeenAt time.Time `gorm:"index"` // Should contain UTC timestamps. + CanRestart bool `gorm:"type:smallint;default:false"` StatusRequested api.WorkerStatus `gorm:"type:varchar(16);default:''"` LazyStatusRequest bool `gorm:"type:smallint;default:false"` diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/webupdates/worker_updates.go index 45876008..5d6a3d7d 100644 --- a/internal/manager/webupdates/worker_updates.go +++ b/internal/manager/webupdates/worker_updates.go @@ -14,11 +14,12 @@ import ( // the caller. 
func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { workerUpdate := api.SocketIOWorkerUpdate{ - Id: worker.UUID, - Name: worker.Name, - Status: worker.Status, - Version: worker.Software, - Updated: worker.UpdatedAt, + Id: worker.UUID, + Name: worker.Name, + Status: worker.Status, + Version: worker.Software, + Updated: worker.UpdatedAt, + CanRestart: worker.CanRestart, } if worker.StatusRequested != "" { diff --git a/internal/worker/config.go b/internal/worker/config.go index d046057e..ca7867cc 100644 --- a/internal/worker/config.go +++ b/internal/worker/config.go @@ -46,7 +46,8 @@ type WorkerConfig struct { // configuration file, but also from autodiscovery via UPnP/SSDP. ManagerURL string `yaml:"-"` - TaskTypes []string `yaml:"task_types"` + TaskTypes []string `yaml:"task_types"` + RestartExitCode int `yaml:"restart_exit_code"` } type WorkerCredentials struct { @@ -145,6 +146,10 @@ func (fcw *FileConfigWrangler) SetManagerURL(managerURL string) { fcw.wc.ManagerURL = managerURL } +func (fcw *FileConfigWrangler) SetRestartExitCode(code int) { + fcw.wc.RestartExitCode = code +} + // DefaultConfig returns a fairly sane default configuration. func (fcw FileConfigWrangler) DefaultConfig() WorkerConfig { return defaultConfig diff --git a/internal/worker/registration.go b/internal/worker/registration.go index 65878b41..e6b15bf9 100644 --- a/internal/worker/registration.go +++ b/internal/worker/registration.go @@ -150,10 +150,12 @@ func repeatSignOnUntilAnswer(ctx context.Context, cfg WorkerConfig, client Flame func signOn(ctx context.Context, cfg WorkerConfig, client FlamencoClient) (api.WorkerStatus, error) { logger := log.With().Str("manager", cfg.ManagerURL).Logger() + canRestart := cfg.RestartExitCode != 0 req := api.SignOnJSONRequestBody{ Name: workerName(), SupportedTaskTypes: cfg.TaskTypes, SoftwareVersion: appinfo.ExtendedVersion(), + CanRestart: &canRestart, } logger.Info(). 
diff --git a/internal/worker/state_offline.go b/internal/worker/state_offline.go index 537767bb..16d0bb01 100644 --- a/internal/worker/state_offline.go +++ b/internal/worker/state_offline.go @@ -16,6 +16,13 @@ func (w *Worker) gotoStateOffline(context.Context) { defer w.stateMutex.Unlock() w.state = api.WorkerStatusOffline + w.requestShutdown(false) +} + +// requestShutdown closes the w.shutdown channel, to indicate to the main +// function that it should proceed with the shutdown procedure. +func (w *Worker) requestShutdown(requestRestart bool) { + w.restartAfterShutdown = requestRestart // Signal that the Worker should shut down. log.Debug().Msg("closing the shutdown channel") diff --git a/internal/worker/state_restart.go b/internal/worker/state_restart.go new file mode 100644 index 00000000..4630f299 --- /dev/null +++ b/internal/worker/state_restart.go @@ -0,0 +1,17 @@ +package worker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + + "projects.blender.org/studio/flamenco/pkg/api" +) + +func (w *Worker) gotoStateRestart(ctx context.Context) { + w.stateMutex.Lock() + defer w.stateMutex.Unlock() + + w.state = api.WorkerStatusRestart + w.requestShutdown(true) +} diff --git a/internal/worker/statemachine.go b/internal/worker/statemachine.go index 1b12a362..1b72aa2c 100644 --- a/internal/worker/statemachine.go +++ b/internal/worker/statemachine.go @@ -15,6 +15,7 @@ func (w *Worker) setupStateMachine() { w.stateStarters[api.WorkerStatusAsleep] = w.gotoStateAsleep w.stateStarters[api.WorkerStatusAwake] = w.gotoStateAwake w.stateStarters[api.WorkerStatusOffline] = w.gotoStateOffline + w.stateStarters[api.WorkerStatusRestart] = w.gotoStateRestart } // Called whenever the Flamenco Manager has a change in current status for us. 
diff --git a/internal/worker/worker.go b/internal/worker/worker.go index c7225ff3..c7caa1bb 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -17,7 +17,8 @@ type Worker struct { doneWg *sync.WaitGroup // Will be closed by the Worker when it wants to shut down. See Worker.WaitForShutdown(). - shutdown chan struct{} + shutdown chan struct{} + restartAfterShutdown bool client FlamencoClient @@ -69,14 +70,24 @@ func (w *Worker) Close() { w.doneWg.Wait() } +type ShutdownReason int + +const ( + ReasonContextClosed ShutdownReason = iota // Main Context closed. + ReasonShutdownReq // Manager requested a shutdown. + ReasonRestartReq // Manager requested a restart. +) + // WaitForShutdown waits until Flamenco wants to shut down the application. -// Returns `true` when the Worker has signalled it wants to shut down. -// Returns `false` when the shutdown was caused by the context closing. -func (w *Worker) WaitForShutdown(ctx context.Context) bool { +// Returns the reason of the shutdown. +func (w *Worker) WaitForShutdown(ctx context.Context) ShutdownReason { select { case <-ctx.Done(): - return false + return ReasonContextClosed case <-w.shutdown: - return true + if w.restartAfterShutdown { + return ReasonRestartReq + } + return ReasonShutdownReq } } diff --git a/web/app/src/components/workers/WorkerActionsBar.vue b/web/app/src/components/workers/WorkerActionsBar.vue index 3b4c0880..b4e53e6f 100644 --- a/web/app/src/components/workers/WorkerActionsBar.vue +++ b/web/app/src/components/workers/WorkerActionsBar.vue @@ -4,7 +4,9 @@ - + @@ -25,6 +27,7 @@ const WORKER_ACTIONS = Object.freeze({ title: 'Shut down the worker after the current task finishes. The worker may automatically restart.', target_status: 'offline', lazy: true, + condition: () => true, }, offline_immediate: { label: 'Shut Down (immediately)', @@ -32,6 +35,23 @@ const WORKER_ACTIONS = Object.freeze({ title: 'Immediately shut down the worker. 
It may automatically restart.', target_status: 'offline', lazy: false, + condition: () => true, + }, + restart_lazy: { + label: 'Restart (after task is finished)', + icon: '✝', + title: 'Restart the worker after the current task finishes.', + target_status: 'restart', + lazy: true, + condition: () => workers.canRestart(), + }, + restart_immediate: { + label: 'Restart (immediately)', + icon: '✝!', + title: 'Immediately restart the worker.', + target_status: 'restart', + lazy: false, + condition: () => workers.canRestart(), }, asleep_lazy: { label: 'Send to Sleep (after task is finished)', @@ -39,6 +59,7 @@ const WORKER_ACTIONS = Object.freeze({ title: 'Let the worker sleep after finishing this task.', target_status: 'asleep', lazy: true, + condition: () => true, }, asleep_immediate: { label: 'Send to Sleep (immediately)', @@ -46,6 +67,7 @@ const WORKER_ACTIONS = Object.freeze({ title: 'Let the worker sleep immediately.', target_status: 'asleep', lazy: false, + condition: () => true, }, wakeup: { label: 'Wake Up', @@ -53,6 +75,7 @@ const WORKER_ACTIONS = Object.freeze({ title: 'Wake the worker up. A sleeping worker can take a minute to respond.', target_status: 'awake', lazy: false, + condition: () => true, }, }); @@ -75,7 +98,9 @@ function performWorkerAction() { console.log("Requesting worker status change", statuschange); api.requestWorkerStatusChange(workerID, statuschange) .then((result) => notifs.add(`Worker status change to ${action.target_status} confirmed.`)) - .catch((error) => notifs.add(`Error requesting worker status change: ${error}`)); + .catch((error) => { + notifs.add(`Error requesting worker status change: ${error.body.message}`) + }); } diff --git a/web/app/src/components/workers/WorkerDetails.vue b/web/app/src/components/workers/WorkerDetails.vue index 58a0195c..9a3e13ef 100644 --- a/web/app/src/components/workers/WorkerDetails.vue +++ b/web/app/src/components/workers/WorkerDetails.vue @@ -32,6 +32,11 @@
+ +
diff --git a/web/app/src/stores/workers.js b/web/app/src/stores/workers.js index cc64bda8..e30f62b6 100644 --- a/web/app/src/stores/workers.js +++ b/web/app/src/stores/workers.js @@ -63,5 +63,12 @@ export const useWorkers = defineStore('workers', { this.tagsByID = tagsByID; }); }, + + /** + * @returns whether the active worker understands how to get restarted. + */ + canRestart() { + return !!this.activeWorker && !!this.activeWorker.can_restart; + }, }, });