Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sporadic github action build failure fix #522

Merged
merged 23 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ public void shouldRejectMessageValue() throws Exception
k3po.finish();
}

@Ignore("GitHub Actions")
@Test
@Configuration("cache.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public final class Engine implements Collector, AutoCloseable
private final URL rootConfigURL;
private final Collection<DispatchAgent> dispatchers;
private final boolean readonly;
private final EngineConfiguration config;
private Future<Void> watcherTaskRef;

Engine(
Expand All @@ -114,6 +115,7 @@ public final class Engine implements Collector, AutoCloseable
Collection<EngineAffinity> affinities,
boolean readonly)
{
this.config = config;
this.nextTaskId = new AtomicInteger();
this.factory = Executors.defaultThreadFactory();

Expand Down Expand Up @@ -248,6 +250,11 @@ public void start() throws Exception
@Override
public void close() throws Exception
{
if (config.drainOnClose())
{
drain();
}

final List<Throwable> errors = new ArrayList<>();

watcherTask.close();
Expand Down Expand Up @@ -282,6 +289,11 @@ public void close() throws Exception
}
}

private void drain()
{
dispatchers.forEach(d -> d.drain());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dispatchers.forEach(d -> d.drain());
dispatchers.forEach(DispatchAgent::drain);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also suggest we just inline this, since we don't need a separate Engine.drain() method.

}

// required for testing
public ContextImpl context()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,17 +713,6 @@ public int doWork()
@Override
public void onClose()
{
final long closeAt = System.nanoTime();
while (config.drainOnClose() &&
streamsBuffer.consumerPosition() < streamsBuffer.producerPosition())
{
ThreadHints.onSpinWait();

if (System.nanoTime() - closeAt >= Duration.ofSeconds(30).toNanos())
{
break;
}
}
configuration.detachAll();

poller.onClose();
Expand Down Expand Up @@ -769,6 +758,20 @@ public void onClose()
}
}

public void drain()
{
final long closeAt = System.nanoTime();
while (streamsBuffer.consumerPosition() < streamsBuffer.producerPosition())
{
ThreadHints.onSpinWait();

if (System.nanoTime() - closeAt >= Duration.ofSeconds(30).toNanos())
{
break;
}
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ else if (metricGroupId == originTypeId)
private void detachBinding(
BindingConfig config)
{
int bindingId = supplyLabelId.applyAsInt(config.name);
int bindingId = NamespacedId.localId(config.id);
BindingRegistry context = bindingsById.remove(bindingId);
if (context != null)
{
Expand All @@ -264,7 +264,7 @@ private void attachVault(
private void detachVault(
VaultConfig config)
{
int vaultId = supplyLabelId.applyAsInt(config.name);
int vaultId = NamespacedId.localId(config.id);
VaultRegistry context = vaultsById.remove(vaultId);
if (context != null)
{
Expand All @@ -287,7 +287,7 @@ private void attachGuard(
private void detachGuard(
GuardConfig config)
{
int guardId = supplyLabelId.applyAsInt(config.name);
int guardId = NamespacedId.localId(config.id);
GuardRegistry context = guardsById.remove(guardId);
if (context != null)
{
Expand All @@ -310,7 +310,7 @@ private void attachCatalog(
private void detachCatalog(
CatalogConfig config)
{
int catalogId = supplyLabelId.applyAsInt(config.name);
int catalogId = NamespacedId.localId(config.id);
CatalogRegistry context = catalogsById.remove(catalogId);
if (context != null)
{
Expand All @@ -330,7 +330,7 @@ private void attachMetric(
private void detachMetric(
MetricConfig config)
{
int metricId = supplyLabelId.applyAsInt(config.name);
int metricId = NamespacedId.localId(config.id);
metricsById.remove(metricId);
}

Expand All @@ -349,7 +349,7 @@ private void attachExporter(
private void detachExporter(
ExporterConfig config)
{
int exporterId = supplyLabelId.applyAsInt(config.name);
int exporterId = NamespacedId.localId(config.id);
ExporterRegistry registry = exportersById.remove(exporterId);
if (registry != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.rules.RuleChain.outerRule;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
Expand Down Expand Up @@ -118,7 +117,6 @@ public void shouldReceiveClientSentWriteClose() throws Exception
k3po.finish();
}

@Ignore("GitHub Actions")
@Test
@Configuration("server.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void shouldReconfigureWhenModifiedUsingSymlink() throws Exception
k3po.finish();
}

@Ignore("Github Actions")
@Test
@Configuration("zilla.reconfigure.modify.complex.chain.json")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ public void evaluate() throws Throwable
final List<Throwable> errors = new ArrayList<>();
final ErrorHandler errorHandler = ex ->
{
ex.printStackTrace();
errors.add(ex);
baseThread.interrupt();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.behavior;

import static io.aklivity.zilla.runtime.engine.internal.stream.BudgetId.ownerIndex;
import static io.aklivity.zilla.runtime.engine.test.internal.k3po.ext.behavior.ZillaTransmission.HALF_DUPLEX;

import java.nio.file.Path;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -214,7 +213,7 @@ public void doClose(
ZillaTarget target = supplyTarget(channel);
target.doClose(channel, handlerFuture);

if (!readClosed && channel.getConfig().getTransmission() == HALF_DUPLEX)
if (!readClosed)
{
final ChannelFuture abortFuture = Channels.future(channel);
source.doAbortInput(channel, abortFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ read zilla:begin.ext ${kafka:beginEx()
.build()
.build()}


read notify SENT_ABORT

read advised zilla:flush ${kafka:matchFlushEx()
.typeId(zilla:id("kafka"))
.fetch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ write zilla:begin.ext ${kafka:beginEx()
.build()}
write flush

write await SENT_ABORT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be between partition(0, 3, 3, 3) and partition(0, 4, 4, 4) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understood you correctly, it is between those two message

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mean the beginEx, referring to between the flushEx and the dataEx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test are failing if I put between the flushEx and the dataEx.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense - the barrier needs to go before the flush.

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.fetch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ read zilla:data.ext ${kafka:matchDataEx()
.build()}
read "Hello, world"

read notify SENT_ABORT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally we think of barriers as events that need to be notified because they happened, rather than naming something yet to happen in the future.

So we should rename this to reflect what has already happened here.

Suggest something like RECEIVED_SKIPPED_MESSAGE?

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.fetch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ write zilla:data.ext ${kafka:dataEx()
write "Hello, world"
write flush

write await SENT_ABORT

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.fetch()
Expand Down