Move worker config, sign-on, and registration code into worker package

This makes the `main.go` file simpler, and not depend on any other files
in the `main` package. For some reason, the debugger really likes this.
This commit is contained in:
Sybren A. Stüvel 2022-02-17 11:22:45 +01:00
parent 71edb139dd
commit 66c052d9fd
5 changed files with 93 additions and 148 deletions

View File

@ -1,57 +0,0 @@
package main
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
import (
"flag"
"net/url"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/worker"
)
var cliArgs struct {
version bool
verbose bool
debug bool
managerURL *url.URL
manager string
register bool
}
func parseCliArgs() {
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
flag.BoolVar(&cliArgs.verbose, "verbose", false, "Enable info-level logging.")
flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.")
flag.StringVar(&cliArgs.manager, "manager", "", "URL of the Flamenco Manager.")
flag.BoolVar(&cliArgs.register, "register", false, "(Re-)register at the Manager.")
flag.Parse()
if cliArgs.manager != "" {
var err error
cliArgs.managerURL, err = worker.ParseURL(cliArgs.manager)
if err != nil {
log.Fatal().Err(err).Msg("invalid manager URL")
}
}
}

View File

@ -1,81 +0,0 @@
package main
import (
"fmt"
"os"
"github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/worker"
)
/* ***** BEGIN GPL LICENSE BLOCK *****
*
* Original Code Copyright (C) 2022 Blender Foundation.
*
* This file is part of Flamenco.
*
* Flamenco is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
*
* ***** END GPL LICENSE BLOCK ***** */
const (
credentialsFilename = "flamenco-worker-credentials.yaml"
configFilename = "flamenco-worker.yaml"
)
func loadConfig(configWrangler worker.FileConfigWrangler) (worker.WorkerConfig, error) {
logger := log.With().Str("filename", configFilename).Logger()
var cfg worker.WorkerConfig
err := configWrangler.LoadConfig(configFilename, &cfg)
// If the configuration file doesn't exist, write the defaults & retry loading them.
if os.IsNotExist(err) {
logger.Info().Msg("writing default configuration file")
cfg = configWrangler.DefaultConfig()
err = configWrangler.WriteConfig(configFilename, "Configuration", cfg)
if err != nil {
return cfg, fmt.Errorf("error writing default config: %w", err)
}
err = configWrangler.LoadConfig(configFilename, &cfg)
}
if err != nil {
return cfg, fmt.Errorf("error loading config from %s: %w", configFilename, err)
}
// Validate the manager URL.
if cfg.Manager != "" {
_, err := worker.ParseURL(cfg.Manager)
if err != nil {
return cfg, fmt.Errorf("error parsing manager URL %s: %w", cfg.Manager, err)
}
logger.Debug().Str("url", cfg.Manager).Msg("parsed manager URL")
}
return cfg, nil
}
func loadCredentials(configWrangler worker.FileConfigWrangler) (worker.WorkerCredentials, error) {
logger := log.With().Str("filename", configFilename).Logger()
logger.Info().Msg("loading credentials")
var creds worker.WorkerCredentials
err := configWrangler.LoadConfig(credentialsFilename, &creds)
if err != nil {
return worker.WorkerCredentials{}, err
}
return creds, nil
}

View File

