Add stress tester for task updates

Build with `make stresser`. Run with:

  ./stresser -worker UUID -secret ABCXYZ

The worker ID and secret can be obtained from
`flamenco-worker-credentials.yaml`. If left empty, the stresser will
register as a new worker, and log the credentials to be used on the next
invocation.
This commit is contained in:
Sybren A. Stüvel 2022-07-15 11:45:21 +02:00
parent 6e28271c93
commit 35fe0146d3
7 changed files with 390 additions and 2 deletions

View File

@ -42,6 +42,9 @@ flamenco-manager:
flamenco-worker: flamenco-worker:
go build -v ${BUILD_FLAGS} ${PKG}/cmd/flamenco-worker go build -v ${BUILD_FLAGS} ${PKG}/cmd/flamenco-worker
stresser:
go build -v ${BUILD_FLAGS} ${PKG}/cmd/stresser
addon-packer: cmd/addon-packer/addon-packer.go addon-packer: cmd/addon-packer/addon-packer.go
go build -v ${BUILD_FLAGS} ${PKG}/cmd/addon-packer go build -v ${BUILD_FLAGS} ${PKG}/cmd/addon-packer

View File

@ -102,7 +102,7 @@ func main() {
// reached and accepts our sign-on request. An offline Manager would cause the // reached and accepts our sign-on request. An offline Manager would cause the
// Worker to wait for it indefinitely. // Worker to wait for it indefinitely.
startupCtx := context.Background() startupCtx := context.Background()
client, startupState := worker.RegisterOrSignOn(startupCtx, configWrangler) client, startupState := worker.RegisterOrSignOn(startupCtx, &configWrangler)
shutdownComplete = make(chan struct{}) shutdownComplete = make(chan struct{})
workerCtx, workerCtxCancel := context.WithCancel(context.Background()) workerCtx, workerCtxCancel := context.WithCancel(context.Background())

85
cmd/stresser/main.go Normal file
View File

@ -0,0 +1,85 @@
package main
import (
"context"
"flag"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/appinfo"
"git.blender.org/flamenco/internal/stresser"
)
var cliArgs struct {
quiet, debug, trace bool
workerID string
secret string
}
func main() {
parseCliArgs()
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
log.Info().
Str("version", appinfo.ApplicationVersion).
Str("OS", runtime.GOOS).
Str("ARCH", runtime.GOARCH).
Int("pid", os.Getpid()).
Msgf("starting %v Worker", appinfo.ApplicationName)
configLogLevel()
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
// Handle Ctrl+C
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
for signum := range c {
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.")
mainCtxCancel()
}
}()
config := stresser.NewFakeConfig(cliArgs.workerID, cliArgs.secret)
client := stresser.GetFlamencoClient(mainCtx, config)
stresser.Run(mainCtx, client)
log.Info().Msg("stresser shutting down")
}
func parseCliArgs() {
flag.BoolVar(&cliArgs.quiet, "quiet", false, "Only log warning-level and worse.")
flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.")
flag.BoolVar(&cliArgs.trace, "trace", false, "Enable trace-level logging.")
flag.StringVar(&cliArgs.workerID, "worker", "", "UUID of the Worker")
flag.StringVar(&cliArgs.secret, "secret", "", "Secret of the Worker")
flag.Parse()
}
func configLogLevel() {
var logLevel zerolog.Level
switch {
case cliArgs.trace:
logLevel = zerolog.TraceLevel
case cliArgs.debug:
logLevel = zerolog.DebugLevel
case cliArgs.quiet:
logLevel = zerolog.WarnLevel
default:
logLevel = zerolog.InfoLevel
}
zerolog.SetGlobalLevel(logLevel)
}

View File

