Consumer group message acknowledgement support #538

Merged
jfallows merged 62 commits into aklivity:develop on Nov 29, 2023
Conversation

@akrambek (Contributor) commented on Oct 27, 2023

Description

This feature gives other Kafka mappings the ability to start fetching from the last observed message offset.

Fixes #588

KafkaHeader[] headers; // INIT + FIN (produce), INIT only (fetch)
}

struct KafkaMergedConsumerDataExKafkaMergedConsumerDataEx
Contributor:

typo

union KafkaMergedFlushEx switch (uint8)
{
case 252: kafka::stream::KafkaMergedConsumerFlushEx consumer;
case 8: kafka::stream::KafkaMergedOffsetCommitFlushEx offsetCommit;
Contributor:

This should stay as KafkaMergedConsumerFlushEx for consistency with KafkaMergedConsumerDataEx, right?

struct KafkaConsumerDataEx
union KafkaConsumerDataEx switch (uint8)
{
case 253: kafka::stream::KafkaMergedConsumerDataEx group;
Contributor:

Merged should not be on Consumer, right?

}

struct KafkaOffsetCommitDataEx
{
string16 topic;
Contributor:

The idea is to have one offset commit stream per group instead of one per topic?

jfallows previously approved these changes on Nov 6, 2023
{
int32 deferred = 0; // INIT only (TODO: move to DATA frame)
int64 timestamp = 0; // INIT only
int64 filters = -1; // INIT only
KafkaOffset partition; // INIT only
KafkaOffset[] progress; // INIT only
Contributor:

Is progress used by produce?

}

struct KafkaOffsetCommitDataEx
{
int32 partitionId;
int64 partitionOffset;
KafkaOffset progress;
Contributor:

progress or partition? (actual question)

struct KafkaConsumerOffsetCommitDataEx
{
KafkaOffset progress;
int32 committedLeaderEpoch;
Contributor:

Perhaps just leaderEpoch as we are in the context of Commit already here?

int32 partitionId;
int64 partitionOffset;
KafkaOffset progress;
int32 committedLeaderEpoch;
Contributor:

Perhaps just leaderEpoch as we are in the context of Commit already here?

@@ -208,6 +208,7 @@ scope kafka
{
case 253: kafka::stream::KafkaGroupFlushEx group;
case 255: kafka::stream::KafkaMergedFlushEx merged;
case 8: kafka::stream::KafkaOffsetCommitFlushEx offsetCommit;
Contributor:

Perhaps offset is sufficient here instead of offsetCommit?

union KafkaConsumerDataEx switch (uint8)
{
case 253: kafka::stream::KafkaConsumerGroupDataEx group;
case 8: kafka::stream::KafkaConsumerOffsetCommitDataEx offsetCommit;
Contributor:

Perhaps offset is sufficient here instead of offsetCommit?

read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.consumer()
.offsetCommit()
Contributor:

I think offset would read better here, as in

consumer()
  .offset()
  ...

write zilla:data.empty
write flush

read option zilla:ack 0
Contributor:

My concern here is that reserved is 0 when sending the empty DATA frame, so the WINDOW ack does not need to elevate initialAck, making it difficult to tell the difference before and after the ack.

Do you think it's an issue?

.typeId(zilla:id("kafka"))
.offsetCommit()
.progress(0, 2)
.committedLeaderEpoch(0)
Contributor:

.leaderEpoch(0) reads better here.

@akrambek changed the title from "Kafka Idl changes for consumer group offset commit" to "Consumer group message acknowledgement" on Nov 21, 2023
@akrambek changed the title from "Consumer group message acknowledgement" to "Consumer group message acknowledgement support" on Nov 23, 2023
@akrambek marked this pull request as ready for review on November 23, 2023 06:19
}

struct KafkaOffsetFetchTopicOffsets
{
string16 topic;
KafkaOffset[] offsets;
KafkaTopicPartitionOffset[] partitions;
Contributor:

Since KafkaOffsetFetchBeginEx now includes topic, then KafkaOffsetFetchDataEx is already topic specific and doesn't need topic again, right?

I think that means we should use KafkaTopicPartition[] partitions on KafkaOffsetFetchDataEx and remove KafkaOffsetFetchTopicOffsets from kafka.idl, agree?

KafkaDelta delta; // INIT + FIN
KafkaHeader[] headers; // INIT + FIN (produce), INIT only (fetch)
KafkaHeader[] headers; // INIT only
Contributor:

I think the comment before might have been incorrect for fetch. Since headers are at the end of a kafka message in the wire protocol, merged directly over client fetch would send headers with FIN, whereas cache always sends headers with INIT, so this comment should probably be INIT + FIN.

format("[merged] (%d) %d %s %d %d %d",
merged.deferred(), merged.timestamp(), asString(key.value()),
partition.partitionId(), partition.partitionOffset(), partition.latestOffset()));
format("[merged] (%d) %d %s %d %d %d",
Contributor:

Perhaps [merged] [fetch] ...?

final KafkaOffsetFW partition = produce.partition();

out.printf(verboseFormat, index, offset, timestamp,
format("[merged] (%d) %d %s %d %d %d",
Contributor:

Perhaps [merged] [produce] ...?

@@ -641,8 +641,8 @@ protected void onKafkaData(
}
else if (kafkaDataEx != null)
{
final KafkaMergedDataExFW kafkaMergedDataEx = kafkaDataEx.merged();
final int contentLength = payload.sizeof() + kafkaMergedDataEx.deferred();
final KafkaMergedFetchDataExFW fetch = kafkaDataEx.merged().fetch();
Contributor:

Please rename fetch to kafkaMergedFetchDataEx.

kafkaMergedDataEx.fetch().progress() : null;
key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null;
final Array32FW<KafkaHeaderFW> headers = kafkaMergedDataEx != null ?
kafkaMergedDataEx.fetch().headers() : null;
Contributor:

Define kafkaMergedFetchDataEx to avoid calling kafkaMergedDataEx.fetch() repeatedly.
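
A minimal sketch of that hoist, reusing the names visible in the snippet above (anything not shown there is assumed):

// Sketch: evaluate kafkaMergedDataEx.fetch() once and reuse the local,
// instead of repeating the accessor for each field.
final KafkaMergedFetchDataExFW kafkaMergedFetchDataEx =
    kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch() : null;

progress = kafkaMergedFetchDataEx != null ? kafkaMergedFetchDataEx.progress() : null;
key = kafkaMergedFetchDataEx != null ? kafkaMergedFetchDataEx.key().value() : null;
final Array32FW<KafkaHeaderFW> headers =
    kafkaMergedFetchDataEx != null ? kafkaMergedFetchDataEx.headers() : null;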

@@ -176,7 +176,7 @@ read 20 # size
-1s # authentication bytes
0L # session lifetime

write 119 # size
write 82 # size
Contributor:

Comment has vertical misalignment
(also for other similar scripts with ${instanceId} replaced by "zilla")

@Specification({
"${app}/topic.offset.info/client",
"${app}/topic.offset.info/server"})
public void shouldFetchPartitionOffsetInfo() throws Exception
Contributor:

Test scenario name does not match script scenario name.

@Specification({
"${net}/topic.offset.info/client",
"${net}/topic.offset.info/server"})
public void shouldFetchPartitionOffsetInfo() throws Exception
Contributor:

Please check test scenario name.

import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

public class OffsetFetchSaslIT
Contributor:

Please check test scenario names.
Also, typos: Sals should be Sasl, and Scarm should be Scram.

Comment on lines 1186 to 1187


Contributor:

Suggested change (remove the extra blank lines)

format("%s: %s", asString(h.name()), asString(h.value()))));
}


Contributor:

Suggested change (remove the extra blank line)

traceId, authorization, initialBud, reserved, extension);
}


Contributor:

Suggested change (remove the extra blank line)

Comment on lines 842 to 843
final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null;
Contributor:

Suggested change
-final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
-final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null;
+final KafkaBeginExFW kafkaBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
+final KafkaGroupBeginExFW kafkaGroupBeginEx = kafkaBeginEx != null ? kafkaBeginEx.group() : null;

client.errorCode = errorCode;
client.decoder = decodeReject;
}

Contributor:

Suggest switch instead of if-else-if-else.
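
For illustration, a hedged sketch of that refactor (the error-code constants and the non-reject decoder names here are hypothetical placeholders, not the actual zilla identifiers):

// Hypothetical sketch: select the next decoder with a switch on errorCode
// rather than an if-else-if chain; constants are illustrative only.
switch (errorCode)
{
case ERROR_NONE:
    client.decoder = decodeResponse;
    break;
case ERROR_UNKNOWN_MEMBER_ID:
    client.decoder = decodeRejoin;
    break;
default:
    client.errorCode = errorCode;
    client.decoder = decodeReject;
    break;
}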

@jfallows merged commit c20c047 into aklivity:develop on Nov 29, 2023
bmaidics pushed a commit to bmaidics/zilla that referenced this pull request on Nov 30, 2023