Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka GRPC consumer Group Support #598

Merged
merged 41 commits into from
Dec 9, 2023
Merged

Kafka GRPC consumer Group Support #598

merged 41 commits into from
Dec 9, 2023

Conversation

akrambek
Copy link
Contributor

Description

Kafka GRPC consumer Group Support

Fixes #597

.merged(m -> m.consumer(mc -> mc
.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset + 1L))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please hoist this as a local variable called nextPartitionOffset to better document the code.

@@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-kafka-grpc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?

@@ -1956,6 +1985,7 @@ private MessageConsumer newKafkaFetch(
.typeId(kafkaTypeId)
.merged(m -> m.capabilities(c -> c.set(FETCH_ONLY))
.topic(condition.topic())
.groupId("zilla-kafka-grpc")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?

private static GroupIdSupplier defaultGroupId(
Configuration config)
{
return () -> String.format("zilla-%s-%s", "", "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest zilla:%s-%s instead, probably more readable.
I'm thinking we'll want to do the same for mqtt-kafka and perhaps make it possible to prefix with mqtt: instead of zilla: if desired to make it easier to differentiate the consumer groups using standard kafka tools.

@@ -182,9 +182,12 @@ public void attach(
newBinding.routes.forEach(r ->
r.when.forEach(c ->
{
final String groupId = String.format("zilla-%s-%s", supplyNamespace.apply(binding.id),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaConfiguration changes look good for supplyGroupId(), but we want have it return the pattern and then substitute in the namespace and local name for the binding via String.format here.

The default pattern returned by KafkaConfiguration.supplyGroupId() would be "zilla:%s-%s".

jfallows
jfallows previously approved these changes Dec 9, 2023
@jfallows jfallows merged commit 7e89117 into aklivity:develop Dec 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Kafka GRPC consumer Group Support
2 participants