Manager: protect task state machine with a mutex
Protect the public functions of the task state machine with a mutex, so that only one task/job state change is handled at a time. This should avoid race conditions.
This commit is contained in:
parent
1c50837577
commit
b845189dfc
@ -5,6 +5,7 @@ package task_state_machine
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
@ -23,6 +24,10 @@ type StateMachine struct {
|
|||||||
persist PersistenceService
|
persist PersistenceService
|
||||||
broadcaster ChangeBroadcaster
|
broadcaster ChangeBroadcaster
|
||||||
logStorage LogStorage
|
logStorage LogStorage
|
||||||
|
|
||||||
|
// mutex protects all public functions, so that only one of them can run at a time.
|
||||||
|
// This is to avoid race conditions on task/job status updates.
|
||||||
|
mutex *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine {
|
func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster, logStorage LogStorage) *StateMachine {
|
||||||
@ -30,6 +35,7 @@ func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster,
|
|||||||
persist: persist,
|
persist: persist,
|
||||||
broadcaster: broadcaster,
|
broadcaster: broadcaster,
|
||||||
logStorage: logStorage,
|
logStorage: logStorage,
|
||||||
|
mutex: new(sync.Mutex),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,6 +51,9 @@ func (sm *StateMachine) TaskStatusChange(
|
|||||||
return nil // Will not run because of the panic.
|
return nil // Will not run because of the panic.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sm.mutex.Lock()
|
||||||
|
defer sm.mutex.Unlock()
|
||||||
|
|
||||||
job, err := sm.persist.FetchJobByID(ctx, task.JobID)
|
job, err := sm.persist.FetchJobByID(ctx, task.JobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch the job of task %s: %w", task.UUID, err)
|
return fmt.Errorf("cannot fetch the job of task %s: %w", task.UUID, err)
|
||||||
@ -282,6 +291,9 @@ func (sm *StateMachine) JobStatusChange(
|
|||||||
newJobStatus api.JobStatus,
|
newJobStatus api.JobStatus,
|
||||||
reason string,
|
reason string,
|
||||||
) error {
|
) error {
|
||||||
|
sm.mutex.Lock()
|
||||||
|
defer sm.mutex.Unlock()
|
||||||
|
|
||||||
job, err := sm.persist.FetchJob(ctx, jobUUID)
|
job, err := sm.persist.FetchJob(ctx, jobUUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -621,6 +633,9 @@ func (sm *StateMachine) checkTaskCompletion(
|
|||||||
// to run at startup of Flamenco Manager, and checks to see if there are any
|
// to run at startup of Flamenco Manager, and checks to see if there are any
|
||||||
// jobs in a status that a human will not be able to fix otherwise.
|
// jobs in a status that a human will not be able to fix otherwise.
|
||||||
func (sm *StateMachine) CheckStuck(ctx context.Context) {
|
func (sm *StateMachine) CheckStuck(ctx context.Context) {
|
||||||
|
sm.mutex.Lock()
|
||||||
|
defer sm.mutex.Unlock()
|
||||||
|
|
||||||
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing)
|
stuckJobs, err := sm.persist.FetchJobsInStatus(ctx, api.JobStatusCancelRequested, api.JobStatusRequeueing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("unable to fetch stuck jobs")
|
log.Error().Err(err).Msg("unable to fetch stuck jobs")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user