Skip to content

Commit

Permalink
Add support to configure the args attribute in the loop block of …
Browse files Browse the repository at this point in the history
…a pipeline step. Closes #559 (#575)
  • Loading branch information
Subhajit97 authored Jan 12, 2024
1 parent 6dddf51 commit f165384
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
68 changes: 68 additions & 0 deletions internal/es/estest/mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3606,6 +3606,74 @@ func (suite *ModTestSuite) TestSqliteQueryTimeout() {
assert.Contains(pex.Errors[0].Error.Error(), "Timeout: Query execution exceeded timeout")
}

func (suite *ModTestSuite) TestPipelineStepLoopWithArgs() {
assert := assert.New(suite.T())

pipelineInput := modconfig.Input{}

// The loop block has result.request_body which is an argument, we were only adding the output attributes rather than the argument attributes
_, pipelineCmd, err := runPipeline(suite.FlowpipeTestSuite, "test_suite_mod.pipeline.simple_pipeline_loop_with_args", 100*time.Millisecond, pipelineInput)

if err != nil {
assert.Fail("Error creating execution", err)
return
}

_, pex, _ := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 40, "failed")
assert.Equal("finished", pex.Status)
assert.Equal(4, len(pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions))

// Iteration 0
output := pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[0].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! iteration index 0", output["greet_world"].(string))

// Iteration 1
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[1].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index_0 0", output["greet_world"].(string))

// Iteration 2
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[2].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index_1 1", output["greet_world"].(string))

// Iteration 3
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[3].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index_2 2", output["greet_world"].(string))
}

func (suite *ModTestSuite) TestPipelineStepLoopWithArgsLiteral() {
assert := assert.New(suite.T())

pipelineInput := modconfig.Input{}

// The loop block has result.request_body which is an argument, we were only adding the output attributes rather than the argument attributes
_, pipelineCmd, err := runPipeline(suite.FlowpipeTestSuite, "test_suite_mod.pipeline.simple_pipeline_loop_with_arg_literal", 100*time.Millisecond, pipelineInput)

if err != nil {
assert.Fail("Error creating execution", err)
return
}

_, pex, _ := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 40, "failed")
assert.Equal("finished", pex.Status)
assert.Equal(4, len(pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions))

// Iteration 0
output := pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[0].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! iteration index 0", output["greet_world"].(string))

// Iteration 1
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[1].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index 1", output["greet_world"].(string))

// Iteration 2
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[2].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index 1", output["greet_world"].(string))

// Iteration 3
output = pex.StepStatus["pipeline.repeat_pipeline_loop_test"]["0"].StepExecutions[3].Output.Data["output"].(map[string]interface{})
assert.Equal("Hello world! loop index 1", output["greet_world"].(string))
}

func TestModTestingSuite(t *testing.T) {
suite.Run(t, &ModTestSuite{
FlowpipeTestSuite: &FlowpipeTestSuite{},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
pipeline "pipeline_loop_test" {

param "message" {
type = string
}

param "index" {
type = number
}

output "greet_world" {
value = "Hello world! ${param.message} ${param.index}"
}
}

pipeline "simple_pipeline_loop_with_args" {

param "test_message" {
type = string
default = "loop index"
}

step "pipeline" "repeat_pipeline_loop_test" {
pipeline = pipeline.pipeline_loop_test
args = {
message = "iteration index"
index = 0
}

loop {
until = loop.index > 2
args = {
message = "${param.test_message}_${loop.index}"
index = loop.index
}
}
}

output "value" {
value = step.pipeline.repeat_pipeline_loop_test
}
}

pipeline "simple_pipeline_loop_with_arg_literal" {

step "pipeline" "repeat_pipeline_loop_test" {
pipeline = pipeline.pipeline_loop_test
args = {
message = "iteration index"
index = 0
}

loop {
until = loop.index > 2
args = {
message = "loop index"
index = 1
}
}
}

output "value" {
value = step.pipeline.repeat_pipeline_loop_test
}
}

0 comments on commit f165384

Please sign in to comment.