From dfd55914b2d24f459aed93081a0ccd4cf3339d46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 15 Feb 2022 11:44:16 +0100 Subject: [PATCH] Worker: keep trying to sign on until we have an answer from the Manager --- cmd/flamenco-worker-poc/main.go | 6 ++-- cmd/flamenco-worker-poc/registration.go | 47 ++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go index 024560f1..2355afa3 100644 --- a/cmd/flamenco-worker-poc/main.go +++ b/cmd/flamenco-worker-poc/main.go @@ -63,9 +63,11 @@ func main() { configWrangler := worker.NewConfigWrangler() - startupCtx, sctxCancelFunc := context.WithTimeout(context.Background(), 10*time.Second) + // Startup can take arbitrarily long, as it only ends when the Manager can be + // reached and accepts our sign-on request. An offline Manager would cause the + // Worker to wait for it indefinitely. + startupCtx := context.Background() client, startupState := registerOrSignOn(startupCtx, configWrangler) - sctxCancelFunc() shutdownComplete = make(chan struct{}) workerCtx, workerCtxCancel := context.WithCancel(context.Background()) diff --git a/cmd/flamenco-worker-poc/registration.go b/cmd/flamenco-worker-poc/registration.go index 686be44a..3ff1d640 100644 --- a/cmd/flamenco-worker-poc/registration.go +++ b/cmd/flamenco-worker-poc/registration.go @@ -28,6 +28,7 @@ import ( "net/http" "os" "runtime" + "time" "github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/rs/zerolog/log" @@ -36,7 +37,11 @@ import ( "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) -var errSignOnFailure = errors.New("unable to sign on at Manager") +var ( + errSignOnCanceled = errors.New("sign-on cancelled") // For example by closing the context. + errSignOnRepeatableFailure = errors.New("unable to sign on at Manager, try again later") // For example failed connections + errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds. +) func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrangler) ( client api.ClientWithResponsesInterface, startupState api.WorkerStatus, @@ -58,7 +63,7 @@ func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrang if err == nil { // Credentials can be loaded just fine, try to sign on with them. client = authenticatedClient(cfg, creds) - startupState, err = signOn(ctx, cfg, client) + startupState, err = repeatSignOnUntilAnswer(ctx, cfg, client) if err == nil { // Sign on is fine! return @@ -125,20 +130,52 @@ func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWit } } +// repeatSignOnUntilAnswer tries to sign on, and only returns when it has been able to reach the Manager. +// Return still doesn't mean that the sign-on was succesful; inspect the returned error. +func repeatSignOnUntilAnswer(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) { + waitTime := 0 * time.Second + for { + select { + case <-ctx.Done(): + return api.WorkerStatus(""), errSignOnCanceled + case <-time.After(waitTime): + } + + status, err := signOn(ctx, cfg, client) + if err == nil { + // Sign-on was succesful, we're done! + return status, nil + } + if err != errSignOnRepeatableFailure { + // We shouldn't repeat the sign-on; communication was succesful but somehow our credentials were rejected. + return status, err + } + + // Try again after a while. + waitTime = 5 * time.Second + } +} + // signOn tells the Manager we're alive and returns the status the Manager tells us to go to. func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) { logger := log.With().Str("manager", cfg.Manager).Logger() - logger.Info().Interface("taskTypes", cfg.TaskTypes).Msg("signing on at Manager") req := api.SignOnJSONRequestBody{ Nickname: mustHostname(), SupportedTaskTypes: cfg.TaskTypes, SoftwareVersion: appinfo.ApplicationVersion, } + + logger.Info(). + Str("nickname", req.Nickname). + Str("softwareVersion", req.SoftwareVersion). + Interface("taskTypes", req.SupportedTaskTypes). + Msg("signing on at Manager") + resp, err := client.SignOnWithResponse(ctx, req) if err != nil { logger.Warn().Err(err).Msg("unable to send sign-on request") - return "", errSignOnFailure + return "", errSignOnRepeatableFailure } switch { case resp.JSON200 != nil: @@ -151,7 +188,7 @@ func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithR Int("code", resp.StatusCode()). Interface("resp", resp.JSONDefault). Msg("unable to sign on at Manager") - return "", errSignOnFailure + return "", errSignOnRejected } startupState := resp.JSON200.StatusRequested