Worker: Change how the worker shuts down

Instead of sending the current process an interrupt signal, use a dedicated
channel to signal the wish to shut down. The main function responds to that
channel closing by performing the shutdown.

This solves an issue where the Worker would not cleanly shut down on
Windows when `offline` state was requested by the Manager.
This commit is contained in:
Sybren A. Stüvel 2022-08-12 11:15:19 -07:00
parent 6de5c9e7fa
commit 1355ec5e1d
4 changed files with 33 additions and 25 deletions

View File

@ -11,6 +11,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"sync"
"syscall" "syscall"
"time" "time"
@ -153,11 +154,23 @@ func main() {
} }
}() }()
go listener.Run(workerCtx) wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
listener.Run(workerCtx)
}()
go w.Start(workerCtx, startupState) go w.Start(workerCtx, startupState)
if w.WaitForShutdown(workerCtx) {
go shutdown()
}
<-shutdownComplete <-shutdownComplete
workerCtxCancel()
wg.Wait()
log.Debug().Msg("process shutting down") log.Debug().Msg("process shutting down")
} }
@ -166,7 +179,6 @@ func shutdown() {
go func() { go func() {
if w != nil { if w != nil {
w.Close() w.Close()
listener.Wait()
if err := buffer.Close(); err != nil { if err := buffer.Close(); err != nil {
log.Error().Err(err).Msg("closing upstream task buffer") log.Error().Err(err).Msg("closing upstream task buffer")
} }

View File

@ -7,7 +7,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -23,7 +22,6 @@ var (
// Listener listens to the result of task and command execution, and sends it to the Manager. // Listener listens to the result of task and command execution, and sends it to the Manager.
type Listener struct { type Listener struct {
doneWg *sync.WaitGroup
client FlamencoClient client FlamencoClient
buffer UpstreamBuffer buffer UpstreamBuffer
outputUploader *OutputUploader outputUploader *OutputUploader
@ -37,28 +35,19 @@ type UpstreamBuffer interface {
// NewListener creates a new Listener that will send updates to the API client. // NewListener creates a new Listener that will send updates to the API client.
func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener { func NewListener(client FlamencoClient, buffer UpstreamBuffer) *Listener {
l := &Listener{ l := &Listener{
doneWg: new(sync.WaitGroup),
client: client, client: client,
buffer: buffer, buffer: buffer,
outputUploader: NewOutputUploader(client), outputUploader: NewOutputUploader(client),
} }
l.doneWg.Add(1)
return l return l
} }
func (l *Listener) Run(ctx context.Context) { func (l *Listener) Run(ctx context.Context) {
defer l.doneWg.Done()
defer log.Debug().Msg("listener shutting down") defer log.Debug().Msg("listener shutting down")
log.Debug().Msg("listener starting up") log.Debug().Msg("listener starting up")
l.outputUploader.Run(ctx) l.outputUploader.Run(ctx)
} }
func (l *Listener) Wait() {
log.Debug().Msg("waiting for listener to shut down")
l.doneWg.Wait()
}
func ptr[T any](value T) *T { func ptr[T any](value T) *T {
return &value return &value
} }

View File

@ -4,7 +4,6 @@ package worker
import ( import (
"context" "context"
"os"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -18,17 +17,9 @@ func (w *Worker) gotoStateOffline(context.Context) {
w.state = api.WorkerStatusOffline w.state = api.WorkerStatusOffline
logger := log.With().Int("pid", os.Getpid()).Logger() // Signal that the Worker should shut down.
proc, err := os.FindProcess(os.Getpid()) log.Debug().Msg("closing the shutdown channel")
if err != nil { close(w.shutdown)
logger.Fatal().Err(err).Msg("unable to find our own process for clean shutdown")
}
logger.Warn().Msg("sending our own process an interrupt signal")
err = proc.Signal(os.Interrupt)
if err != nil {
logger.Fatal().Err(err).Msg("unable to send interrupt signal to our own process")
}
} }
// SignOff forces the worker in shutdown state and acknlowedges this to the Manager. // SignOff forces the worker in shutdown state and acknlowedges this to the Manager.

View File

@ -16,6 +16,9 @@ type Worker struct {
doneChan chan struct{} doneChan chan struct{}
doneWg *sync.WaitGroup doneWg *sync.WaitGroup
// Will be closed by the Worker when it wants to shut down. See Worker.WaitForShutdown().
shutdown chan struct{}
client FlamencoClient client FlamencoClient
state api.WorkerStatus state api.WorkerStatus
@ -40,6 +43,7 @@ func NewWorker(
worker := &Worker{ worker := &Worker{
doneChan: make(chan struct{}), doneChan: make(chan struct{}),
doneWg: new(sync.WaitGroup), doneWg: new(sync.WaitGroup),
shutdown: make(chan struct{}),
client: flamenco, client: flamenco,
@ -64,3 +68,15 @@ func (w *Worker) Close() {
close(w.doneChan) close(w.doneChan)
w.doneWg.Wait() w.doneWg.Wait()
} }
// WaitForShutdown waits until Flamenco wants to shut down the application.
// Returns `true` when the Worker has signalled it wants to shut down.
// Returns `false` when the shutdown was caused by the context closing.
func (w *Worker) WaitForShutdown(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-w.shutdown:
return true
}
}