From 2cef977324c9b78508de72b0eb49c6eff88346c0 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 25 Oct 2023 13:35:29 -0700 Subject: [PATCH 1/6] Adjust padding to accommodate good enough headers and don't include partial data frame while computing crc32c value --- .../internal/stream/KafkaClientProduceFactory.java | 9 +++++++-- .../kafka/internal/stream/ClientMergedIT.java | 6 +++--- .../produce/message.value.100k/client.rpt | 2 +- .../produce/message.value.100k/server.rpt | 4 ++-- .../application/produce/message.value.10k/client.rpt | 2 +- .../application/produce/message.value.10k/server.rpt | 4 ++-- .../produce/message.values.sequential/client.rpt | 2 +- .../produce/message.values.sequential/server.rpt | 2 +- .../produce.v3/message.values.sequential/client.rpt | 12 ++++++------ .../produce.v3/message.values.sequential/server.rpt | 12 ++++++------ 10 files changed, 30 insertions(+), 25 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index d6c2cb2b5b..9690473ba5 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -86,7 +86,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i { private static final int PRODUCE_REQUEST_RECORDS_OFFSET_MAX = 512; - private static final int KAFKA_RECORD_FRAMING = 100; // TODO + private static final int KAFKA_RECORD_FRAMING = 512; // TODO private static final int FLAGS_CON = 0x00; private static final int FLAGS_FIN = 0x01; @@ -539,6 +539,7 @@ private int flushRecordInit( final int valueSize = payload != null ? payload.sizeof() : 0; client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred; + final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + KAFKA_RECORD_FRAMING; if (client.encodeSlot != NO_SLOT && maxEncodeableBytes > encodePool.slotCapacity()) @@ -1191,6 +1192,7 @@ private final class KafkaProduceClient extends KafkaSaslClient private int encodeableRecordCount; private int encodeableRecordBytes; private int encodeableRecordBytesDeferred; + private int encodeableRecordValueBytes; private int flushableRequestBytes; private int decodeSlot = NO_SLOT; @@ -1652,6 +1654,7 @@ private void doEncodeRecordInit( encodeSlotBuffer.putBytes(encodeSlotLimit, encodeBuffer, 0, encodeProgress); encodeSlotLimit += encodeProgress; + encodeableRecordValueBytes = 0; if (headersCount > 0) { @@ -1689,6 +1692,7 @@ private void doEncodeRecordCont( encodeSlotBuffer.putBytes(encodeSlotLimit, value.buffer(), value.offset(), length); encodeSlotLimit += length; + encodeableRecordValueBytes += length; if ((flags & FLAGS_FIN) == 0) { @@ -1893,7 +1897,8 @@ private void doEncodeProduceRequest( final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot); final int encodeSlotBytePosition = encodeSlotByteBuffer.position(); - encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit); + final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0; + encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize); encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit); final CRC32C crc = crc32c; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java index 8ae2d6fd51..7803a7ce1f 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java @@ -49,7 +49,7 @@ public class ClientMergedIT .countersBufferCapacity(8192) .configure(ENGINE_BUFFER_SLOT_CAPACITY, 8192) .configure(KAFKA_CLIENT_META_MAX_AGE_MILLIS, 1000) - .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 116) + .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 528) .configurationRoot("io/aklivity/zilla/specs/binding/kafka/config") .external("net0") .clean(); @@ -234,7 +234,7 @@ public void shouldProduceMergedMessageValues() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue10k() throws Exception { k3po.finish(); @@ -248,7 +248,7 @@ public void shouldProduceMergedMessageValue10k() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue100k() throws Exception { k3po.finish(); diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt index b21502d3b5..cd5237ac4a 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt index cf93c76e87..6cd3a8d747 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt index dcd8f9d0de..b054e49965 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt index c248b9e910..6b3eaeb77b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .build() .build()} read zilla:data.ext ${kafka:matchDataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt index 8ddf6911ff..5b8050c2df 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt @@ -81,7 +81,7 @@ write zilla:data.ext ${kafka:dataEx() .produce() .build() .build()} -write ${kafka:randomBytes(7580)} +write ${kafka:randomBytes(8192-(512+512))} write flush write zilla:data.ext ${kafka:dataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt index b9d3f4903c..5a2d505c1c 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt @@ -79,7 +79,7 @@ read zilla:data.ext ${kafka:matchDataEx() .produce() .build() .build()} -read [0..7580] +read [0..7168] read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt index 6e967c671d..9b48621d87 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt @@ -78,7 +78,7 @@ write zilla:begin.ext ${proxy:beginEx() connected -write 7690 +write 7278 0s 3s ${newRequestId} @@ -90,9 +90,9 @@ write 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] 0x4e8723aa @@ -104,13 +104,13 @@ write 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - ${kafka:randomBytes(7580)} + ${kafka:varint(7168)} # value + ${kafka:randomBytes(7168)} ${kafka:varint(0)} # headers read 44 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt index 0c2dae01c2..a6aaebb5a4 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt @@ -74,7 +74,7 @@ read zilla:begin.ext ${proxy:beginEx() connected -read 7690 +read 7278 0s 3s (int:requestId) @@ -86,9 +86,9 @@ read 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] [0..4] @@ -100,13 +100,13 @@ read 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - [0..7580] + ${kafka:varint(7168)} # value + [0..7168] ${kafka:varint(0)} # headers write 44 From 0fe8153963527e9ec3d1e65dd963fe63ec94d438 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 19 Jan 2024 00:16:26 -0800 Subject: [PATCH 2/6] Support expression for primitive type in json schema --- .../engine/config/EngineConfigReader.java | 184 ++++++++++++++++-- .../internal/registry/EngineManager.java | 6 +- .../specs/engine/schema/engine.schema.json | 4 + 3 files changed, 176 insertions(+), 18 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java index b797007aaa..a88d650443 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java @@ -19,6 +19,7 @@ import static java.util.Collections.singletonMap; import static org.agrona.LangUtil.rethrowUnchecked; +import java.io.IOException; import java.io.InputStream; import java.io.Reader; import java.io.StringReader; @@ -29,10 +30,14 @@ import java.util.List; import java.util.function.Consumer; +import jakarta.json.Json; import jakarta.json.JsonArray; +import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; +import jakarta.json.JsonObjectBuilder; import jakarta.json.JsonPatch; import jakarta.json.JsonReader; +import jakarta.json.JsonValue; import jakarta.json.bind.Jsonb; import jakarta.json.bind.JsonbBuilder; import jakarta.json.bind.JsonbConfig; @@ -46,21 +51,25 @@ import org.leadpony.justify.api.ProblemHandler; import io.aklivity.zilla.runtime.engine.Engine; +import io.aklivity.zilla.runtime.engine.expression.ExpressionResolver; import io.aklivity.zilla.runtime.engine.internal.config.NamespaceAdapter; import io.aklivity.zilla.runtime.engine.internal.config.schema.UniquePropertyKeysSchema; public final class EngineConfigReader { private final ConfigAdapterContext context; + private final ExpressionResolver expressions; private final Collection schemaTypes; private final Consumer logger; public EngineConfigReader( ConfigAdapterContext context, + ExpressionResolver expressions, Collection schemaTypes, Consumer logger) { this.context = context; + this.expressions = expressions; this.schemaTypes = schemaTypes; this.logger = logger; } @@ -91,18 +100,13 @@ public EngineConfig read( schemaObject = schemaPatch.apply(schemaObject); } - if (logger != null) + if (!validateAnnotatedSchema(schemaObject, schemaProvider, errors, configText)) { - final StringWriter out = new StringWriter(); - schemaProvider.createGeneratorFactory(singletonMap(PRETTY_PRINTING, true)) - .createGenerator(out) - .write(schemaObject) - .close(); - - final String schemaText = out.getBuffer().toString(); - logger.accept(schemaText); + break read; } + configText = expressions.resolve(configText); + JsonParser schemaParser = schemaProvider.createParserFactory(null) .createParser(new StringReader(schemaObject.toString())); @@ -139,11 +143,11 @@ public EngineConfig read( } JsonbConfig config = new JsonbConfig() - .withAdapters(new NamespaceAdapter(context)); + .withAdapters(new NamespaceAdapter(context)); Jsonb jsonb = JsonbBuilder.newBuilder() - .withProvider(provider) - .withConfig(config) - .build(); + .withProvider(provider) + .withConfig(config) + .build(); Reader reader = new StringReader(readable); EngineConfigBuilder builder = EngineConfig.builder(); @@ -174,4 +178,158 @@ public EngineConfig read( return engine; } + + private boolean validateAnnotatedSchema( + JsonObject schemaObject, + JsonProvider schemaProvider, + List errors, + String configText) + { + boolean valid = false; + + validate: + try + { + final JsonObject annotatedSchemaObject = (JsonObject) annotateJsonObject(schemaObject); + + if (logger != null) + { + final StringWriter out = new StringWriter(); + schemaProvider.createGeneratorFactory(singletonMap(PRETTY_PRINTING, true)) + .createGenerator(out) + .write(annotatedSchemaObject) + .close(); + + final String schemaText = out.getBuffer().toString(); + logger.accept(schemaText); + } + + final JsonParser schemaParser = schemaProvider.createParserFactory(null) + .createParser(new StringReader(schemaObject.toString())); + + final JsonValidationService service = JsonValidationService.newInstance(); + ProblemHandler handler = service.createProblemPrinter(msg -> errors.add(new ConfigException(msg))); + final JsonSchemaReader validator = service.createSchemaReader(schemaParser); + final JsonSchema schema = new UniquePropertyKeysSchema(validator.read()); + + String readable = configText.stripTrailing(); + + IntArrayList configsAt = new IntArrayList(); + for (int configAt = 0; configAt < readable.length(); ) + { + configsAt.addInt(configAt); + + Reader reader = new StringReader(readable); + reader.skip(configAt); + + try (JsonParser parser = service.createParser(reader, schema, handler)) + { + while (parser.hasNext()) + { + parser.next(); + } + + configAt += (int) parser.getLocation().getStreamOffset(); + } + + if (!errors.isEmpty()) + { + break validate; + } + } + + valid = true; + } + catch (IOException ex) + { + errors.add(ex); + } + + return valid; + + } + + private JsonValue annotateJsonObject( + JsonObject jsonObject) + { + JsonObjectBuilder builder = Json.createObjectBuilder(); + + jsonObject.forEach((key, value) -> + { + if ("expression".equals(key)) + { + builder.add(key, value); + } + else if (value.getValueType() == JsonValue.ValueType.OBJECT) + { + builder.add(key, annotateJsonObject(value.asJsonObject())); + } + else if (value.getValueType() == JsonValue.ValueType.ARRAY) + { + builder.add(key, annotateJsonArray(value.asJsonArray())); + } + else if (key.equals("type") && + isPrimitiveType(value.toString().replaceAll("\"", ""))) + { + JsonValue pattern = jsonObject.get("pattern"); + builder.add(key, value); + builder.add("oneOf", createOneOfTypes(value.toString().replaceAll("\"", ""), pattern)); + } + else if (!"pattern".equals(key)) + { + builder.add(key, value); + } + }); + + return builder.build(); + } + + private JsonValue annotateJsonArray( + JsonArray jsonArray) + { + JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); + + jsonArray.forEach(item -> + { + if (item.getValueType() == JsonValue.ValueType.OBJECT) + { + arrayBuilder.add(annotateJsonObject(item.asJsonObject())); + } + else + { + arrayBuilder.add(item); + } + }); + + return arrayBuilder.build(); + } + + private boolean isPrimitiveType( + String type) + { + return "string".equals(type) || + "integer".equals(type) || + "boolean".equals(type) || + "number".equals(type); + } + + private JsonArray createOneOfTypes( + String originalType, + JsonValue pattern) + { + JsonArrayBuilder oneOfArrayBuilder = Json.createArrayBuilder(); + JsonObjectBuilder objectBuilder = Json.createObjectBuilder(); + objectBuilder.add("type", originalType); + if (pattern != null) + { + objectBuilder.add("pattern", pattern); + } + oneOfArrayBuilder.add(objectBuilder); + + oneOfArrayBuilder.add(Json.createObjectBuilder() + .add("$ref", "#/$defs/expression") + ); + + return oneOfArrayBuilder.build(); + } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java index f91d70bbb3..aa16788f7c 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java @@ -171,17 +171,13 @@ private EngineConfig parse( logger.accept(configText); - if (config.configResolveExpressions()) - { - configText = expressions.resolve(configText); - } - try { final Function namespaceReadURL = l -> readURL.apply(configURL, l); EngineConfigReader reader = new EngineConfigReader( new NamespaceConfigAdapterContext(namespaceReadURL), + expressions, schemaTypes, config.verboseSchema() ? logger : null); diff --git a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/engine.schema.json b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/engine.schema.json index cddf30068b..b8dfd697c1 100644 --- a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/engine.schema.json +++ b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/engine.schema.json @@ -105,6 +105,10 @@ "$defs": { + "expression":{ + "type": "string", + "pattern": "\\$\\{\\{\\s*([^\\s\\}]*)\\.([^\\s\\}]*)\\s*\\}\\}" + }, "vault": { "type": "object", From 876dd766a839dd4b195432be7ef02a9d77508b02 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 19 Jan 2024 10:00:37 -0800 Subject: [PATCH 3/6] Change oneOf to anyOf --- .../zilla/runtime/engine/config/EngineConfigReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java index a88d650443..cfae17304f 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java @@ -273,7 +273,7 @@ else if (key.equals("type") && { JsonValue pattern = jsonObject.get("pattern"); builder.add(key, value); - builder.add("oneOf", createOneOfTypes(value.toString().replaceAll("\"", ""), pattern)); + builder.add("anyOf", createOneOfTypes(value.toString().replaceAll("\"", ""), pattern)); } else if (!"pattern".equals(key)) { From 698607be0f7fe3d7d67570f5270ad0df46f42636 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 19 Jan 2024 11:30:59 -0800 Subject: [PATCH 4/6] Remove redundent expression config --- .../aklivity/zilla/runtime/engine/EngineConfiguration.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 7c9bc53e24..960600c503 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -67,7 +67,6 @@ public class EngineConfiguration extends Configuration public static final BooleanPropertyDef ENGINE_VERBOSE; public static final BooleanPropertyDef ENGINE_VERBOSE_SCHEMA; public static final IntPropertyDef ENGINE_WORKERS; - public static final BooleanPropertyDef ENGINE_CONFIG_RESOLVE_EXPRESSIONS; private static final ConfigurationDef ENGINE_CONFIG; @@ -105,7 +104,6 @@ public class EngineConfiguration extends Configuration ENGINE_VERBOSE = config.property("verbose", false); ENGINE_VERBOSE_SCHEMA = config.property("verbose.schema", false); ENGINE_WORKERS = config.property("workers", Runtime.getRuntime().availableProcessors()); - ENGINE_CONFIG_RESOLVE_EXPRESSIONS = config.property("config.resolve.expressions", true); ENGINE_CONFIG = config; } @@ -259,11 +257,6 @@ public int workers() return ENGINE_WORKERS.getAsInt(this); } - public boolean configResolveExpressions() - { - return ENGINE_CONFIG_RESOLVE_EXPRESSIONS.getAsBoolean(this); - } - public Function hostResolver() { return ENGINE_HOST_RESOLVER.get(this)::resolve; From 4f60382eb8cd0e074f3e15f55928766ba8055371 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 19 Jan 2024 16:01:20 -0800 Subject: [PATCH 5/6] Add test to support expression validation --- .../engine/config/EngineConfigReader.java | 2 +- .../runtime/engine/internal/EngineTest.java | 26 +++++++++++++ .../expression/TestExpressionResolverSpi.java | 12 +++++- .../EngineTest-configure-expression.yaml | 36 ++++++++++++++++++ .../schema/binding/test.schema.patch.json | 38 ++++++++++++++++++- 5 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression.yaml diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java index cfae17304f..0c5fc9f4a0 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/EngineConfigReader.java @@ -205,7 +205,7 @@ private boolean validateAnnotatedSchema( } final JsonParser schemaParser = schemaProvider.createParserFactory(null) - .createParser(new StringReader(schemaObject.toString())); + .createParser(new StringReader(annotatedSchemaObject.toString())); final JsonValidationService service = JsonValidationService.newInstance(); ProblemHandler handler = service.createProblemPrinter(msg -> errors.add(new ConfigException(msg))); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java index c2e829a9db..6c9cc3b255 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java @@ -96,6 +96,32 @@ public void shouldConfigure() } } + @Test + public void shouldConfigureWithExpression() + { + String resource = String.format("%s-%s.yaml", getClass().getSimpleName(), "configure-expression"); + URL configURL = getClass().getResource(resource); + assert configURL != null; + properties.put(ENGINE_CONFIG_URL.name(), configURL.toString()); + EngineConfiguration config = new EngineConfiguration(properties); + List errors = new LinkedList<>(); + try (Engine engine = Engine.builder() + .config(config) + .errorHandler(errors::add) + .build()) + { + engine.start(); + } + catch (Throwable ex) + { + errors.add(ex); + } + finally + { + assertThat(errors, empty()); + } + } + @Test public void shouldConfigureComposite() { diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java index 37e1fdacfb..ea0811ea0b 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java @@ -29,6 +29,16 @@ public String name() public String resolve( String var) { - return "PASSWORD".equals(var) ? "ACTUALPASSWORD" : null; + String result = null; + + if ("PASSWORD".equals(var)) + { + result = "ACTUALPASSWORD"; + } + else if ("PORT".equals(var)) + { + result = "1234"; + } + return result; } } diff --git a/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression.yaml b/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression.yaml new file mode 100644 index 0000000000..af18cc5ec4 --- /dev/null +++ b/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression.yaml @@ -0,0 +1,36 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: default +vaults: + secure: + type: test +guards: + authorized: + type: test +catalogs: + managed: + type: test +bindings: + test0: + type: test + kind: server + options: + port: ${{test.PORT}} + routes: + - exit: test1 + exit: test2 diff --git a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json index 5bd3d195f5..f7b936d80c 100644 --- a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json +++ b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json @@ -31,7 +31,43 @@ { "enum": [ "server", "proxy" ] }, - "options": false + "options": + { + "properties": + { + "port": + { + "title": "Port", + "oneOf": + [ + { + "type": "integer" + }, + { + "type": "string", + "pattern": "^\\d+(-\\d+)?$" + }, + { + "type": "array", + "items": + { + "oneOf": + [ + { + "type": "integer" + }, + { + "type": "string", + "pattern": "^\\d+(-\\d+)?$" + } + ] + } + } + ] + } + }, + "additionalProperties": false + } }, "anyOf": [ From 79ea2ad023a91fe7989b127b7d5bc7d847f43ac4 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 19 Jan 2024 16:27:14 -0800 Subject: [PATCH 6/6] Add negative test --- .../runtime/engine/internal/EngineTest.java | 27 ++++++++++++++ .../expression/TestExpressionResolverSpi.java | 5 +++ ...gineTest-configure-expression-invalid.yaml | 36 +++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression-invalid.yaml diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java index 6c9cc3b255..711dca56e2 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertTrue; import java.net.URL; import java.util.LinkedList; @@ -122,6 +123,32 @@ public void shouldConfigureWithExpression() } } + @Test + public void shouldConfigureWithExpressionInvalid() + { + String resource = String.format("%s-%s.yaml", getClass().getSimpleName(), "configure-expression-invalid"); + URL configURL = getClass().getResource(resource); + assert configURL != null; + properties.put(ENGINE_CONFIG_URL.name(), configURL.toString()); + EngineConfiguration config = new EngineConfiguration(properties); + List errors = new LinkedList<>(); + try (Engine engine = Engine.builder() + .config(config) + .errorHandler(errors::add) + .build()) + { + engine.start(); + } + catch (Throwable ex) + { + errors.add(ex); + } + finally + { + assertTrue(!errors.isEmpty()); + } + } + @Test public void shouldConfigureComposite() { diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java index ea0811ea0b..d9f36d6f31 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/expression/TestExpressionResolverSpi.java @@ -39,6 +39,11 @@ else if ("PORT".equals(var)) { result = "1234"; } + else if ("EXPRESSION".equals(var)) + { + result = "${{test.EXPRESSION}}"; + } + return result; } } diff --git a/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression-invalid.yaml b/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression-invalid.yaml new file mode 100644 index 0000000000..961f5a870c --- /dev/null +++ b/runtime/engine/src/test/resources/io/aklivity/zilla/runtime/engine/internal/EngineTest-configure-expression-invalid.yaml @@ -0,0 +1,36 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: default +vaults: + secure: + type: test +guards: + authorized: + type: test +catalogs: + managed: + type: test +bindings: + test0: + type: test + kind: server + options: + port: ${{test.EXPRESSION}} + routes: + - exit: test1 + exit: test2