Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support eager evaluation of all kafka filters and indicate which filters matched #209

Closed
jfallows opened this issue Apr 18, 2023 · 0 comments · Fixed by #212
Closed

Support eager evaluation of all kafka filters and indicate which filters matched #209

jfallows opened this issue Apr 18, 2023 · 0 comments · Fixed by #212
Assignees
Labels
enhancement New feature or request

Comments

@jfallows
Copy link
Contributor

Support eager evaluation of all kafka filters on KafkaMergedBeginEx or KafkaFetchBeginEx.

We currently evaluate each KafkaFilter lazily, stopping on first success, so we do not know which filter succeeded and which filters failed or were never executed after first successful filter.

In some cases, we need to understand the complete set of matching filters that could have contributed to deliver a message.

Therefore, we can add the concept of evaluation to the filters, indicating LAZY (backwards compatible) or EAGER behavior.

Regardless of execution strategy, we can include a filters mask in KafkaMergedDataEx or KafkaFetchDataEx, indicating which filters were evaluated to true when delivering the matching message.

Note: using int64 to represent the filters mask limits the maximum number of filters for each stream to 64, which is acceptable.

    enum KafkaEvaluation (uint8)
    {
        LAZY(0),
        EAGER(1)
    }
        struct KafkaMergedBeginEx
        {
            KafkaCapabilities capabilities = PRODUCE_AND_FETCH;
            string16 topic;
            KafkaOffset[] partitions;
            KafkaFilter[] filters; // ORed
            KafkaEvaluation evaluation = LAZY;
            KafkaIsolation isolation = READ_COMMITTED;
            KafkaDeltaType deltaType = NONE;
            KafkaAckMode ackMode = IN_SYNC_REPLICAS;
        }
        struct KafkaFetchBeginEx
        {
            string16 topic;
            KafkaOffset partition;
            KafkaFilter[] filters; // ORed
            KafkaEvaluation evaluation = LAZY;
            KafkaIsolation isolation = READ_UNCOMMITTED;
            KafkaDeltaType deltaType = NONE;
        }
        struct KafkaMergedDataEx
        {
            int32 deferred = 0;         // INIT only (TODO: move to DATA frame)
            int64 timestamp = 0;        // INIT only
            int64 filters;              // INIT only
            KafkaOffset partition;      // INIT only
            KafkaOffset[] progress;     // INIT only
            KafkaKey key;               // INIT only
            KafkaDelta delta;           // INIT + FIN
            KafkaHeader[] headers;      // INIT + FIN (produce), INIT only (fetch)
        }
        struct KafkaFetchDataEx
        {
            int32 deferred = 0;         // INIT only (TODO: move to DATA frame)
            int64 timestamp = 0;        // INIT only
            int32 headersSizeMax = 4;   // INIT only
            int64 producerId = -1;      // INIT only
            int64 filters;              // INIT only
            KafkaOffset partition;      // INIT only
            KafkaKey key;               // INIT only
            KafkaDelta delta;           // INIT + FIN
            KafkaHeader[] headers;      // FIN only
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants