Sybren A. Stüvel a9bec98fcd Fix linter warnings
Fix most linter warnings reported by 'staticcheck'. This doesn't fix all
of them, some unused functions are still there, and some generated code
also still triggers some warnings. Most issues are fixed, though.

No functional changes, except for the capitalisation of some error
messages.
2024-12-01 14:49:25 +01:00

145 lines
3.1 KiB
Go

package stresser
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/worker"
"projects.blender.org/studio/flamenco/pkg/api"
)
const (
// For the actual stress test.
// durationWaitStress is the pause Run inserts between consecutive stress
// requests; at zero the requests are sent back-to-back.
durationWaitStress = 0 * time.Millisecond
// reportPeriod is how long reportStatisticsLoop waits between two
// statistics reports.
reportPeriod = 2 * time.Second
)
// Shared bookkeeping for the stress test.
var (
	// mutex guards numRequests and numFailed, which are incremented by the
	// stress functions while the statistics reporter reads them.
	mutex sync.RWMutex

	// numRequests counts the stress requests that were attempted.
	numRequests int
	// numFailed counts the stress requests that failed.
	numFailed int
	// startTime is set by Run just before the statistics reporter is started.
	startTime time.Time
)
// Run stress-tests the Manager through the given client. It fetches a single
// task, tries to mark it as active, and then keeps sending stress requests
// until ctx is cancelled. A background goroutine logs request statistics in
// the meantime. When the initial task cannot be obtained, a fatal message is
// logged instead.
func Run(ctx context.Context, client worker.FlamencoClient) {
	// A task is needed before any stressing can start.
	task := fetchTask(ctx, client)
	if task == nil {
		log.Fatal().Msg("error obtaining task, shutting down stresser")
		return
	}

	logger := log.With().Str("task", task.Uuid).Logger()
	logger.Info().Str("job", task.Job).Msg("obtained task")

	// Tell the Manager the task is active; a rejection is no reason to stop.
	activeUpdate := api.TaskUpdate{
		Activity:   ptr("Stress testing"),
		TaskStatus: ptr(api.TaskStatusActive),
	}
	if err := sendTaskUpdate(ctx, client, task.Uuid, activeUpdate); err != nil {
		logger.Warn().Err(err).Msg("Manager rejected task becoming active. Going to stress it anyway.")
	}

	startTime = time.Now()
	go reportStatisticsLoop(ctx)

	// The stress loop proper: the first request goes out without waiting,
	// every later one waits durationWaitStress first.
	for pause := time.Duration(0); ; pause = durationWaitStress {
		select {
		case <-ctx.Done():
			log.Debug().Msg("stresser interrupted by context cancellation")
			return
		case <-time.After(pause):
		}

		// Swap in stressBySendingTaskUpdate(ctx, client, task) to stress task
		// updates instead of task requests.
		stressByRequestingTask(ctx, client)
	}
}
// stressByRequestingTask performs one stress iteration: it counts a request,
// asks for a task via fetchTask, and counts and logs a failure when no task
// comes back.
func stressByRequestingTask(ctx context.Context, client worker.FlamencoClient) {
	increaseNumRequests()

	if fetchTask(ctx, client) != nil {
		return
	}

	increaseNumFailed()
	log.Info().Msg("error obtaining task")
}
// stressBySendingTaskUpdate performs one stress iteration by sending an update
// for the given task. The update carries a few repeated log lines and an
// activity string that includes the current request count. A rejected update
// is logged and counted as a failure.
//
//lint:ignore U1000 stressBySendingTaskUpdate is currently unused, but someone may find it useful for different kinds of stress testing.
func stressBySendingTaskUpdate(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) {
	const (
		logLine    = "This is a log-line for stress testing. It will be repeated more than once.\n"
		logRepeats = 5
	)
	logToSend := strings.Repeat(logLine, logRepeats)

	increaseNumRequests()

	// Only the read of the shared counter needs the lock.
	mutex.RLock()
	activity := fmt.Sprintf("stress test update %v", numRequests)
	mutex.RUnlock()

	update := api.TaskUpdate{
		Activity: &activity,
		Log:      &logToSend,
	}
	if err := sendTaskUpdate(ctx, client, task.Uuid, update); err != nil {
		log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update")
		increaseNumFailed()
	}
}
// ptr returns a pointer to a fresh copy of value. It is used in this file to
// fill pointer-typed fields of api.TaskUpdate from literals and constants.
func ptr[T any](value T) *T {
	copied := new(T)
	*copied = value
	return copied
}
// increaseNumRequests adds one to the request counter while holding the write
// lock.
func increaseNumRequests() {
	mutex.Lock()
	numRequests++
	mutex.Unlock()
}
// increaseNumFailed adds one to the failure counter while holding the write
// lock.
func increaseNumFailed() {
	mutex.Lock()
	numFailed++
	mutex.Unlock()
}
// reportStatistics logs the number of requests, the number of failures, the
// time elapsed since startTime, and the resulting request rate per second.
func reportStatistics() {
	// Take a snapshot under the read lock and release it before logging, so
	// that the stress functions are not blocked on the counters while the log
	// line is being written.
	mutex.RLock()
	requests, failed := numRequests, numFailed
	elapsed := time.Since(startTime)
	mutex.RUnlock()

	log.Info().
		Int("numRequests", requests).
		Int("numFailed", failed).
		Str("duration", elapsed.String()).
		Float64("requestsPerSecond", float64(requests)/elapsed.Seconds()).
		Msg("stress progress")
}
// reportStatisticsLoop calls reportStatistics once every reportPeriod until
// ctx is cancelled. It blocks, so it is meant to run in its own goroutine.
func reportStatisticsLoop(ctx context.Context) {
	// A single ticker keeps the reporting period steady and avoids creating a
	// new timer on every iteration.
	ticker := time.NewTicker(reportPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			reportStatistics()
		}
	}
}