Skip to content

Commit

Permalink
Respect max_concurrency in pipeline and input steps. #815.
Browse files Browse the repository at this point in the history
  • Loading branch information
vhadianto committed Mar 27, 2024
1 parent b7b6820 commit 196517c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
_Bug fixes_

* Misleading error message for invalid step dependencies. ([#816](https://github.com/turbot/flowpipe/issues/816)).
* Respect `max_concurrency` in `pipeline` and `input` steps. ([#815](https://github.com/turbot/flowpipe/issues/815)).

## v0.4.2 [2023-03-26]

Expand Down
5 changes: 5 additions & 0 deletions internal/es/command/step_pipeline_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ func (h StepPipelineFinishHandler) Handle(ctx context.Context, c interface{}) er
}
}

err = execution.ReleasePipelineExecutionStepSemaphore(cmd.PipelineExecutionID, stepDefn)
if err != nil {
return h.EventBus.Publish(ctx, event.NewPipelineFailedFromStepPipelineFinish(cmd, err))
}

e, err := event.NewStepFinished(event.ForPipelineStepFinish(cmd))
e.StepLoop = stepLoop

Expand Down
3 changes: 3 additions & 0 deletions internal/es/command/step_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (h StepStartHandler) Handle(ctx context.Context, c interface{}) error {
if stepDefn.GetType() == schema.BlockTypePipelineStepInput {
slog.Debug("Step execution is an input step, not releasing semaphore", "step_name", cmd.StepName, "pipeline_execution_id", cmd.PipelineExecutionID)
return
} else if stepDefn.GetType() == schema.BlockTypePipelineStepPipeline {
slog.Debug("Step execution is a pipeline step, not releasing semaphore", "step_name", cmd.StepName, "pipeline_execution_id", cmd.PipelineExecutionID)
return
}

err := execution.ReleasePipelineExecutionStepSemaphore(cmd.PipelineExecutionID, stepDefn)
Expand Down
15 changes: 15 additions & 0 deletions internal/es/estest/test_suite_mod/pipelines/lots_of_sleep.fp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ pipeline "lots_of_sleep_bound" {
}
}

pipeline "lots_of_nested_pipeline" {
step "pipeline" "pipeline" {
for_each = [1, 2, 3, 4, 5, 6, 7]
max_concurrency = 2
pipeline = pipeline.nested
}
}

pipeline "nested" {
step "transform" "transform" {
value = "foo"
}
}


pipeline "lots_of_sleep_bound_with_param" {
param "concurrency" {
default = 1
Expand Down
4 changes: 4 additions & 0 deletions internal/service/api/form.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ func (api *APIService) finishInputStepFromForm(ex *execution.ExecutionInMemory,
}

err := command.EndStepFromApi(ex, stepExecution, pipelineDefn, stepDefn, &out, api.EsService.EventBus)
if err != nil {
return err
}

err = execution.ReleasePipelineExecutionStepSemaphore(stepExecution.PipelineExecutionID, stepDefn)
return err
}
5 changes: 5 additions & 0 deletions internal/service/api/integration_slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,5 +390,10 @@ func (api *APIService) finishInputStep(execId string, pExecId string, sExecId st
return false, nil, perr.InternalWithMessage(fmt.Sprintf("error raising step finished event: %s", err.Error()))
}

err = execution.ReleasePipelineExecutionStepSemaphore(stepExecution.PipelineExecutionID, stepDefn)
if err != nil {
return false, nil, perr.InternalWithMessage(fmt.Sprintf("error releasing step semaphore: %s", err.Error()))
}

return true, stepExecution, nil
}

0 comments on commit 196517c

Please sign in to comment.