From b845189dfc58bc67ba9ea799839f703e719e9881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 9 Jan 2025 15:28:02 +0100 Subject: [PATCH] Manager: protect task state machine with a mutex Protect the public functions of the task state machine with a mutex, so that only one task/job state change is handled at a time. This should avoid race conditions. --- .../task_state_machine/task_state_machine.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index b73e2149..d90d6e64 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -5,6 +5,7 @@ package task_state_machine import ( "context" "fmt" + "sync" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -23,6 +24,10 @@ type StateMachine struct { persist PersistenceService broadcaster ChangeBroadcaster logStorage LogStorage + + // mutex protects all public functions, so that only one function can run at a time. + // This is to avoid race conditions on task/job status updates. + mutex *sync.Mutex } func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine { @@ -30,6 +35,7 @@ func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, persist: persist, broadcaster: broadcaster, logStorage: logStorage, + mutex: new(sync.Mutex), } } @@ -45,6 +51,9 @@ func (sm *StateMachine) TaskStatusChange( return nil // Will not run because of the panic. } + sm.mutex.Lock() + defer sm.mutex.Unlock() + job, err := sm.persist.FetchJobByID(ctx, task.JobID) if err != nil { return fmt.Errorf("cannot fetch the job of task %s: %w", task.UUID, err) @@ -282,6 +291,9 @@ func (sm *StateMachine) JobStatusChange( newJobStatus api.JobStatus, reason string, ) error { + sm.mutex.Lock() + defer sm.mutex.Unlock() + job, err := sm.persist.FetchJob(ctx, jobUUID) if err != nil { return err @@ -621,6 +633,9 @@ func (sm *StateMachine) checkTaskCompletion( // to run at startup of Flamenco Manager, and checks to see if there are any // jobs in a status that a human will not be able to fix otherwise. func (sm *StateMachine) CheckStuck(ctx context.Context) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing) if err != nil { log.Error().Err(err).Msg("unable to fetch stuck jobs")