//go:build mage package main import ( "context" "fmt" "sync" "time" "golang.org/x/sync/errgroup" ) // ParallelBuilder manages parallel execution of build tasks type ParallelBuilder struct { maxConcurrency int progress *ProgressReporter } // BuildTask represents a build task that can be executed in parallel type BuildTask struct { Name string Dependencies []string Function func() error StartTime time.Time EndTime time.Time Duration time.Duration Error error } // ProgressReporter tracks and reports build progress type ProgressReporter struct { mu sync.Mutex tasks map[string]*BuildTask completed int total int startTime time.Time } // NewParallelBuilder creates a new parallel builder with specified concurrency func NewParallelBuilder(maxConcurrency int) *ParallelBuilder { return &ParallelBuilder{ maxConcurrency: maxConcurrency, progress: NewProgressReporter(), } } // NewProgressReporter creates a new progress reporter func NewProgressReporter() *ProgressReporter { return &ProgressReporter{ tasks: make(map[string]*BuildTask), startTime: time.Now(), } } // ExecuteParallel executes build tasks in parallel while respecting dependencies func (pb *ParallelBuilder) ExecuteParallel(ctx context.Context, tasks []*BuildTask) error { if len(tasks) == 0 { return nil } pb.progress.SetTotal(len(tasks)) fmt.Printf("Parallel: Starting build with %d tasks (max concurrency: %d)\n", len(tasks), pb.maxConcurrency) // Build dependency graph (for future use) _ = pb.buildDependencyGraph(tasks) // Find tasks that can run immediately (no dependencies) readyTasks := pb.findReadyTasks(tasks, make(map[string]bool)) // Track completed tasks completed := make(map[string]bool) // Create error group with concurrency limit g, ctx := errgroup.WithContext(ctx) g.SetLimit(pb.maxConcurrency) // Channel to communicate task completions completedTasks := make(chan string, len(tasks)) // Execute tasks in waves based on dependencies for len(completed) < len(tasks) { if len(readyTasks) == 0 { // Wait for some tasks to complete to unlock new ones select { case taskName := <-completedTasks: completed[taskName] = true pb.progress.MarkCompleted(taskName) // Find newly ready tasks newReadyTasks := pb.findReadyTasks(tasks, completed) for _, task := range newReadyTasks { if !pb.isTaskInSlice(task, readyTasks) { readyTasks = append(readyTasks, task) } } case <-ctx.Done(): return ctx.Err() } continue } // Launch all ready tasks currentWave := make([]*BuildTask, len(readyTasks)) copy(currentWave, readyTasks) readyTasks = readyTasks[:0] // Clear ready tasks for _, task := range currentWave { task := task // Capture loop variable if completed[task.Name] { continue // Skip already completed tasks } g.Go(func() error { pb.progress.StartTask(task.Name) task.StartTime = time.Now() err := task.Function() task.EndTime = time.Now() task.Duration = task.EndTime.Sub(task.StartTime) task.Error = err if err != nil { pb.progress.FailTask(task.Name, err) return fmt.Errorf("task %s failed: %w", task.Name, err) } // Notify completion select { case completedTasks <- task.Name: case <-ctx.Done(): return ctx.Err() } return nil }) } } // Wait for all tasks to complete if err := g.Wait(); err != nil { return err } pb.progress.Finish() return nil } // buildDependencyGraph creates a map of task dependencies func (pb *ParallelBuilder) buildDependencyGraph(tasks []*BuildTask) map[string][]string { graph := make(map[string][]string) for _, task := range tasks { graph[task.Name] = task.Dependencies } return graph } // findReadyTasks finds tasks that have all their dependencies completed func (pb *ParallelBuilder) findReadyTasks(tasks []*BuildTask, completed map[string]bool) []*BuildTask { var ready []*BuildTask for _, task := range tasks { if completed[task.Name] { continue // Already completed } allDepsCompleted := true for _, dep := range task.Dependencies { if !completed[dep] { allDepsCompleted = false break } } if allDepsCompleted { ready = append(ready, task) } } return ready } // isTaskInSlice checks if a task is already in a slice func (pb *ParallelBuilder) isTaskInSlice(task *BuildTask, slice []*BuildTask) bool { for _, t := range slice { if t.Name == task.Name { return true } } return false } // SetTotal sets the total number of tasks for progress reporting func (pr *ProgressReporter) SetTotal(total int) { pr.mu.Lock() defer pr.mu.Unlock() pr.total = total } // StartTask marks a task as started func (pr *ProgressReporter) StartTask(taskName string) { pr.mu.Lock() defer pr.mu.Unlock() if task, exists := pr.tasks[taskName]; exists { task.StartTime = time.Now() } else { pr.tasks[taskName] = &BuildTask{ Name: taskName, StartTime: time.Now(), } } fmt.Printf("Parallel: [%d/%d] Starting %s\n", pr.completed+1, pr.total, taskName) } // MarkCompleted marks a task as completed successfully func (pr *ProgressReporter) MarkCompleted(taskName string) { pr.mu.Lock() defer pr.mu.Unlock() if task, exists := pr.tasks[taskName]; exists { task.EndTime = time.Now() task.Duration = task.EndTime.Sub(task.StartTime) } pr.completed++ elapsed := time.Since(pr.startTime) fmt.Printf("Parallel: [%d/%d] Completed %s (%.2fs, total elapsed: %.2fs)\n", pr.completed, pr.total, taskName, pr.tasks[taskName].Duration.Seconds(), elapsed.Seconds()) } // FailTask marks a task as failed func (pr *ProgressReporter) FailTask(taskName string, err error) { pr.mu.Lock() defer pr.mu.Unlock() if task, exists := pr.tasks[taskName]; exists { task.EndTime = time.Now() task.Duration = task.EndTime.Sub(task.StartTime) task.Error = err } elapsed := time.Since(pr.startTime) fmt.Printf("Parallel: [FAILED] %s after %.2fs (total elapsed: %.2fs): %v\n", taskName, pr.tasks[taskName].Duration.Seconds(), elapsed.Seconds(), err) } // Finish completes the progress reporting and shows final statistics func (pr *ProgressReporter) Finish() { pr.mu.Lock() defer pr.mu.Unlock() totalElapsed := time.Since(pr.startTime) fmt.Printf("Parallel: Build completed in %.2fs\n", totalElapsed.Seconds()) // Show task timing breakdown if verbose if len(pr.tasks) > 1 { fmt.Printf("Parallel: Task timing breakdown:\n") var totalTaskTime time.Duration for name, task := range pr.tasks { fmt.Printf(" %s: %.2fs\n", name, task.Duration.Seconds()) totalTaskTime += task.Duration } parallelEfficiency := (totalTaskTime.Seconds() / totalElapsed.Seconds()) * 100 fmt.Printf("Parallel: Parallel efficiency: %.1f%% (%.2fs total task time)\n", parallelEfficiency, totalTaskTime.Seconds()) } } // ExecuteSequential is a helper to execute tasks sequentially using the parallel infrastructure func (pb *ParallelBuilder) ExecuteSequential(ctx context.Context, tasks []*BuildTask) error { oldConcurrency := pb.maxConcurrency pb.maxConcurrency = 1 defer func() { pb.maxConcurrency = oldConcurrency }() return pb.ExecuteParallel(ctx, tasks) } // Common build task creators for Flamenco // CreateGenerateTask creates a task for code generation func CreateGenerateTask(name string, deps []string, fn func() error) *BuildTask { return &BuildTask{ Name: name, Dependencies: deps, Function: fn, } } // CreateBuildTask creates a task for building binaries func CreateBuildTask(name string, deps []string, fn func() error) *BuildTask { return &BuildTask{ Name: name, Dependencies: deps, Function: fn, } } // CreateWebappTask creates a task for webapp building func CreateWebappTask(name string, deps []string, fn func() error) *BuildTask { return &BuildTask{ Name: name, Dependencies: deps, Function: fn, } } // WarmBuildCache pre-warms the build cache by analyzing current state func WarmBuildCache(cache *BuildCache) error { fmt.Println("Parallel: Warming build cache...") // Common source patterns for Flamenco commonSources := []struct { target string sources []string outputs []string }{ { target: "go-sources", sources: []string{"**/*.go", "go.mod", "go.sum"}, outputs: []string{}, // No direct outputs, just tracking }, { target: "webapp-sources", sources: []string{"web/app/**/*.ts", "web/app/**/*.vue", "web/app/**/*.js", "web/app/package.json", "web/app/yarn.lock"}, outputs: []string{}, // No direct outputs, just tracking }, { target: "openapi-spec", sources: []string{"pkg/api/flamenco-openapi.yaml"}, outputs: []string{}, // No direct outputs, just tracking }, } for _, source := range commonSources { if needsBuild, err := cache.NeedsBuild(source.target, source.sources, []string{}, source.outputs); err != nil { return err } else if !needsBuild { fmt.Printf("Cache: %s is up to date\n", source.target) } } return nil }