@ -22,7 +22,9 @@ package main
import ( import (
"context" "context"
"flag"
"fmt" "fmt"
"net/url"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
@ -44,6 +46,15 @@ var (
shutdownComplete chan struct{} shutdownComplete chan struct{}
) )
var cliArgs struct {
version bool
verbose bool
debug bool
managerURL *url.URL
manager string
register bool
}
func main() { func main() {
parseCliArgs() parseCliArgs()
if cliArgs.version { if cliArgs.version {
@ -67,7 +78,7 @@ func main() {
// reached and accepts our sign-on request. An offline Manager would cause the // reached and accepts our sign-on request. An offline Manager would cause the
// Worker to wait for it indefinitely. // Worker to wait for it indefinitely.
startupCtx := context.Background() startupCtx := context.Background()
client, startupState := registerOrSignOn(startupCtx, configWrangler) client, startupState := worker.RegisterOrSignOn(startupCtx, configWrangler)
shutdownComplete = make(chan struct{}) shutdownComplete = make(chan struct{})
workerCtx, workerCtxCancel := context.WithCancel(context.Background()) workerCtx, workerCtxCancel := context.WithCancel(context.Background())
@ -124,3 +135,22 @@ func shutdown(signum os.Signal) {
log.Warn().Msg("shutdown complete, stopping process.") log.Warn().Msg("shutdown complete, stopping process.")
close(shutdownComplete) close(shutdownComplete)
} }
func parseCliArgs() {
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
flag.BoolVar(&cliArgs.verbose, "verbose", false, "Enable info-level logging.")
flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.")
flag.StringVar(&cliArgs.manager, "manager", "", "URL of the Flamenco Manager.")
flag.BoolVar(&cliArgs.register, "register", false, "(Re-)register at the Manager.")
flag.Parse()
if cliArgs.manager != "" {
var err error
cliArgs.managerURL, err = worker.ParseURL(cliArgs.manager)
if err != nil {
log.Fatal().Err(err).Msg("invalid manager URL")
}
}
}

View File

@ -36,6 +36,11 @@ var (
errURLWithoutHostName = errors.New("manager URL should contain a host name") errURLWithoutHostName = errors.New("manager URL should contain a host name")
) )
const (
credentialsFilename = "flamenco-worker-credentials.yaml"
configFilename = "flamenco-worker.yaml"
)
// WorkerConfig represents the configuration of a single worker. // WorkerConfig represents the configuration of a single worker.
// It does not include authentication credentials. // It does not include authentication credentials.
type WorkerConfig struct { type WorkerConfig struct {
@ -48,6 +53,53 @@ type WorkerCredentials struct {
Secret string `yaml:"worker_secret"` Secret string `yaml:"worker_secret"`
} }
func loadConfig(configWrangler FileConfigWrangler) (WorkerConfig, error) {
logger := log.With().Str("filename", configFilename).Logger()
var cfg WorkerConfig
err := configWrangler.LoadConfig(configFilename, &cfg)
// If the configuration file doesn't exist, write the defaults & retry loading them.
if os.IsNotExist(err) {
logger.Info().Msg("writing default configuration file")
cfg = configWrangler.DefaultConfig()
err = configWrangler.WriteConfig(configFilename, "Configuration", cfg)
if err != nil {
return cfg, fmt.Errorf("error writing default config: %w", err)
}
err = configWrangler.LoadConfig(configFilename, &cfg)
}
if err != nil {
return cfg, fmt.Errorf("error loading config from %s: %w", configFilename, err)
}
// Validate the manager URL.
if cfg.Manager != "" {
_, err := ParseURL(cfg.Manager)
if err != nil {
return cfg, fmt.Errorf("error parsing manager URL %s: %w", cfg.Manager, err)
}
logger.Debug().Str("url", cfg.Manager).Msg("parsed manager URL")
}
return cfg, nil
}
func loadCredentials(configWrangler FileConfigWrangler) (WorkerCredentials, error) {
logger := log.With().Str("filename", configFilename).Logger()
logger.Info().Msg("loading credentials")
var creds WorkerCredentials
err := configWrangler.LoadConfig(credentialsFilename, &creds)
if err != nil {
return WorkerCredentials{}, err
}
return creds, nil
}
// FileConfigWrangler is the default config wrangler that actually reads & writes files. // FileConfigWrangler is the default config wrangler that actually reads & writes files.
type FileConfigWrangler struct{} type FileConfigWrangler struct{}

View File

@ -1,4 +1,4 @@
package main package worker
/* ***** BEGIN GPL LICENSE BLOCK ***** /* ***** BEGIN GPL LICENSE BLOCK *****
* *
@ -33,7 +33,6 @@ import (
"github.com/deepmap/oapi-codegen/pkg/securityprovider" "github.com/deepmap/oapi-codegen/pkg/securityprovider"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gitlab.com/blender/flamenco-ng-poc/internal/appinfo" "gitlab.com/blender/flamenco-ng-poc/internal/appinfo"
"gitlab.com/blender/flamenco-ng-poc/internal/worker"
"gitlab.com/blender/flamenco-ng-poc/pkg/api" "gitlab.com/blender/flamenco-ng-poc/pkg/api"
) )
@ -43,7 +42,9 @@ var (
errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds. errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds.
) )
func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrangler) ( // registerOrSignOn tries to sign on, and if that fails (or there are no credentials) tries to register.
// Returns an authenticated Flamenco OpenAPI client.
func RegisterOrSignOn(ctx context.Context, configWrangler FileConfigWrangler) (
client api.ClientWithResponsesInterface, startupState api.WorkerStatus, client api.ClientWithResponsesInterface, startupState api.WorkerStatus,
) { ) {
// Load configuration // Load configuration
@ -71,7 +72,7 @@ func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrang
} }
// Either there were no credentials, or existing ones weren't accepted, just register as new worker. // Either there were no credentials, or existing ones weren't accepted, just register as new worker.
client = authenticatedClient(cfg, worker.WorkerCredentials{}) client = authenticatedClient(cfg, WorkerCredentials{})
creds = register(ctx, cfg, client) creds = register(ctx, cfg, client)
// store ID and secretKey in config file when registration is complete. // store ID and secretKey in config file when registration is complete.
@ -93,7 +94,7 @@ func registerOrSignOn(ctx context.Context, configWrangler worker.FileConfigWrang
// (Re-)register ourselves at the Manager. // (Re-)register ourselves at the Manager.
// Logs a fatal error if unsuccesful. // Logs a fatal error if unsuccesful.
func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) worker.WorkerCredentials { func register(ctx context.Context, cfg WorkerConfig, client api.ClientWithResponsesInterface) WorkerCredentials {
// Construct our new password. // Construct our new password.
secret := make([]byte, 32) secret := make([]byte, 32)
if _, err := rand.Read(secret); err != nil { if _, err := rand.Read(secret); err != nil {
@ -124,7 +125,7 @@ func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWit
Msg("unable to register at Manager") Msg("unable to register at Manager")
} }
return worker.WorkerCredentials{ return WorkerCredentials{
WorkerID: resp.JSON200.Uuid, WorkerID: resp.JSON200.Uuid,
Secret: secretKey, Secret: secretKey,
} }
@ -132,7 +133,7 @@ func register(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWit
// repeatSignOnUntilAnswer tries to sign on, and only returns when it has been able to reach the Manager. // repeatSignOnUntilAnswer tries to sign on, and only returns when it has been able to reach the Manager.
// Return still doesn't mean that the sign-on was succesful; inspect the returned error. // Return still doesn't mean that the sign-on was succesful; inspect the returned error.
func repeatSignOnUntilAnswer(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) { func repeatSignOnUntilAnswer(ctx context.Context, cfg WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) {
waitTime := 0 * time.Second waitTime := 0 * time.Second
for { for {
select { select {
@ -157,7 +158,7 @@ func repeatSignOnUntilAnswer(ctx context.Context, cfg worker.WorkerConfig, clien
} }
// signOn tells the Manager we're alive and returns the status the Manager tells us to go to. // signOn tells the Manager we're alive and returns the status the Manager tells us to go to.
func signOn(ctx context.Context, cfg worker.WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) { func signOn(ctx context.Context, cfg WorkerConfig, client api.ClientWithResponsesInterface) (api.WorkerStatus, error) {
logger := log.With().Str("manager", cfg.Manager).Logger() logger := log.With().Str("manager", cfg.Manager).Logger()
req := api.SignOnJSONRequestBody{ req := api.SignOnJSONRequestBody{
@ -206,7 +207,7 @@ func mustHostname() string {
} }
// authenticatedClient constructs a Flamenco client with the given credentials. // authenticatedClient constructs a Flamenco client with the given credentials.
func authenticatedClient(cfg worker.WorkerConfig, creds worker.WorkerCredentials) api.ClientWithResponsesInterface { func authenticatedClient(cfg WorkerConfig, creds WorkerCredentials) api.ClientWithResponsesInterface {
basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(creds.WorkerID, creds.Secret) basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(creds.WorkerID, creds.Secret)
if err != nil { if err != nil {
log.Panic().Err(err).Msg("unable to create basic auth provider") log.Panic().Err(err).Msg("unable to create basic auth provider")