Consumer group message acknowledgement support #538
Conversation
…artial data frame while computing crc32c value
    KafkaHeader[] headers;    // INIT + FIN (produce), INIT only (fetch)
}

struct KafkaMergedConsumerDataExKafkaMergedConsumerDataEx
typo
union KafkaMergedFlushEx switch (uint8)
{
    case 252: kafka::stream::KafkaMergedConsumerFlushEx consumer;
    case 8: kafka::stream::KafkaMergedOffsetCommitFlushEx offsetCommit;
This should stay as KafkaMergedConsumerFlushEx for consistency with KafkaMergedConsumerDataEx, right?
-struct KafkaConsumerDataEx
+union KafkaConsumerDataEx switch (uint8)
 {
     case 253: kafka::stream::KafkaMergedConsumerDataEx group;
Merged should not be on Consumer, right?
}

struct KafkaOffsetCommitDataEx
{
    string16 topic;
The idea is to have one offset commit stream per group instead of one per topic?
{
    int32 deferred = 0;        // INIT only (TODO: move to DATA frame)
    int64 timestamp = 0;       // INIT only
    int64 filters = -1;        // INIT only
    KafkaOffset partition;     // INIT only
    KafkaOffset[] progress;    // INIT only
Is `progress` used by `produce`?
}

struct KafkaOffsetCommitDataEx
{
    int32 partitionId;
    int64 partitionOffset;
    KafkaOffset progress;
`progress` or `partition`? (actual question)
struct KafkaConsumerOffsetCommitDataEx
{
    KafkaOffset progress;
    int32 committedLeaderEpoch;
Perhaps just `leaderEpoch` as we are in the context of `Commit` already here?
    int32 partitionId;
    int64 partitionOffset;
    KafkaOffset progress;
    int32 committedLeaderEpoch;
Perhaps just `leaderEpoch` as we are in the context of `Commit` already here?
@@ -208,6 +208,7 @@ scope kafka
 {
     case 253: kafka::stream::KafkaGroupFlushEx group;
     case 255: kafka::stream::KafkaMergedFlushEx merged;
+    case 8: kafka::stream::KafkaOffsetCommitFlushEx offsetCommit;
Perhaps `offset` is sufficient here instead of `offsetCommit`?
union KafkaConsumerDataEx switch (uint8)
{
    case 253: kafka::stream::KafkaConsumerGroupDataEx group;
    case 8: kafka::stream::KafkaConsumerOffsetCommitDataEx offsetCommit;
Perhaps `offset` is sufficient here instead of `offsetCommit`?
read zilla:data.ext ${kafka:dataEx()
                        .typeId(zilla:id("kafka"))
                        .consumer()
                        .offsetCommit()
I think `offset` would read better here, as in:

consumer()
    .offset()
    ...
write zilla:data.empty
write flush

read option zilla:ack 0
My concern here is that `reserved` is `0` when sending the empty DATA frame, so the WINDOW ack does not need to elevate `initialAck`, making it difficult to tell the difference before or after the ack. Do you think it's an issue?
    .typeId(zilla:id("kafka"))
    .offsetCommit()
    .progress(0, 2)
    .committedLeaderEpoch(0)
`.leaderEpoch(0)` reads better here.
}

struct KafkaOffsetFetchTopicOffsets
{
    string16 topic;
-    KafkaOffset[] offsets;
+    KafkaTopicPartitionOffset[] partitions;
Since `KafkaOffsetFetchBeginEx` now includes `topic`, `KafkaOffsetFetchDataEx` is already topic-specific and doesn't need `topic` again, right? I think that means we should use `KafkaTopicPartition[] partitions` on `KafkaOffsetFetchDataEx` and remove `KafkaOffsetFetchTopicOffsets` from kafka.idl, agree?
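As a sketch of that suggestion (hypothetical shape; `KafkaTopicPartition` is the type proposed above, not yet in kafka.idl):

struct KafkaOffsetFetchDataEx
{
    KafkaTopicPartition[] partitions;    // topic is implied by KafkaOffsetFetchBeginEx
}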
     KafkaDelta delta;         // INIT + FIN
-    KafkaHeader[] headers;    // INIT + FIN (produce), INIT only (fetch)
+    KafkaHeader[] headers;    // INIT only
I think the comment before might have been incorrect for fetch, since `headers` are at the end of a kafka message in the wire protocol. Merged directly over client fetch would therefore send `headers` with FIN, whereas cache always sends `headers` with INIT, so this comment should probably be `INIT + FIN`.
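Concretely, that would make the new line read (sketch):

    KafkaHeader[] headers;    // INIT + FIN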
format("[merged] (%d) %d %s %d %d %d", | ||
merged.deferred(), merged.timestamp(), asString(key.value()), | ||
partition.partitionId(), partition.partitionOffset(), partition.latestOffset())); | ||
format("[merged] (%d) %d %s %d %d %d", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps `[merged] [fetch] ...`?
final KafkaOffsetFW partition = produce.partition();

out.printf(verboseFormat, index, offset, timestamp,
    format("[merged] (%d) %d %s %d %d %d",
Perhaps `[merged] [produce] ...`?
@@ -641,8 +641,8 @@ protected void onKafkaData(
     }
     else if (kafkaDataEx != null)
     {
-        final KafkaMergedDataExFW kafkaMergedDataEx = kafkaDataEx.merged();
-        final int contentLength = payload.sizeof() + kafkaMergedDataEx.deferred();
+        final KafkaMergedFetchDataExFW fetch = kafkaDataEx.merged().fetch();
Please rename `fetch` to `kafkaMergedFetchDataEx`.
    kafkaMergedDataEx.fetch().progress() : null;
key = kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch().key().value() : null;
final Array32FW<KafkaHeaderFW> headers = kafkaMergedDataEx != null ?
    kafkaMergedDataEx.fetch().headers() : null;
Define `kafkaMergedFetchDataEx` to avoid calling `kafkaMergedDataEx.fetch()` repeatedly.
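For example, a sketch of that extraction, using only the accessors visible in this hunk:

final KafkaMergedFetchDataExFW kafkaMergedFetchDataEx =
    kafkaMergedDataEx != null ? kafkaMergedDataEx.fetch() : null;

// then reuse it instead of repeating kafkaMergedDataEx.fetch()
key = kafkaMergedFetchDataEx != null ? kafkaMergedFetchDataEx.key().value() : null;
final Array32FW<KafkaHeaderFW> headers =
    kafkaMergedFetchDataEx != null ? kafkaMergedFetchDataEx.headers() : null;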
@@ -176,7 +176,7 @@ read 20 # size
 -1s # authentication bytes
 0L # session lifetime

-write 119 # size
+write 82 # size
Comment has vertical misalignment (also for other similar scripts with `${instanceId}` replaced by `"zilla"`).
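i.e. keeping the trailing `#` comments in one column after the size change, e.g. (column position illustrative):

-1s          # authentication bytes
0L           # session lifetime

write 82     # size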
@Specification({
    "${app}/topic.offset.info/client",
    "${app}/topic.offset.info/server"})
public void shouldFetchPartitionOffsetInfo() throws Exception
Test scenario name does not match script scenario name.
@Specification({
    "${net}/topic.offset.info/client",
    "${net}/topic.offset.info/server"})
public void shouldFetchPartitionOffsetInfo() throws Exception
Please check test scenario name.
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

public class OffsetFetchSaslIT
Please check test scenario names.
Also, typos: `Sals` should be `Sasl`, and `Scarm` should be `Scram`.
format("%s: %s", asString(h.name()), asString(h.value())))); | ||
} | ||
    traceId, authorization, initialBud, reserved, extension);
}
final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null;

Suggested change:
-final KafkaBeginExFW groupBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
-final KafkaGroupBeginExFW kafkaGroupBeginEx = groupBeginEx != null ? groupBeginEx.group() : null;
+final KafkaBeginExFW kafkaBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
+final KafkaGroupBeginExFW kafkaGroupBeginEx = kafkaBeginEx != null ? kafkaBeginEx.group() : null;
client.errorCode = errorCode;
client.decoder = decodeReject;
}
Suggest `switch` instead of `if-else-if-else`.
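As a sketch of that shape (the non-error branch is illustrative, not taken from this PR):

switch (errorCode)
{
case NO_ERROR:                        // illustrative constant
    client.decoder = decodeResponse;  // illustrative decoder
    break;
default:
    client.errorCode = errorCode;
    client.decoder = decodeReject;
    break;
}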
Description
This feature gives other kafka mappings the ability to start fetching from the last observed message offset.
Fixes #588