From 1355ec5e1dab66b8ee4adcd597920da057d1daed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 12 Aug 2022 11:15:19 -0700 Subject: [PATCH] Worker: Change how the worker shuts down Instead of sending the current process an interrupt signal, use a dedicated channel to signal the wish to shut down. The main function responds to that channel closing by performing the shutdown. This solves an issue where the Worker would not cleanly shut down on Windows when `offline` state was requested by the Manager. --- cmd/flamenco-worker/main.go | 16 ++++++++++++++-- internal/worker/listener.go | 11 ----------- internal/worker/state_offline.go | 15 +++------------ internal/worker/worker.go | 16 ++++++++++++++++ 4 files changed, 33 insertions(+), 25 deletions(-) diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index 243a493a..456dde25 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "runtime" + "sync" "syscall" "time" @@ -153,11 +154,23 @@ func main() { } }() - go listener.Run(workerCtx) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + listener.Run(workerCtx) + }() + go w.Start(workerCtx, startupState) + if w.WaitForShutdown(workerCtx) { + go shutdown() + } <-shutdownComplete + workerCtxCancel() + wg.Wait() + log.Debug().Msg("process shutting down") } @@ -166,7 +179,6 @@ func shutdown() { go func() { if w != nil { w.Close() - listener.Wait() if err := buffer.Close(); err != nil { log.Error().Err(err).Msg("closing upstream task buffer") } diff --git a/internal/worker/listener.go b/internal/worker/listener.go index 3cee8b4b..f4b57538 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "strings" - "sync" "github.com/rs/zerolog/log" @@ -23,7 +22,6 @@ var ( // Listener listens to the result of task and command execution, and sends it to the Manager. type Listener struct { - doneWg *sync.WaitGroup client FlamencoClient buffer UpstreamBuffer outputUploader *OutputUploader @@ -37,28 +35,19 @@ type UpstreamBuffer interface { // NewListener creates a new Listener that will send updates to the API client. func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener { l := &Listener{ - doneWg: new(sync.WaitGroup), client: client, buffer: buffer, outputUploader: NewOutputUploader(client), } - l.doneWg.Add(1) return l } func (l *Listener) Run(ctx context.Context) { - defer l.doneWg.Done() defer log.Debug().Msg("listener shutting down") - log.Debug().Msg("listener starting up") l.outputUploader.Run(ctx) } -func (l *Listener) Wait() { - log.Debug().Msg("waiting for listener to shut down") - l.doneWg.Wait() -} - func ptr[T any](value T) *T { return &value } diff --git a/internal/worker/state_offline.go b/internal/worker/state_offline.go index 319098a7..b00b3129 100644 --- a/internal/worker/state_offline.go +++ b/internal/worker/state_offline.go @@ -4,7 +4,6 @@ package worker import ( "context" - "os" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -18,17 +17,9 @@ func (w *Worker) gotoStateOffline(context.Context) { w.state = api.WorkerStatusOffline - logger := log.With().Int("pid", os.Getpid()).Logger() - proc, err := os.FindProcess(os.Getpid()) - if err != nil { - logger.Fatal().Err(err).Msg("unable to find our own process for clean shutdown") - } - - logger.Warn().Msg("sending our own process an interrupt signal") - err = proc.Signal(os.Interrupt) - if err != nil { - logger.Fatal().Err(err).Msg("unable to send interrupt signal to our own process") - } + // Signal that the Worker should shut down. + log.Debug().Msg("closing the shutdown channel") + close(w.shutdown) } // SignOff forces the worker in shutdown state and acknlowedges this to the Manager. diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 9d45e8db..80208984 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -16,6 +16,9 @@ type Worker struct { doneChan chan struct{} doneWg *sync.WaitGroup + // Will be closed by the Worker when it wants to shut down. See Worker.WaitForShutdown(). + shutdown chan struct{} + client FlamencoClient state api.WorkerStatus @@ -40,6 +43,7 @@ func NewWorker( worker := &Worker{ doneChan: make(chan struct{}), doneWg: new(sync.WaitGroup), + shutdown: make(chan struct{}), client: flamenco, @@ -64,3 +68,15 @@ func (w *Worker) Close() { close(w.doneChan) w.doneWg.Wait() } + +// WaitForShutdown waits until Flamenco wants to shut down the application. +// Returns `true` when the Worker has signalled it wants to shut down. +// Returns `false` when the shutdown was caused by the context closing. +func (w *Worker) WaitForShutdown(ctx context.Context) bool { + select { + case <-ctx.Done(): + return false + case <-w.shutdown: + return true + } +}