Skip to content

Commit

Permalink
Support HTTP prefer async with OpenAPI (#899)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Apr 8, 2024
1 parent 35d37e0 commit 134687e
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;

public class AsyncapiAddress
{
public String location;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ public class AsyncapiOperation
public Map<String, AsyncapiBinding> bindings;
public AsyncapiChannel channel;
public String action;
public AsyncapiReply reply;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;

public class AsyncapiReply
{
public AsyncapiAddress address;
public AsyncapiChannel channel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiReply;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig;
Expand All @@ -34,14 +36,20 @@
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiSpecConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.OpenapiAsyncapiBinding;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponseByContentType;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiSchema;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
Expand All @@ -54,11 +62,17 @@

public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi
{
private static final String CORRELATION_ID = "\\{correlationId\\}";
private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}";
private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$");
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID);
private static final Pattern REPLY_TO_PATTERN = Pattern.compile("#(.*)");

private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher("");
private final Matcher parameters = PARAMETER_PATTERN.matcher("");
private final Matcher correlation = CORRELATION_PATTERN.matcher("");
private final Matcher replyTo = REPLY_TO_PATTERN.matcher("");

@Override
public String type()
Expand Down Expand Up @@ -103,83 +117,100 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoutes(
{
for (OpenapiAsyncapiConditionConfig condition : route.when)
{
Openapi openapi = spec.openapi.stream()
Optional<OpenapiConfig> openapiConfig = spec.openapi.stream()
.filter(o -> o.apiLabel.equals(condition.apiId))
.findFirst()
.get().openapi;
Asyncapi asyncapi = spec.asyncapi.stream()
.findFirst();
Optional<AsyncapiConfig> asyncapiConfig = spec.asyncapi.stream()
.filter(o -> o.apiLabel.equals(route.with.apiId))
.findFirst()
.get().asyncapi;
.findFirst();

for (String item : openapi.paths.keySet())
if (openapiConfig.isPresent() && asyncapiConfig.isPresent())
{
OpenapiPathView path = OpenapiPathView.of(openapi.paths.get(item));
for (String method : path.methods().keySet())
{
final String operationId = condition.operationId != null ?
condition.operationId : path.methods().get(method).operationId;
final Openapi openapi = openapiConfig.get().openapi;
final Asyncapi asyncapi = asyncapiConfig.get().asyncapi;

final OpenapiOperation openapiOperation = path.methods().get(method);
computeRoutes(binding, qname, condition, openapi, asyncapi);
}

final AsyncapiOperation asyncapiOperation = asyncapi.operations.entrySet().stream()
.filter(f -> f.getKey().equals(operationId))
.map(v -> v.getValue())
.findFirst()
.get();
}
}

final AsyncapiChannelView channel = AsyncapiChannelView
.of(asyncapi.channels, asyncapiOperation.channel);
return binding;
}

List<String> paramNames = new ArrayList<>();
Matcher matcher = parameters.reset(item);
while (matcher.find())
{
paramNames.add(parameters.group(1));
}

binding
.route()
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(item)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation,
asyncapiOperation.action, channel.address(), paramNames))
.build();
}
}
private <C> void computeRoutes(
BindingConfigBuilder<C> binding,
String qname,
OpenapiAsyncapiConditionConfig condition,
Openapi openapi,
Asyncapi asyncapi)
{
for (String item : openapi.paths.keySet())
{
OpenapiPathView path = OpenapiPathView.of(openapi.paths.get(item));
for (String method : path.methods().keySet())
{
final String operationId = condition.operationId != null ?
condition.operationId : path.methods().get(method).operationId;

final OpenapiOperation openapiOperation = path.methods().get(method);
final Optional<AsyncapiOperation> asyncapiOperation = asyncapi.operations.entrySet().stream()
.filter(f -> f.getKey().equals(operationId))
.map(Map.Entry::getValue)
.findFirst();
final List<String> paramNames = findParams(item);

asyncapiOperation.ifPresent(operation -> binding
.route()
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(item)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, openapi, asyncapi, openapiOperation,
operation, paramNames))
.build());
}
}
}

return binding;
private List<String> findParams(
String item)
{
List<String> paramNames = new ArrayList<>();
Matcher matcher = parameters.reset(item);
while (matcher.find())
{
paramNames.add(parameters.group(1));
}
return paramNames;
}

