diff --git a/internal/worker/upstream_buffer_test.go b/internal/worker/upstream_buffer_test.go index ac4df6d8..71e76320 100644 --- a/internal/worker/upstream_buffer_test.go +++ b/internal/worker/upstream_buffer_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/benbjohnson/clock" @@ -83,17 +84,20 @@ func TestUpstreamBufferManagerUnavailable(t *testing.T) { assert.Equal(t, 1, queueSize) // Wait for the flushing with Manager available. - resp := &api.TaskUpdateResponse{} + wg := sync.WaitGroup{} + wg.Add(1) mocks.client.EXPECT(). TaskUpdateWithResponse(ctx, taskID, update). - Return(resp, nil). + DoAndReturn(func(ctx context.Context, taskID string, body api.TaskUpdateJSONRequestBody, editors ...api.RequestEditorFn) (*api.TaskUpdateResponse, error) { + wg.Done() + return &api.TaskUpdateResponse{}, nil + }). After(managerCallFail) - // Only add exactly the flush interval, as that maximises the chances of - // getting conflicts on the database level (if we didn't have the - // database-protection mutex). mocks.clock.Add(defaultUpstreamFlushInterval) + wg.Wait() + // Queue should be empty now. ub.dbMutex.Lock() queueSize, err = ub.queueSize()