Stresser: stress the Manager by querying for tasks to execute

This commit is contained in:
Sybren A. Stüvel 2022-07-15 14:36:23 +02:00
parent 26f07d85fd
commit 904b6c0d73
2 changed files with 26 additions and 16 deletions

View File

@ -62,10 +62,6 @@ func fetchTask(ctx context.Context, client worker.FlamencoClient) *api.AssignedT
} }
switch { switch {
case resp.JSON200 != nil: case resp.JSON200 != nil:
log.Info().
Str("job", resp.JSON200.Job).
Str("task", resp.JSON200.Uuid).
Msg("obtained task")
return resp.JSON200 return resp.JSON200
case resp.JSON423 != nil: case resp.JSON423 != nil:
log.Fatal().Str("requestedStatus", string(resp.JSON423.StatusRequested)). log.Fatal().Str("requestedStatus", string(resp.JSON423.StatusRequested)).

View File

@ -14,7 +14,7 @@ import (
const ( const (
// For the actual stress test. // For the actual stress test.
durationWaitStress = 500 * time.Millisecond durationWaitStress = 0 * time.Millisecond
reportPeriod = 2 * time.Second reportPeriod = 2 * time.Second
) )
@ -34,6 +34,9 @@ func Run(ctx context.Context, client worker.FlamencoClient) {
return return
} }
logger := log.With().Str("task", task.Uuid).Logger() logger := log.With().Str("task", task.Uuid).Logger()
logger.Info().
Str("job", task.Job).
Msg("obtained task")
// Mark the task as active. // Mark the task as active.
err := sendTaskUpdate(ctx, client, task.Uuid, api.TaskUpdate{ err := sendTaskUpdate(ctx, client, task.Uuid, api.TaskUpdate{
@ -58,29 +61,40 @@ func Run(ctx context.Context, client worker.FlamencoClient) {
case <-time.After(wait): case <-time.After(wait):
} }
increaseNumRequests() // stressBySendingTaskUpdate(ctx, client, task)
err := stress(ctx, client, task) stressByRequestingTask(ctx, client)
if err != nil {
log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update")
increaseNumFailed()
}
wait = durationWaitStress wait = durationWaitStress
} }
} }
func stress(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) error { func stressByRequestingTask(ctx context.Context, client worker.FlamencoClient) {
logline := "This is a log-line for stress testing. It will be repeated more than once.\n" increaseNumRequests()
bigLog := strings.Repeat(logline, 1000) task := fetchTask(ctx, client)
if task == nil {
increaseNumFailed()
log.Info().Msg("error obtaining task")
}
}
func stressBySendingTaskUpdate(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) {
logLine := "This is a log-line for stress testing. It will be repeated more than once.\n"
logToSend := strings.Repeat(logLine, 5)
increaseNumRequests()
mutex.RLock() mutex.RLock()
update := api.TaskUpdate{ update := api.TaskUpdate{
Activity: ptr(fmt.Sprintf("stress test update %v", numRequests)), Activity: ptr(fmt.Sprintf("stress test update %v", numRequests)),
Log: &bigLog, Log: &logToSend,
} }
mutex.RUnlock() mutex.RUnlock()
return sendTaskUpdate(ctx, client, task.Uuid, update) err := sendTaskUpdate(ctx, client, task.Uuid, update)
if err != nil {
log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update")
increaseNumFailed()
}
} }
func ptr[T any](value T) *T { func ptr[T any](value T) *T {