From fabb79e583f44a277b911102626f85d370eb970a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 8 Mar 2022 14:41:30 +0100 Subject: [PATCH] Worker: move autodiscovery code into the 'worker' package Move the UPnP/SSDP Manager autodiscovery code into from `main.go` into the `worker` package. This also means changing the error handling a bit, as only the `main.go` file is allowed to do `log.Fatal()`. --- cmd/flamenco-worker/main.go | 136 +++------------------------- internal/worker/autodiscovery.go | 148 +++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 124 deletions(-) create mode 100644 internal/worker/autodiscovery.go diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index adaf7127..813114d1 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -11,7 +11,6 @@ import ( "os" "os/signal" "runtime" - "sync" "syscall" "time" @@ -21,9 +20,7 @@ import ( "github.com/rs/zerolog/log" "git.blender.org/flamenco/internal/appinfo" - "git.blender.org/flamenco/internal/upnp_ssdp" "git.blender.org/flamenco/internal/worker" - "git.blender.org/flamenco/pkg/api" ) var ( @@ -62,7 +59,18 @@ func main() { configLogLevel() configWrangler := worker.NewConfigWrangler() - maybeAutodiscoverManager(&configWrangler) + + // Give the auto-discovery some time to find a Manager. + discoverTimeout := 10 * time.Minute + discoverCtx, discoverCancel := context.WithTimeout(context.Background(), discoverTimeout) + defer discoverCancel() + if err := worker.MaybeAutodiscoverManager(discoverCtx, &configWrangler); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + log.Fatal().Str("timeout", discoverTimeout.String()).Msg("could not discover Manager in time") + } else { + log.Fatal().Err(err).Msg("auto-discovery error") + } + } // Startup can take arbitrarily long, as it only ends when the Manager can be // reached and accepts our sign-on request. An offline Manager would cause the @@ -185,123 +193,3 @@ func upstreamBufferOrDie(client worker.FlamencoClient, timeService clock.Clock) return buffer } - -// maybeAutodiscoverManager starts Manager auto-discovery if there is no Manager URL configured yet. -func maybeAutodiscoverManager(configWrangler *worker.FileConfigWrangler) { - cfg, err := configWrangler.WorkerConfig() - if err != nil { - log.Fatal().Err(err).Msg("unable to load configuration") - } - - if cfg.ManagerURL != "" { - // Manager URL is already known, don't bother with auto-discovery. - return - } - - discoverCtx, discoverCancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer discoverCancel() - - foundManager, err := autodiscoverManager(discoverCtx) - if err != nil { - log.Fatal().Err(err).Msg("unable to discover manager") - } - - configWrangler.SetManagerURL(foundManager) -} - -func autodiscoverManager(ctx context.Context) (string, error) { - c, err := upnp_ssdp.NewClient(log.Logger) - if err != nil { - return "", fmt.Errorf("unable to create UPnP/SSDP client: %w", err) - } - - log.Info().Msg("auto-discovering Manager via UPnP/SSDP") - - urls, err := c.Run(ctx) - if err != nil { - return "", fmt.Errorf("unable to find Manager: %w", err) - } - - if len(urls) == 0 { - return "", errors.New("no Manager could be found") - } - - // Try out the URLs to see which one responds. - startTime := time.Now() - numUsableURLs := 0 - wg := new(sync.WaitGroup) - wg.Add(len(urls)) - mutex := new(sync.Mutex) - for idx, url := range urls { - go func(idx int, url string) { - defer wg.Done() - ok := pingManager(ctx, url) - - mutex.Lock() - defer mutex.Unlock() - - if ok { - numUsableURLs++ - } else { - // Erase the URL from the usable list. - urls[idx] = "" - } - }(idx, url) - } - wg.Wait() - log.Debug().Str("pingTime", time.Since(startTime).String()).Msg("pinging all Manager URLs done") - - if numUsableURLs == 0 { - return "", fmt.Errorf("autodetected %d URLs, but none were usable", len(urls)) - } - - // Find the first usable URL. - var firstURL string - for _, url := range urls { - if url != "" { - firstURL = url - break - } - } - - if numUsableURLs == 1 { - log.Info().Str("url", firstURL).Msg("found Manager") - } else { - log.Info(). - Strs("urls", urls). - Str("url", firstURL). - Msg("found multiple usable URLs, using the first one") - } - - return firstURL, nil -} - -// pingManager connects to a Manager and returns true if it responds. -func pingManager(ctx context.Context, url string) bool { - logger := log.With().Str("manager", url).Logger() - - client, err := api.NewClientWithResponses(url) - if err != nil { - logger.Warn().Err(err).Msg("unable to create API client with this URL") - return false - } - - resp, err := client.GetVersionWithResponse(ctx) - if err != nil { - logger.Warn().Err(err).Msg("unable to get Flamenco version from Manager") - return false - } - - if resp.JSON200 == nil { - logger.Warn(). - Int("httpStatus", resp.StatusCode()). - Msg("unable to get Flamenco version, unexpected reply") - return false - } - - logger.Info(). - Str("version", resp.JSON200.Version). - Str("name", resp.JSON200.Name). - Msg("found Flamenco Manager") - return true -} diff --git a/internal/worker/autodiscovery.go b/internal/worker/autodiscovery.go new file mode 100644 index 00000000..8b0561af --- /dev/null +++ b/internal/worker/autodiscovery.go @@ -0,0 +1,148 @@ +package worker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "git.blender.org/flamenco/internal/upnp_ssdp" + "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog/log" +) + +// maybeAutodiscoverManager starts Manager auto-discovery if there is no Manager URL configured yet. +func MaybeAutodiscoverManager(ctx context.Context, configWrangler *FileConfigWrangler) error { + cfg, err := configWrangler.WorkerConfig() + if err != nil { + return fmt.Errorf("loading configuration: %w", err) + } + + if cfg.ManagerURL != "" { + // Manager URL is already known, don't bother with auto-discovery. + return nil + } + + foundManager, err := autodiscoverManager(ctx) + if err != nil { + return err + } + + configWrangler.SetManagerURL(foundManager) + return nil +} + +// autodiscoverManager uses UPnP/SSDP to find a Manager, and returns its URL if found. +func autodiscoverManager(ctx context.Context) (string, error) { + c, err := upnp_ssdp.NewClient(log.Logger) + if err != nil { + return "", fmt.Errorf("unable to create UPnP/SSDP client: %w", err) + } + + logger := log.Logger + if deadline, ok := ctx.Deadline(); ok { + timeout := deadline.Sub(time.Now()).Round(1 * time.Second) + logger = logger.With().Str("timeout", timeout.String()).Logger() + } + logger.Info().Msg("auto-discovering Manager via UPnP/SSDP") + + urls, err := c.Run(ctx) + if err != nil { + return "", fmt.Errorf("unable to find Manager: %w", err) + } + + if len(urls) == 0 { + return "", errors.New("no Manager could be found") + } + + // Try out the URLs to see which one responds. + usableURLs := pingManagers(ctx, urls) + + switch len(usableURLs) { + case 0: + return "", fmt.Errorf("autodetected %d URLs, but none were usable", len(urls)) + case 1: + log.Info().Str("url", usableURLs[0]).Msg("found Manager") + default: + log.Info(). + Strs("urls", urls). + Str("url", usableURLs[0]). + Msg("found multiple usable URLs, using the first one") + } + + return usableURLs[0], nil +} + +// pingManager connects to a Manager and returns true if it responds. +func pingManager(ctx context.Context, url string) bool { + logger := log.With().Str("manager", url).Logger() + + client, err := api.NewClientWithResponses(url) + if err != nil { + logger.Warn().Err(err).Msg("unable to create API client with this URL") + return false + } + + resp, err := client.GetVersionWithResponse(ctx) + if err != nil { + logger.Warn().Err(err).Msg("unable to get Flamenco version from Manager") + return false + } + + if resp.JSON200 == nil { + logger.Warn(). + Int("httpStatus", resp.StatusCode()). + Msg("unable to get Flamenco version, unexpected reply") + return false + } + + logger.Info(). + Str("version", resp.JSON200.Version). + Str("name", resp.JSON200.Name). + Msg("found Flamenco Manager") + return true +} + +// pingManagers pings all URLs in parallel, returning only those that responded. +func pingManagers(ctx context.Context, urls []string) []string { + startTime := time.Now() + + wg := new(sync.WaitGroup) + wg.Add(len(urls)) + mutex := new(sync.Mutex) + + pingURL := func(idx int, url string) { + defer wg.Done() + ok := pingManager(ctx, url) + + mutex.Lock() + defer mutex.Unlock() + + if !ok { + // Erase the URL from the usable list. + // Modifying the original slice instead of appending to a new one ensures + // the original order is maintained. + urls[idx] = "" + } + } + + for idx, url := range urls { + go pingURL(idx, url) + } + + wg.Wait() + log.Debug().Str("pingTime", time.Since(startTime).String()).Msg("pinging all Manager URLs done") + + // Find the usable URLs. + usableURLs := make([]string, 0) + for _, url := range urls { + if url != "" { + usableURLs = append(usableURLs, url) + } + } + + return usableURLs +}