private <C> RouteConfigBuilder<C> injectHttpKafkaRouteWith(
RouteConfigBuilder<C> route,
Openapi openapi,
OpenapiOperation operation,
String action,
String address,
Asyncapi asyncapi,
OpenapiOperation openapiOperation,
AsyncapiOperation asyncapiOperation,
List<String> paramNames)
{
final HttpKafkaWithConfigBuilder<HttpKafkaWithConfig> newWith = HttpKafkaWithConfig.builder();
final AsyncapiChannelView channel = AsyncapiChannelView
.of(asyncapi.channels, asyncapiOperation.channel);
final String topic = channel.address();

switch (action)
switch (asyncapiOperation.action)
{
case "receive":
newWith.fetch(HttpKafkaWithFetchConfig.builder()
.topic(address)
.inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation, paramNames))
.topic(topic)
.inject(with -> injectHttpKafkaRouteFetchWith(with, openapi, openapiOperation, paramNames))
.build());
break;
case "send":
String key = !paramNames.isEmpty() ? String.format("${params.%s}", paramNames.get(0)) : "${idempotencyKey}";
newWith.produce(HttpKafkaWithProduceConfig.builder()
.topic(address)
.acks("in_sync_replicas")
.key(key)
.topic(topic)
.inject(w -> injectHttpKafkaRouteProduceWith(w, openapiOperation, asyncapiOperation, paramNames))
.build());
break;
}
Expand Down Expand Up @@ -214,13 +245,77 @@ private <C> HttpKafkaWithFetchConfigBuilder<C> injectHttpKafkaRouteFetchWith(
if (!paramNames.isEmpty())
{
fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder()
.key(String.format("${params.%s}", paramNames.get(0)))
.key(String.format("${params.%s}", paramNames.get(paramNames.size() - 1)))
.build()));
}

return fetch;
}

private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith(
HttpKafkaWithProduceConfigBuilder<C> produce,
OpenapiOperation openapiOperation,
AsyncapiOperation asyncapiOperation,
List<String> paramNames)
{
final String key = !paramNames.isEmpty() ? String.format("${params.%s}",
paramNames.get(paramNames.size() - 1)) : "${idempotencyKey}";

produce.acks("in_sync_replicas").key(key);

for (Map.Entry<String, OpenapiResponseByContentType> response : openapiOperation.responses.entrySet())
{
if ("202".equals(response.getKey()))
{
OpenapiResponseByContentType content = response.getValue();
boolean async = content.headers.entrySet().stream()
.anyMatch(e -> hasCorrelationId(e.getValue()));

if (async)
{
content.headers.forEach((k, v) ->
{
String location = v.schema.format;
location = location.replaceAll(CORRELATION_ID, "\\${correlationId}");
location = location.replaceAll(PARAMETERS, "\\${params.$1}");
produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder()
.name(k)
.value(location)
.build());
});
}
}
}
AsyncapiReply reply = asyncapiOperation.reply;
if (reply != null)
{
final String location = reply.address.location;
Matcher matcher = replyTo.reset(location);

if (matcher.find())
{
produce.replyTo(matcher.group(1));
}
}

produce.build();

return produce;
}

private boolean hasCorrelationId(
OpenapiHeader header)
{
boolean hasCorrelationId = false;
OpenapiSchema schema = header.schema;
if (schema != null &&
schema.format != null)
{
hasCorrelationId = correlation.reset(schema.format).find();
}
return hasCorrelationId;
}

protected NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> injectNamespaceMetric(
NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> namespace,
boolean hasMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class OpenapiSchema
{
public String type;
public String format;
public OpenapiSchema items;
public Map<String, OpenapiItem> properties;
public List<String> required;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

Expand Down Expand Up @@ -72,6 +73,19 @@ public HttpKafkaWithProduceConfigBuilder<T> replyTo(
return this;
}

public HttpKafkaWithProduceConfigBuilder<T> async(
HttpKafkaWithProduceAsyncHeaderConfig header)
{
if (this.async == null)
{
this.async = new ArrayList<>();
}

this.async.add(header);

return this;
}

public HttpKafkaWithProduceConfigBuilder<T> async(
List<HttpKafkaWithProduceAsyncHeaderConfig> async)
{
Expand Down

0 comments on commit 134687e

Please sign in to comment.