diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index 243a493a..456dde25 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "runtime" + "sync" "syscall" "time" @@ -153,11 +154,23 @@ func main() { } }() - go listener.Run(workerCtx) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + listener.Run(workerCtx) + }() + go w.Start(workerCtx, startupState) + if w.WaitForShutdown(workerCtx) { + go shutdown() + } <-shutdownComplete + workerCtxCancel() + wg.Wait() + log.Debug().Msg("process shutting down") } @@ -166,7 +179,6 @@ func shutdown() { go func() { if w != nil { w.Close() - listener.Wait() if err := buffer.Close(); err != nil { log.Error().Err(err).Msg("closing upstream task buffer") } diff --git a/internal/worker/listener.go b/internal/worker/listener.go index 3cee8b4b..f4b57538 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "strings" - "sync" "github.com/rs/zerolog/log" @@ -23,7 +22,6 @@ var ( // Listener listens to the result of task and command execution, and sends it to the Manager. type Listener struct { - doneWg *sync.WaitGroup client FlamencoClient buffer UpstreamBuffer outputUploader *OutputUploader @@ -37,28 +35,19 @@ type UpstreamBuffer interface { // NewListener creates a new Listener that will send updates to the API client. func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener { l := &Listener{ - doneWg: new(sync.WaitGroup), client: client, buffer: buffer, outputUploader: NewOutputUploader(client), } - l.doneWg.Add(1) return l } func (l *Listener) Run(ctx context.Context) { - defer l.doneWg.Done() defer log.Debug().Msg("listener shutting down") - log.Debug().Msg("listener starting up") l.outputUploader.Run(ctx) } -func (l *Listener) Wait() { - log.Debug().Msg("waiting for listener to shut down") - l.doneWg.Wait() -} - func ptr[T any](value T) *T { return &value } diff --git a/internal/worker/state_offline.go b/internal/worker/state_offline.go index 319098a7..b00b3129 100644 --- a/internal/worker/state_offline.go +++ b/internal/worker/state_offline.go @@ -4,7 +4,6 @@ package worker import ( "context" - "os" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -18,17 +17,9 @@ func (w *Worker) gotoStateOffline(context.Context) { w.state = api.WorkerStatusOffline - logger := log.With().Int("pid", os.Getpid()).Logger() - proc, err := os.FindProcess(os.Getpid()) - if err != nil { - logger.Fatal().Err(err).Msg("unable to find our own process for clean shutdown") - } - - logger.Warn().Msg("sending our own process an interrupt signal") - err = proc.Signal(os.Interrupt) - if err != nil { - logger.Fatal().Err(err).Msg("unable to send interrupt signal to our own process") - } + // Signal that the Worker should shut down. + log.Debug().Msg("closing the shutdown channel") + close(w.shutdown) } // SignOff forces the worker in shutdown state and acknlowedges this to the Manager. diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 9d45e8db..80208984 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -16,6 +16,9 @@ type Worker struct { doneChan chan struct{} doneWg *sync.WaitGroup + // Will be closed by the Worker when it wants to shut down. See Worker.WaitForShutdown(). + shutdown chan struct{} + client FlamencoClient state api.WorkerStatus @@ -40,6 +43,7 @@ func NewWorker( worker := &Worker{ doneChan: make(chan struct{}), doneWg: new(sync.WaitGroup), + shutdown: make(chan struct{}), client: flamenco, @@ -64,3 +68,15 @@ func (w *Worker) Close() { close(w.doneChan) w.doneWg.Wait() } + +// WaitForShutdown waits until Flamenco wants to shut down the application. +// Returns `true` when the Worker has signalled it wants to shut down. +// Returns `false` when the shutdown was caused by the context closing. +func (w *Worker) WaitForShutdown(ctx context.Context) bool { + select { + case <-ctx.Done(): + return false + case <-w.shutdown: + return true + } +}