@ -0,0 +1,38 @@
package stresser
import (
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/worker"
)
type FakeConfig struct {
creds worker.WorkerCredentials
}
func NewFakeConfig(workerID, workerSecret string) *FakeConfig {
return &FakeConfig{
creds: worker.WorkerCredentials{
WorkerID: workerID,
Secret: workerSecret,
},
}
}
func (fc *FakeConfig) WorkerConfig() (worker.WorkerConfig, error) {
config := worker.NewConfigWrangler().DefaultConfig()
config.ManagerURL = "http://localhost:8080/"
return config, nil
}
func (fc *FakeConfig) WorkerCredentials() (worker.WorkerCredentials, error) {
return fc.creds, nil
}
func (fc *FakeConfig) SaveCredentials(creds worker.WorkerCredentials) error {
log.Info().
Str("workerID", creds.WorkerID).
Str("workerSecret", creds.Secret).
Msg("remember these credentials for next time")
return nil
}

View File

@ -0,0 +1,129 @@
package stresser
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/rs/zerolog/log"
"git.blender.org/flamenco/internal/worker"
"git.blender.org/flamenco/pkg/api"
)
const (
// For fetching the task to stress test.
durationNoTask = 1 * time.Second // ... if there is no task now.
durationFetchFailed = 2 * time.Second // ... if fetching failed somehow.
)
var (
ErrTaskReassigned = worker.ErrTaskReassigned
ErrTaskUpdateRejected = errors.New("task update was rejected")
)
func GetFlamencoClient(
ctx context.Context,
config worker.WorkerConfigWithCredentials,
) worker.FlamencoClient {
startupCtx, startupCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer startupCtxCancel()
client, startupState := worker.RegisterOrSignOn(startupCtx, config)
if startupState != api.WorkerStatusAwake {
log.Fatal().Str("requestedStartupState", string(startupState)).Msg("stresser should always be awake")
}
ackStateChange(ctx, client, startupState)
return client
}
func fetchTask(ctx context.Context, client worker.FlamencoClient) *api.AssignedTask {
// Initially don't wait at all.
var wait time.Duration
for {
select {
case <-ctx.Done():
log.Debug().Msg("task fetching interrupted by context cancellation")
return nil
case <-time.After(wait):
}
log.Debug().Msg("fetching tasks")
resp, err := client.ScheduleTaskWithResponse(ctx)
if err != nil {
log.Error().Err(err).Msg("error obtaining task")
wait = durationFetchFailed
continue
}
switch {
case resp.JSON200 != nil:
log.Info().
Str("job", resp.JSON200.Job).
Str("task", resp.JSON200.Uuid).
Msg("obtained task")
return resp.JSON200
case resp.JSON423 != nil:
log.Fatal().Str("requestedStatus", string(resp.JSON423.StatusRequested)).
Msg("Manager requests status change, stresser does not support this")
return nil
case resp.JSON403 != nil:
log.Error().
Int("code", resp.StatusCode()).
Str("error", string(resp.JSON403.Message)).
Msg("access denied")
wait = durationFetchFailed
case resp.StatusCode() == http.StatusNoContent:
log.Debug().Msg("no task available")
wait = durationNoTask
default:
log.Warn().
Int("code", resp.StatusCode()).
Str("error", string(resp.Body)).
Msg("unable to obtain task for unknown reason")
wait = durationFetchFailed
}
}
}
func ackStateChange(ctx context.Context, client worker.FlamencoClient, state api.WorkerStatus) {
req := api.WorkerStateChangedJSONRequestBody{Status: state}
logger := log.With().Str("state", string(state)).Logger()
logger.Debug().Msg("notifying Manager of our state")
resp, err := client.WorkerStateChangedWithResponse(ctx, req)
if err != nil {
logger.Fatal().Err(err).Msg("unable to notify Manager of status change")
return
}
// The 'default' response is for error cases.
if resp.JSONDefault != nil {
logger.Fatal().
Str("httpCode", resp.HTTPResponse.Status).
Interface("error", resp.JSONDefault).
Msg("error sending status change to Manager")
return
}
}
func sendTaskUpdate(ctx context.Context, client worker.FlamencoClient, taskID string, update api.TaskUpdate) error {
resp, err := client.TaskUpdateWithResponse(ctx, taskID, api.TaskUpdateJSONRequestBody(update))
if err != nil {
return err
}
switch resp.StatusCode() {
case http.StatusNoContent:
return nil
case http.StatusConflict:
return worker.ErrTaskReassigned
default:
return fmt.Errorf("%w: task=%s", ErrTaskUpdateRejected, taskID)
}
}

