-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support eager evaluation of all Kafka filters #212
Conversation
final KafkaMergedDataExFW mergedDataEx) | ||
{ | ||
return filters == null || filters == mergedDataEx.filters(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to call matchFilters
from match
above.
.header("name", "value") | ||
.build() | ||
.build() | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use further indentation to show nested .build()
objects.
...ain/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheCursorFactory.java
Show resolved
Hide resolved
@@ -962,6 +963,93 @@ public String toString() | |||
} | |||
} | |||
|
|||
private static final class EagerOr extends KafkaFilterCondition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, suggest creating a superclass Or
and subsclasses LazyOr
and EagerOr
for improved readability.
Only the .test(...)
method implementation will differ between LazyOr
and EagerOr
, so everything else can be implemented in the Or
superclass.
@@ -36,6 +36,7 @@ scope internal | |||
kafka::KafkaHeader[] trailers; | |||
uint32 paddingLen; | |||
octets[paddingLen] padding; | |||
int64 filters; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change needs to be reverted as KafkaCacheEntry
is shared across multiple readers, whereas filters
mask applies to each individual cache client stream.
@@ -440,6 +441,7 @@ public void shouldGenerateMergedDataExtension() | |||
.typeId(0x01) | |||
.merged() | |||
.timestamp(12345678L) | |||
.filters(0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The example should probably show a non-zero filter mask, because filter mask zero means the message didn't match and therefore wouldn't be delivered.
@@ -219,6 +226,7 @@ scope kafka | |||
{ | |||
int32 deferred = 0; // INIT only (TODO: move to DATA frame) | |||
int64 timestamp = 0; // INIT only | |||
int64 filters = 0; // INIT only |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest we default to -1
instead, as filters mask 0
means no match.
@@ -269,6 +278,7 @@ scope kafka | |||
int64 timestamp = 0; // INIT only | |||
int32 headersSizeMax = 4; // INIT only | |||
int64 producerId = -1; // INIT only | |||
int64 filters = 0; // INIT only |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest we default to -1
instead, as filters mask 0
means no match.
KafkaCacheEntryFW cacheEntry) | ||
{ | ||
return cacheEntry != null; | ||
return cacheEntry != null ? 1L : 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be -1
to generate the default, non-zero value for None
filter?
@@ -876,11 +881,11 @@ public String toString() | |||
} | |||
} | |||
|
|||
private static final class Or extends KafkaFilterCondition | |||
private static class Or extends KafkaFilterCondition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's mark this abstract
too.
public long test(KafkaCacheEntryFW cacheEntry) | ||
{ | ||
return 0L; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove method, now that class is abstract.
|
||
@Override | ||
public long test( | ||
KafkaCacheEntryFW cacheEntry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra indentation.
if (result != 0L) | ||
{ | ||
accept |= i + 1; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rework this code so that it can simplify to accept |= condition.test(cacheEntry);
.
That requires the condition to return the relevant bitmask for the filter index it is on, not just 0
or 1
.
Therefore, when each filter condition is created for non-Or conditions, it needs to know the filter index, so it can compute 1 << index
as bitmask to return when .test(...)
is successful, instead of 1
.
} | ||
|
||
private static KafkaFilterCondition.Key initNullKeyInfo( | ||
long mask, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before adding the mask
, we used this as a shared instance for all null
key conditions, but now we need the mask
to be correct per client, so the shared part should shift to the nullKeyRO
and we now need to create the mask-specific Key
instance.
{ | ||
nextEntry = null; | ||
} | ||
|
||
filters = nextEntry != null ? condition.test(nextEntry) : 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please call condition.test(nextEntry)
just once.
...ain/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCacheCursorFactory.java
Show resolved
Hide resolved
@@ -71,7 +73,7 @@ public KafkaCacheCursorFactory( | |||
{ | |||
this.writeBuffer = writeBuffer; | |||
this.checksum = new CRC32C(); | |||
this.nullKeyInfo = initNullKeyInfo(checksum); | |||
this.nullKeyRO = initNullKeyRO(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were previously using this approach to avoid copying the null
key value for each client filter.
I don't see where we are using nullKeyRO
after the changes.
@@ -71,7 +72,6 @@ public KafkaCacheCursorFactory( | |||
{ | |||
this.writeBuffer = writeBuffer; | |||
this.checksum = new CRC32C(); | |||
this.nullKeyInfo = initNullKeyInfo(checksum); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you might have misinterpreted the feedback. 🙂
I was saying that we want to preserve the original optimization of not copying null
keys all the time, just the approach is slightly different now because we cannot pre-create the entire filter condition.
|
||
return value == null ? | ||
new KafkaFilterCondition.Key(mask, checksum, nullKeyRO) : new KafkaFilterCondition.Key(mask, checksum, key); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is closer, but we still make the copy of nullKeyRO
each time we construct a new KafkaFilterCondition.Key
.
Follow the constructor hierarchy up through super
to see the copy in Equals
.
Fix #209