Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support HTTP prefer async with OpenAPI #899

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;

public class AsyncapiAddress
{
public String location;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ public class AsyncapiOperation
public Map<String, AsyncapiBinding> bindings;
public AsyncapiChannel channel;
public String action;
public AsyncapiReply reply;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;

public class AsyncapiReply
{
public AsyncapiAddress address;
public AsyncapiChannel channel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiOperation;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiReply;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiChannelView;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig;
Expand All @@ -34,14 +36,20 @@
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfig;
import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.AsyncapiConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiOptionsConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiSpecConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiConfig;
import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.OpenapiAsyncapiBinding;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponseByContentType;
import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiSchema;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiPathView;
import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
Expand All @@ -54,11 +62,17 @@

public final class OpenapiAsyncCompositeBindingAdapter implements CompositeBindingAdapterSpi
{
private static final String CORRELATION_ID = "\\{correlationId\\}";
private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}";
private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$");
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID);
private static final Pattern REPLY_TO_PATTERN = Pattern.compile("#(.*)");

private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher("");
private final Matcher parameters = PARAMETER_PATTERN.matcher("");
private final Matcher correlation = CORRELATION_PATTERN.matcher("");
private final Matcher replyTo = REPLY_TO_PATTERN.matcher("");

@Override
public String type()
Expand Down Expand Up @@ -103,83 +117,100 @@ private <C> BindingConfigBuilder<C> injectHttpKafkaRoutes(
{
for (OpenapiAsyncapiConditionConfig condition : route.when)
{
Openapi openapi = spec.openapi.stream()
Optional<OpenapiConfig> openapiConfig = spec.openapi.stream()
.filter(o -> o.apiLabel.equals(condition.apiId))
.findFirst()
.get().openapi;
Asyncapi asyncapi = spec.asyncapi.stream()
.findFirst();
Optional<AsyncapiConfig> asyncapiConfig = spec.asyncapi.stream()
.filter(o -> o.apiLabel.equals(route.with.apiId))
.findFirst()
.get().asyncapi;
.findFirst();

for (String item : openapi.paths.keySet())
if (openapiConfig.isPresent() && asyncapiConfig.isPresent())
{
OpenapiPathView path = OpenapiPathView.of(openapi.paths.get(item));
for (String method : path.methods().keySet())
{
final String operationId = condition.operationId != null ?
condition.operationId : path.methods().get(method).operationId;
final Openapi openapi = openapiConfig.get().openapi;
final Asyncapi asyncapi = asyncapiConfig.get().asyncapi;

final OpenapiOperation openapiOperation = path.methods().get(method);
computeRoutes(binding, qname, condition, openapi, asyncapi);
}

final AsyncapiOperation asyncapiOperation = asyncapi.operations.entrySet().stream()
.filter(f -> f.getKey().equals(operationId))
.map(v -> v.getValue())
.findFirst()
.get();
}
}

final AsyncapiChannelView channel = AsyncapiChannelView
.of(asyncapi.channels, asyncapiOperation.channel);
return binding;
}

List<String> paramNames = new ArrayList<>();
Matcher matcher = parameters.reset(item);
while (matcher.find())
{
paramNames.add(parameters.group(1));
}

binding
.route()
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(item)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation,
asyncapiOperation.action, channel.address(), paramNames))
.build();
}
}
private <C> void computeRoutes(
BindingConfigBuilder<C> binding,
String qname,
OpenapiAsyncapiConditionConfig condition,
Openapi openapi,
Asyncapi asyncapi)
{
for (String item : openapi.paths.keySet())
{
OpenapiPathView path = OpenapiPathView.of(openapi.paths.get(item));
for (String method : path.methods().keySet())
{
final String operationId = condition.operationId != null ?
condition.operationId : path.methods().get(method).operationId;

final OpenapiOperation openapiOperation = path.methods().get(method);
final Optional<AsyncapiOperation> asyncapiOperation = asyncapi.operations.entrySet().stream()
.filter(f -> f.getKey().equals(operationId))
.map(Map.Entry::getValue)
.findFirst();
final List<String> paramNames = findParams(item);

asyncapiOperation.ifPresent(operation -> binding
.route()
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(item)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, openapi, asyncapi, openapiOperation,
operation, paramNames))
.build());
}
}
}

return binding;
private List<String> findParams(
String item)
{
List<String> paramNames = new ArrayList<>();
Matcher matcher = parameters.reset(item);
while (matcher.find())
{
paramNames.add(parameters.group(1));
}
return paramNames;
}