View File

@ -0,0 +1,127 @@
package stresser
import (
"context"
"fmt"
"strings"
"sync"
"time"
"git.blender.org/flamenco/internal/worker"
"git.blender.org/flamenco/pkg/api"
"github.com/rs/zerolog/log"
)
const (
// For the actual stress test.
durationWaitStress = 500 * time.Millisecond
reportPeriod = 2 * time.Second
)
var (
numRequests = 0
numFailed = 0
startTime time.Time
mutex = sync.RWMutex{}
)
func Run(ctx context.Context, client worker.FlamencoClient) {
// Get a task.
task := fetchTask(ctx, client)
if task == nil {
log.Fatal().Msg("error obtaining task, shutting down stresser")
return
}
logger := log.With().Str("task", task.Uuid).Logger()
// Mark the task as active.
err := sendTaskUpdate(ctx, client, task.Uuid, api.TaskUpdate{
Activity: ptr("Stress testing"),
TaskStatus: ptr(api.TaskStatusActive),
})
if err != nil {
logger.Warn().Err(err).Msg("Manager rejected task becoming active. Going to stress it anyway.")
}
startTime = time.Now()
go reportStatisticsLoop(ctx)
// Do the stress test.
var wait time.Duration
for {
select {
case <-ctx.Done():
log.Debug().Msg("stresser interrupted by context cancellation")
return
case <-time.After(wait):
}
increaseNumRequests()
err := stress(ctx, client, task)
if err != nil {
log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update")
increaseNumFailed()
}
wait = durationWaitStress
}
}
func stress(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) error {
logline := "This is a log-line for stress testing. It will be repeated more than once.\n"
bigLog := strings.Repeat(logline, 1000)
mutex.RLock()
update := api.TaskUpdate{
Activity: ptr(fmt.Sprintf("stress test update %v", numRequests)),
Log: &bigLog,
}
mutex.RUnlock()
return sendTaskUpdate(ctx, client, task.Uuid, update)
}
func ptr[T any](value T) *T {
return &value
}
func increaseNumRequests() {
mutex.Lock()
defer mutex.Unlock()
numRequests++
}
func increaseNumFailed() {
mutex.Lock()
defer mutex.Unlock()
numFailed++
}
func reportStatistics() {
mutex.RLock()
defer mutex.RUnlock()
duration := time.Since(startTime)
durationInSeconds := float64(duration) / float64(time.Second)
reqPerSecond := float64(numRequests) / durationInSeconds
log.Info().
Int("numRequests", numRequests).
Int("numFailed", numFailed).
Str("duration", duration.String()).
Float64("requestsPerSecond", reqPerSecond).
Msg("stress progress")
}
func reportStatisticsLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(reportPeriod):
reportStatistics()
}
}
}

View File

@ -27,9 +27,15 @@ var (
errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds. errSignOnRejected = errors.New("manager rejected our sign-on credentials") // Reached Manager, but it rejected our creds.
) )
type WorkerConfigWithCredentials interface {
WorkerConfig() (WorkerConfig, error)
WorkerCredentials() (WorkerCredentials, error)
SaveCredentials(creds WorkerCredentials) error
}
// registerOrSignOn tries to sign on, and if that fails (or there are no credentials) tries to register. // registerOrSignOn tries to sign on, and if that fails (or there are no credentials) tries to register.
// Returns an authenticated Flamenco OpenAPI client. // Returns an authenticated Flamenco OpenAPI client.
func RegisterOrSignOn(ctx context.Context, configWrangler FileConfigWrangler) ( func RegisterOrSignOn(ctx context.Context, configWrangler WorkerConfigWithCredentials) (
client FlamencoClient, startupState api.WorkerStatus, client FlamencoClient, startupState api.WorkerStatus,
) { ) {
// Load configuration // Load configuration