Worker: keep trying to sign on until we have an answer from the Manager

This commit is contained in:
Sybren A. Stüvel 2022-02-15 11:44:16 +01:00
parent beda64d7c0
commit dfd55914b2
2 changed files with 46 additions and 7 deletions

View File

@ -63,9 +63,11 @@ func main() {
configWrangler := worker.NewConfigWrangler() configWrangler := worker.NewConfigWrangler()
startupCtx, sctxCancelFunc := context.WithTimeout(context.Background(), 10*time.Second) // Startup can take arbitrarily long, as it only ends when the Manager can be
// reached and accepts our sign-on request. An offline Manager would cause the
// Worker to wait for it indefinitely.
startupCtx := context.Background()
client, startupState := registerOrSignOn(startupCtx, configWrangler) client, startupState := registerOrSignOn(startupCtx, configWrangler)
sctxCancelFunc()
shutdownComplete = make(chan struct{}) shutdownComplete = make(chan struct{})
workerCtx, workerCtxCancel := context.WithCancel(context.Background()) workerCtx, workerCtxCancel := context.WithCancel(context.Background())

View File

@ -28,6 +28,7 @@ import (
"net/http" "net/http"
"os" "os"
"runtime" "runtime"
"time"
"github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/deepmap/oapi-codegen/pkg/securityprovider"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -36,7 +37,11 @@ import (
"gitlab.com/blender/flamenco-ng-poc/pkg/api" "gitlab.com/blender/flamenco-ng-poc/pkg/api"
) )
// Sentinel errors that describe why a sign-on attempt did not succeed.
var errSignOnFailure = errors.New("unable to sign on at Manager") var (
errSignOnCanceled = errors.New("sign-on cancelled") // The context was done before a sign-on attempt could be started, for example because it was closed.
errSignOnRepeatableFailure = errors.New("unable to sign on at Manager, try again later") // The sign-on request could not be sent (for example failed connections); trying again later may work.
errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our credentials; retrying is pointless.
)
func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrangler) ( func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrangler) (
client api.ClientWithResponsesInterface, startupState api.WorkerStatus, client api.ClientWithResponsesInterface, startupState api.WorkerStatus,
@ -58,7 +63,7 @@ func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrang
if err == nil { if err == nil {
// Credentials can be loaded just fine, try to sign on with them. // Credentials can be loaded just fine, try to sign on with them.
client = authenticatedClient(cfg, creds) client = authenticatedClient(cfg, creds)
startupState, err = signOn(ctx, cfg, client) startupState, err = repeatSignOnUntilAnswer(ctx, cfg, client)
if err == nil { if err == nil {
// Sign on is fine! // Sign on is fine!
return return
@ -125,20 +130,52 @@ func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWit
} }
} }
// repeatSignOnUntilAnswer keeps trying to sign on until the Manager has given
// an answer, or until ctx is done.
//
// The first attempt is made immediately. As long as signOn reports
// errSignOnRepeatableFailure, another attempt is made after a fixed delay.
//
// Returning does not mean that the sign-on was successful; inspect the
// returned error:
//   - nil: the sign-on succeeded, and the returned status is the one signOn
//     produced.
//   - errSignOnCanceled: ctx was done before an attempt could be started.
//   - any other error: signOn failed in a way that should not be retried; its
//     status and error are passed through unchanged.
func repeatSignOnUntilAnswer(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) {
	// Delay between two consecutive sign-on attempts.
	const retryInterval = 5 * time.Second

	// A zero-duration timer lets the first attempt start right away, while
	// still going through the same cancellation check as every retry. One
	// timer is reused for all iterations instead of allocating a new one each
	// time around the loop.
	retryTimer := time.NewTimer(0)
	defer retryTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return api.WorkerStatus(""), errSignOnCanceled
		case <-retryTimer.C:
		}

		status, err := signOn(ctx, cfg, client)
		if err == nil {
			// Sign-on was successful, we're done!
			return status, nil
		}
		if !errors.Is(err, errSignOnRepeatableFailure) {
			// We shouldn't repeat the sign-on; retrying would not change the outcome.
			return status, err
		}

		// Try again after a while. The timer's channel was drained by the
		// select above, so resetting it here is safe.
		retryTimer.Reset(retryInterval)
	}
}
// signOn tells the Manager we're alive and returns the status the Manager tells us to go to. // signOn tells the Manager we're alive and returns the status the Manager tells us to go to.
func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) { func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) {
logger := log.With().Str("manager", cfg.Manager).Logger() logger := log.With().Str("manager", cfg.Manager).Logger()
logger.Info().Interface("taskTypes", cfg.TaskTypes).Msg("signing on at Manager")
req := api.SignOnJSONRequestBody{ req := api.SignOnJSONRequestBody{
Nickname: mustHostname(), Nickname: mustHostname(),
SupportedTaskTypes: cfg.TaskTypes, SupportedTaskTypes: cfg.TaskTypes,
SoftwareVersion: appinfo.ApplicationVersion, SoftwareVersion: appinfo.ApplicationVersion,
} }
logger.Info().
Str("nickname", req.Nickname).
Str("softwareVersion", req.SoftwareVersion).
Interface("taskTypes", req.SupportedTaskTypes).
Msg("signing on at Manager")
resp, err := client.SignOnWithResponse(ctx, req) resp, err := client.SignOnWithResponse(ctx, req)
if err != nil { if err != nil {
logger.Warn().Err(err).Msg("unable to send sign-on request") logger.Warn().Err(err).Msg("unable to send sign-on request")
return "", errSignOnFailure return "", errSignOnRepeatableFailure
} }
switch { switch {
case resp.JSON200 != nil: case resp.JSON200 != nil:
@ -151,7 +188,7 @@ func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithR
Int("code", resp.StatusCode()). Int("code", resp.StatusCode()).
Interface("resp", resp.JSONDefault). Interface("resp", resp.JSONDefault).
Msg("unable to sign on at Manager") Msg("unable to sign on at Manager")
return "", errSignOnFailure return "", errSignOnRejected
} }
startupState := resp.JSON200.StatusRequested startupState := resp.JSON200.StatusRequested