private <C> RouteConfigBuilder<C> injectHttpKafkaRouteWith(
RouteConfigBuilder<C> route,
Openapi openapi,
OpenapiOperation operation,
String action,
String address,
Asyncapi asyncapi,
OpenapiOperation openapiOperation,
AsyncapiOperation asyncapiOperation,
List<String> paramNames)
{
final HttpKafkaWithConfigBuilder<HttpKafkaWithConfig> newWith = HttpKafkaWithConfig.builder();
final AsyncapiChannelView channel = AsyncapiChannelView
.of(asyncapi.channels, asyncapiOperation.channel);
final String topic = channel.address();

switch (action)
switch (asyncapiOperation.action)
{
case "receive":
newWith.fetch(HttpKafkaWithFetchConfig.builder()
.topic(address)
.inject(with -> this.injectHttpKafkaRouteFetchWith(with, openapi, operation, paramNames))
.topic(topic)
.inject(with -> injectHttpKafkaRouteFetchWith(with, openapi, openapiOperation, paramNames))
.build());
break;
case "send":
String key = !paramNames.isEmpty() ? String.format("${params.%s}", paramNames.get(0)) : "${idempotencyKey}";
newWith.produce(HttpKafkaWithProduceConfig.builder()
.topic(address)
.acks("in_sync_replicas")
.key(key)
.topic(topic)
.inject(w -> injectHttpKafkaRouteProduceWith(w, openapiOperation, asyncapiOperation, paramNames))
.build());
break;
}
Expand Down Expand Up @@ -214,13 +245,77 @@ private <C> HttpKafkaWithFetchConfigBuilder<C> injectHttpKafkaRouteFetchWith(
if (!paramNames.isEmpty())
{
fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder()
.key(String.format("${params.%s}", paramNames.get(0)))
.key(String.format("${params.%s}", paramNames.get(paramNames.size() - 1)))
.build()));
}

return fetch;
}

private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith(
HttpKafkaWithProduceConfigBuilder<C> produce,
OpenapiOperation openapiOperation,
AsyncapiOperation asyncapiOperation,
List<String> paramNames)
{
final String key = !paramNames.isEmpty() ? String.format("${params.%s}",
paramNames.get(paramNames.size() - 1)) : "${idempotencyKey}";

produce.acks("in_sync_replicas").key(key);

for (Map.Entry<String, OpenapiResponseByContentType> response : openapiOperation.responses.entrySet())
{
if ("202".equals(response.getKey()))
{
OpenapiResponseByContentType content = response.getValue();
boolean async = content.headers.entrySet().stream()
.anyMatch(e -> hasCorrelationId(e.getValue()));

if (async)
{
content.headers.forEach((k, v) ->
{
String location = v.schema.format;
location = location.replaceAll(CORRELATION_ID, "\\${correlationId}");
location = location.replaceAll(PARAMETERS, "\\${params.$1}");
produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder()
.name(k)
.value(location)
.build());
});
}
}
}
AsyncapiReply reply = asyncapiOperation.reply;
if (reply != null)
{
final String location = reply.address.location;
Matcher matcher = replyTo.reset(location);

if (matcher.find())
{
produce.replyTo(matcher.group(1));
}
}

produce.build();

return produce;
}

private boolean hasCorrelationId(
OpenapiHeader header)
{
boolean hasCorrelationId = false;
OpenapiSchema schema = header.schema;
if (schema != null &&
schema.format != null)
{
hasCorrelationId = correlation.reset(schema.format).find();
}
return hasCorrelationId;
}

protected NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> injectNamespaceMetric(
NamespaceConfigBuilder<BindingConfigBuilder<BindingConfig>> namespace,
boolean hasMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class OpenapiSchema
{
public String type;
public String format;
public OpenapiSchema items;
public Map<String, OpenapiItem> properties;
public List<String> required;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.http.kafka.config;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

Expand Down Expand Up @@ -72,6 +73,19 @@ public HttpKafkaWithProduceConfigBuilder<T> replyTo(
return this;
}

public HttpKafkaWithProduceConfigBuilder<T> async(
HttpKafkaWithProduceAsyncHeaderConfig header)
{
if (this.async == null)
{
this.async = new ArrayList<>();
}

this.async.add(header);

return this;
}

public HttpKafkaWithProduceConfigBuilder<T> async(
List<HttpKafkaWithProduceAsyncHeaderConfig> async)
{
Expand Down