Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log validation failure of HTTP messages (stdout) #781

Merged
merged 5 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.aklivity.zilla.runtime.binding.http.internal;

import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_VERBOSE;

import io.aklivity.zilla.runtime.binding.http.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.EngineConfiguration;
Expand All @@ -35,6 +37,7 @@ public class HttpConfiguration extends Configuration
public static final IntPropertyDef HTTP_MAX_CONCURRENT_APPLICATION_HEADERS;
public static final PropertyDef<String> HTTP_SERVER_HEADER;
public static final PropertyDef<String> HTTP_USER_AGENT_HEADER;
public static final BooleanPropertyDef HTTP_VERBOSE;

private static final ConfigurationDef HTTP_CONFIG;

Expand All @@ -52,6 +55,7 @@ public class HttpConfiguration extends Configuration
HTTP_MAX_CONCURRENT_STREAMS_CLEANUP = config.property("max.concurrent.streams.cleanup", 1000);
HTTP_STREAMS_CLEANUP_DELAY = config.property("streams.cleanup.delay", 100);
HTTP_MAX_CONCURRENT_APPLICATION_HEADERS = config.property("max.concurrent.application.headers", 10000);
HTTP_VERBOSE = config.property("verbose", HttpConfiguration::verboseDefault);
HTTP_CONFIG = config;
}

Expand Down Expand Up @@ -122,4 +126,15 @@ public String16FW userAgentHeader()
{
return userAgentHeader;
}

public boolean verbose()
{
return HTTP_VERBOSE.get(this);
}

private static boolean verboseDefault(
Configuration config)
{
return ENGINE_VERBOSE.getAsBoolean(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ public final class HttpClientFactory implements HttpStreamFactory
private final Map<String8FW, String16FW> headersMap;
private final String16FW h2cSettingsPayload;
private final HttpConfiguration config;
private final EngineContext context;
private final boolean verbose;
private final Http2Settings initialSettings;
private final MutableDirectBuffer frameBuffer;
private final MutableDirectBuffer writeBuffer;
Expand Down Expand Up @@ -344,6 +346,7 @@ public HttpClientFactory(
HttpConfiguration config,
EngineContext context)
{
this.context = context;
this.config = config;
this.proxyTypeId = context.supplyTypeId("proxy");
this.writeBuffer = context.writeBuffer();
Expand Down Expand Up @@ -377,6 +380,7 @@ public HttpClientFactory(
this.decodeMax = bufferPool.slotCapacity();
this.encodeMax = bufferPool.slotCapacity();
this.supplyValidator = context::supplyValidator;
this.verbose = config.verbose();

final byte[] settingsPayload = new byte[12];
http2SettingsRW.wrap(frameBuffer, 0, frameBuffer.capacity())
Expand Down Expand Up @@ -2901,7 +2905,7 @@ private void onDecodeHttp11Headers(
}
else
{
exchange.cleanup(traceId, authorization);
exchange.onResponseInvalid(traceId, authorization);
decoder = decodeHttp11Ignore;
}
}
Expand Down Expand Up @@ -2935,7 +2939,7 @@ private int onDecodeHttp11Body(
}
else
{
exchange.doResponseAbort(traceId, authorization, EMPTY_OCTETS);
exchange.onResponseInvalid(traceId, authorization);
result = limit;
}
return result;
Expand Down Expand Up @@ -3376,7 +3380,7 @@ private int onDecodeHttp2Data(
}
else
{
exchange.cleanup(traceId, authorization);
exchange.onResponseInvalid(traceId, authorization);
progress += payloadLength;
}
}
Expand Down Expand Up @@ -3486,8 +3490,7 @@ else if (headersDecoder.httpError())
}
else
{
exchange.doResponseAbort(traceId, authorization, EMPTY_OCTETS);
exchange.doRequestReset(traceId, authorization);
exchange.onResponseInvalid(traceId, authorization);
doEncodeHttp2RstStream(traceId, streamId, Http2ErrorCode.CANCEL);
decoder = decodeHttp2IgnoreAll;
}
Expand Down Expand Up @@ -5107,6 +5110,19 @@ private boolean validateResponseContent(
return contentType == null ||
contentType.validate(buffer, index, length, ValueConsumer.NOP);
}

private void onResponseInvalid(
long traceId,
long authorization)
{
if (verbose)
{
System.out.printf("%s:%s %s: Skipping invalid response on method %s, path %s\n",
System.currentTimeMillis(), context.supplyNamespace(routedId),
context.supplyLocalName(routedId), requestType.method, requestType.path);
}
cleanup(traceId, authorization);
}
}

private final class HttpPromise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ read zilla:begin.ext ${http:matchBeginEx()
.header("retry-after", "0")
.build()}

write aborted
read closed
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ write zilla:begin.ext ${http:beginEx()
.header("retry-after", "0")
.build()}

read abort
write close