diff --git a/internal/worker/listener.go b/internal/worker/listener.go index 5f030508..26c325ac 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "sync" "time" @@ -77,18 +78,51 @@ func (l *Listener) Wait() { l.doneWg.Wait() } +func ptr[T any](value T) *T { + return &value +} + // TaskStarted tells the Manager that task execution has started. func (l *Listener) TaskStarted(ctx context.Context, taskID string) error { - activity := "Started" - status := api.TaskStatusActive - update := api.TaskUpdateJSONRequestBody{ - Activity: &activity, - TaskStatus: &status, - } + return l.sendTaskUpdate(ctx, taskID, api.TaskUpdateJSONRequestBody{ + Activity: ptr("Started"), + TaskStatus: ptr(api.TaskStatusActive), + }) +} +// TaskFailed tells the Manager the task failed for some reason. +func (l *Listener) TaskFailed(ctx context.Context, taskID string, reason string) error { + return l.sendTaskUpdate(ctx, taskID, api.TaskUpdateJSONRequestBody{ + Activity: ptr(fmt.Sprintf("Failed: %v", reason)), + TaskStatus: ptr(api.TaskStatusFailed), + }) +} + +// TaskCompleted tells the Manager the task has been completed. +func (l *Listener) TaskCompleted(ctx context.Context, taskID string) error { + return l.sendTaskUpdate(ctx, taskID, api.TaskUpdateJSONRequestBody{ + Activity: ptr("Completed"), + TaskStatus: ptr(api.TaskStatusCompleted), + }) +} + +// LogProduced sends any logging to whatever service for storing logging. +func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...string) error { + return l.sendTaskUpdate(ctx, taskID, api.TaskUpdateJSONRequestBody{ + Log: ptr(strings.Join(logLines, "\n")), + }) +} + +// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). +func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error { + // TODO: implement + return nil +} + +func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { resp, err := l.client.TaskUpdateWithResponse(ctx, string(taskID), update) if err != nil { - return fmt.Errorf("error notifying Manager of task start: %w", err) + return fmt.Errorf("error sending task update: %w", err) } switch resp.StatusCode() { @@ -100,23 +134,3 @@ func (l *Listener) TaskStarted(ctx context.Context, taskID string) error { return fmt.Errorf("unknown error from Manager, code %d: %v", resp.StatusCode(), resp.JSONDefault) } } - -// TaskFailed tells the Manager the task failed for some reason. -func (l *Listener) TaskFailed(ctx context.Context, taskID string, reason string) error { - return nil -} - -// TaskCompleted tells the Manager the task has been completed. -func (l *Listener) TaskCompleted(ctx context.Context, taskID string) error { - return nil -} - -// LogProduced sends any logging to whatever service for storing logging. -func (l *Listener) LogProduced(ctx context.Context, taskID string, logLines ...string) error { - return nil -} - -// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). -func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLocation string) error { - return nil -}