-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka GRPC consumer Group Support #598
Conversation
…artial data frame while computing crc32c value
.merged(m -> m.consumer(mc -> mc | ||
.progress(p -> p | ||
.partitionId(partitionId) | ||
.partitionOffset(partitionOffset + 1L)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please hoist this as a local variable called nextPartitionOffset
to better document the code.
@@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx() | |||
.merged() | |||
.capabilities("FETCH_ONLY") | |||
.topic("requests") | |||
.groupId("zilla-kafka-grpc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be configurable, I'm thinking as part of KafkaConfiguration
, agree?
@@ -1956,6 +1985,7 @@ private MessageConsumer newKafkaFetch( | |||
.typeId(kafkaTypeId) | |||
.merged(m -> m.capabilities(c -> c.set(FETCH_ONLY)) | |||
.topic(condition.topic()) | |||
.groupId("zilla-kafka-grpc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be configurable, I'm thinking as part of KafkaConfiguration
, agree?
private static GroupIdSupplier defaultGroupId( | ||
Configuration config) | ||
{ | ||
return () -> String.format("zilla-%s-%s", "", ""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest zilla:%s-%s
instead, probably more readable.
I'm thinking we'll want to do the same for mqtt-kafka
and perhaps make it possible to prefix with mqtt:
instead of zilla:
if desired to make it easier to differentiate the consumer groups using standard kafka tools.
@@ -182,9 +182,12 @@ public void attach( | |||
newBinding.routes.forEach(r -> | |||
r.when.forEach(c -> | |||
{ | |||
final String groupId = String.format("zilla-%s-%s", supplyNamespace.apply(binding.id), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaConfiguration
changes look good for supplyGroupId()
, but we want have it return the pattern and then substitute in the namespace and local name for the binding via String.format
here.
The default pattern returned by KafkaConfiguration.supplyGroupId()
would be "zilla:%s-%s"
.
Description
Kafka GRPC consumer Group Support
Fixes #597