diff --git a/Makefile b/Makefile index 1186f207..236871ec 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,9 @@ flamenco-manager: flamenco-worker: go build -v ${BUILD_FLAGS} ${PKG}/cmd/flamenco-worker +stresser: + go build -v ${BUILD_FLAGS} ${PKG}/cmd/stresser + addon-packer: cmd/addon-packer/addon-packer.go go build -v ${BUILD_FLAGS} ${PKG}/cmd/addon-packer diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index 6bd96b45..305a4948 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -102,7 +102,7 @@ func main() { // reached and accepts our sign-on request. An offline Manager would cause the // Worker to wait for it indefinitely. startupCtx := context.Background() - client, startupState := worker.RegisterOrSignOn(startupCtx, configWrangler) + client, startupState := worker.RegisterOrSignOn(startupCtx, &configWrangler) shutdownComplete = make(chan struct{}) workerCtx, workerCtxCancel := context.WithCancel(context.Background()) diff --git a/cmd/stresser/main.go b/cmd/stresser/main.go new file mode 100644 index 00000000..214d74ca --- /dev/null +++ b/cmd/stresser/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "flag" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "github.com/mattn/go-colorable" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/appinfo" + "git.blender.org/flamenco/internal/stresser" +) + +var cliArgs struct { + quiet, debug, trace bool + + workerID string + secret string +} + +func main() { + parseCliArgs() + + output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} + log.Logger = log.Output(output) + + log.Info(). + Str("version", appinfo.ApplicationVersion). + Str("OS", runtime.GOOS). + Str("ARCH", runtime.GOARCH). + Int("pid", os.Getpid()). 
+ Msgf("starting %v Worker", appinfo.ApplicationName) + configLogLevel() + + mainCtx, mainCtxCancel := context.WithCancel(context.Background()) + + // Handle Ctrl+C + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + for signum := range c { + log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.") + mainCtxCancel() + } + }() + + config := stresser.NewFakeConfig(cliArgs.workerID, cliArgs.secret) + client := stresser.GetFlamencoClient(mainCtx, config) + stresser.Run(mainCtx, client) + + log.Info().Msg("stresser shutting down") +} + +func parseCliArgs() { + flag.BoolVar(&cliArgs.quiet, "quiet", false, "Only log warning-level and worse.") + flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.") + flag.BoolVar(&cliArgs.trace, "trace", false, "Enable trace-level logging.") + + flag.StringVar(&cliArgs.workerID, "worker", "", "UUID of the Worker") + flag.StringVar(&cliArgs.secret, "secret", "", "Secret of the Worker") + + flag.Parse() +} + +func configLogLevel() { + var logLevel zerolog.Level + switch { + case cliArgs.trace: + logLevel = zerolog.TraceLevel + case cliArgs.debug: + logLevel = zerolog.DebugLevel + case cliArgs.quiet: + logLevel = zerolog.WarnLevel + default: + logLevel = zerolog.InfoLevel + } + zerolog.SetGlobalLevel(logLevel) +} diff --git a/internal/stresser/fake_config.go b/internal/stresser/fake_config.go new file mode 100644 index 00000000..0a706917 --- /dev/null +++ b/internal/stresser/fake_config.go @@ -0,0 +1,38 @@ +package stresser + +import ( + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/worker" +) + +type FakeConfig struct { + creds worker.WorkerCredentials +} + +func NewFakeConfig(workerID, workerSecret string) *FakeConfig { + return &FakeConfig{ + creds: worker.WorkerCredentials{ + WorkerID: workerID, + Secret: workerSecret, + }, + } +} + +func (fc *FakeConfig) WorkerConfig() (worker.WorkerConfig, error) { + config 
:= worker.NewConfigWrangler().DefaultConfig() + config.ManagerURL = "http://localhost:8080/" + return config, nil +} + +func (fc *FakeConfig) WorkerCredentials() (worker.WorkerCredentials, error) { + return fc.creds, nil +} + +func (fc *FakeConfig) SaveCredentials(creds worker.WorkerCredentials) error { + log.Info(). + Str("workerID", creds.WorkerID). + Str("workerSecret", creds.Secret). + Msg("remember these credentials for next time") + return nil +} diff --git a/internal/stresser/fake_worker.go b/internal/stresser/fake_worker.go new file mode 100644 index 00000000..817f751f --- /dev/null +++ b/internal/stresser/fake_worker.go @@ -0,0 +1,129 @@ +package stresser + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/worker" + "git.blender.org/flamenco/pkg/api" +) + +const ( + // For fetching the task to stress test. + durationNoTask = 1 * time.Second // ... if there is no task now. + durationFetchFailed = 2 * time.Second // ... if fetching failed somehow. +) + +var ( + ErrTaskReassigned = worker.ErrTaskReassigned + ErrTaskUpdateRejected = errors.New("task update was rejected") +) + +func GetFlamencoClient( + ctx context.Context, + config worker.WorkerConfigWithCredentials, +) worker.FlamencoClient { + startupCtx, startupCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer startupCtxCancel() + + client, startupState := worker.RegisterOrSignOn(startupCtx, config) + if startupState != api.WorkerStatusAwake { + log.Fatal().Str("requestedStartupState", string(startupState)).Msg("stresser should always be awake") + } + + ackStateChange(ctx, client, startupState) + + return client +} + +func fetchTask(ctx context.Context, client worker.FlamencoClient) *api.AssignedTask { + // Initially don't wait at all. 
+ var wait time.Duration + + for { + select { + case <-ctx.Done(): + log.Debug().Msg("task fetching interrupted by context cancellation") + return nil + case <-time.After(wait): + } + + log.Debug().Msg("fetching tasks") + resp, err := client.ScheduleTaskWithResponse(ctx) + if err != nil { + log.Error().Err(err).Msg("error obtaining task") + wait = durationFetchFailed + continue + } + switch { + case resp.JSON200 != nil: + log.Info(). + Str("job", resp.JSON200.Job). + Str("task", resp.JSON200.Uuid). + Msg("obtained task") + return resp.JSON200 + case resp.JSON423 != nil: + log.Fatal().Str("requestedStatus", string(resp.JSON423.StatusRequested)). + Msg("Manager requests status change, stresser does not support this") + return nil + case resp.JSON403 != nil: + log.Error(). + Int("code", resp.StatusCode()). + Str("error", string(resp.JSON403.Message)). + Msg("access denied") + wait = durationFetchFailed + case resp.StatusCode() == http.StatusNoContent: + log.Debug().Msg("no task available") + wait = durationNoTask + default: + log.Warn(). + Int("code", resp.StatusCode()). + Str("error", string(resp.Body)). + Msg("unable to obtain task for unknown reason") + wait = durationFetchFailed + } + } +} + +func ackStateChange(ctx context.Context, client worker.FlamencoClient, state api.WorkerStatus) { + req := api.WorkerStateChangedJSONRequestBody{Status: state} + + logger := log.With().Str("state", string(state)).Logger() + logger.Debug().Msg("notifying Manager of our state") + + resp, err := client.WorkerStateChangedWithResponse(ctx, req) + if err != nil { + logger.Fatal().Err(err).Msg("unable to notify Manager of status change") + return + } + + // The 'default' response is for error cases. + if resp.JSONDefault != nil { + logger.Fatal(). + Str("httpCode", resp.HTTPResponse.Status). + Interface("error", resp.JSONDefault). 
+ Msg("error sending status change to Manager") + return + } +} + +func sendTaskUpdate(ctx context.Context, client worker.FlamencoClient, taskID string, update api.TaskUpdate) error { + resp, err := client.TaskUpdateWithResponse(ctx, taskID, api.TaskUpdateJSONRequestBody(update)) + if err != nil { + return err + } + + switch resp.StatusCode() { + case http.StatusNoContent: + return nil + case http.StatusConflict: + return worker.ErrTaskReassigned + default: + return fmt.Errorf("%w: task=%s", ErrTaskUpdateRejected, taskID) + } +} diff --git a/internal/stresser/stresser.go b/internal/stresser/stresser.go new file mode 100644 index 00000000..2ebe1bbb --- /dev/null +++ b/internal/stresser/stresser.go @@ -0,0 +1,127 @@ +package stresser + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "git.blender.org/flamenco/internal/worker" + "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog/log" +) + +const ( + // For the actual stress test. + durationWaitStress = 500 * time.Millisecond + reportPeriod = 2 * time.Second +) + +var ( + numRequests = 0 + numFailed = 0 + startTime time.Time + + mutex = sync.RWMutex{} +) + +func Run(ctx context.Context, client worker.FlamencoClient) { + // Get a task. + task := fetchTask(ctx, client) + if task == nil { + log.Fatal().Msg("error obtaining task, shutting down stresser") + return + } + logger := log.With().Str("task", task.Uuid).Logger() + + // Mark the task as active. + err := sendTaskUpdate(ctx, client, task.Uuid, api.TaskUpdate{ + Activity: ptr("Stress testing"), + TaskStatus: ptr(api.TaskStatusActive), + }) + if err != nil { + logger.Warn().Err(err).Msg("Manager rejected task becoming active. Going to stress it anyway.") + } + + startTime = time.Now() + + go reportStatisticsLoop(ctx) + + // Do the stress test. 
+ var wait time.Duration + for { + select { + case <-ctx.Done(): + log.Debug().Msg("stresser interrupted by context cancellation") + return + case <-time.After(wait): + } + + increaseNumRequests() + err := stress(ctx, client, task) + if err != nil { + log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update") + increaseNumFailed() + } + + wait = durationWaitStress + } +} + +func stress(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) error { + logline := "This is a log-line for stress testing. It will be repeated more than once.\n" + bigLog := strings.Repeat(logline, 1000) + + mutex.RLock() + update := api.TaskUpdate{ + Activity: ptr(fmt.Sprintf("stress test update %v", numRequests)), + Log: &bigLog, + } + mutex.RUnlock() + + return sendTaskUpdate(ctx, client, task.Uuid, update) +} + +func ptr[T any](value T) *T { + return &value +} + +func increaseNumRequests() { + mutex.Lock() + defer mutex.Unlock() + numRequests++ +} + +func increaseNumFailed() { + mutex.Lock() + defer mutex.Unlock() + numFailed++ +} + +func reportStatistics() { + mutex.RLock() + defer mutex.RUnlock() + + duration := time.Since(startTime) + durationInSeconds := float64(duration) / float64(time.Second) + reqPerSecond := float64(numRequests) / durationInSeconds + + log.Info(). + Int("numRequests", numRequests). + Int("numFailed", numFailed). + Str("duration", duration.String()). + Float64("requestsPerSecond", reqPerSecond). + Msg("stress progress") +} + +func reportStatisticsLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(reportPeriod): + reportStatistics() + } + } +} diff --git a/internal/worker/registration.go b/internal/worker/registration.go index 779cf75e..b165fffb 100644 --- a/internal/worker/registration.go +++ b/internal/worker/registration.go @@ -27,9 +27,15 @@ var ( errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds. 
// WorkerConfigWithCredentials is the configuration access that
// RegisterOrSignOn takes as its configWrangler argument.
type WorkerConfigWithCredentials interface {
	// WorkerConfig returns the Worker's configuration, or an error.
	WorkerConfig() (WorkerConfig, error)
	// WorkerCredentials returns the Worker's credentials, or an error.
	WorkerCredentials() (WorkerCredentials, error)
	// SaveCredentials is handed a set of credentials; what is done with
	// them is up to the implementation.
	// NOTE(review): presumably called by RegisterOrSignOn after registering
	// with the Manager; confirm against its body.
	SaveCredentials(creds WorkerCredentials) error
}