
Support pgsql binding #1057

Closed
jfallows opened this issue May 28, 2024 · 4 comments · Fixed by #1200
Labels
enhancement New feature or request


jfallows commented May 28, 2024

Support pgsql binding with server and client kinds, so that a pgsql client can be proxied to a pgsql server.

Support routing by command, such as CREATE TABLE or SELECT, so that different commands can be proxied to different pgsql servers.

See pgsql protocol specification.

Support pgsql command transformations to dynamically inject deployment details.
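A hypothetical zilla.yaml sketch of what such a binding could look like; the binding names, route condition key (`command`), and exit names below are illustrative assumptions, not an implemented schema:

```yaml
bindings:
  pgsql_server:                      # hypothetical names throughout
    type: pgsql
    kind: server
    routes:
      - when:
          - command: CREATE TABLE    # assumed route-by-command condition
        exit: pgsql_ddl_client
      - when:
          - command: SELECT
        exit: pgsql_query_client
    exit: pgsql_default_client
  pgsql_ddl_client:
    type: pgsql
    kind: client
    exit: tcp_client
```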

@jfallows jfallows added the enhancement New feature or request label May 28, 2024
@jfallows jfallows changed the title Support psql binding Support pgsql binding Aug 13, 2024
@akrambek (Contributor)

CREATE TABLE IF NOT EXISTS balances(
    *,
    PRIMARY KEY (user_id)
)
INCLUDE KEY AS user_id
WITH (
    connector='kafka',
    topic='streampay-balances',
    properties.bootstrap.server='kafka:29092',
    scan.startup.mode='latest',
    scan.startup.timestamp.millis='140000000'
) FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);

CREATE SOURCE IF NOT EXISTS commands
INCLUDE KEY AS key
INCLUDE header 'zilla:correlation-id' AS correlation_id
INCLUDE header 'zilla:identity' AS owner_id
INCLUDE timestamp AS timestamp
WITH (
    connector='kafka',
    topic='streampay-commands',
    properties.bootstrap.server='kafka:29092',
    scan.startup.mode='latest',
    scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);

CREATE FUNCTION column(name varchar) RETURNS VARCHAR LANGUAGE javascript AS $$
    return name;
$$;

INSERT INTO balances (user_id, balance, timestamp) VALUES ('1', 1964, 1723593113);

SELECT * FROM balances;

SHOW TABLES;
DESCRIBE balances;
DROP TABLE balances;

psql-queries.pcapng.zip

@jfallows (Contributor, Author)

Looks like we want a single stream of pgsql messages, mapping to a bidirectional zilla stream, with each pgsql message mapping to either a zilla DATA frame or a zilla FLUSH frame (maintaining received order).

pgsql messages flowing towards the client include type (row description), row (data row), completion (command completion) and ready (ready for query).

query => DATA + PgsqlQueryDataEx
type => FLUSH + PgsqlTypeFlushEx
row => DATA + PgsqlRowDataEx
completion => FLUSH + PgsqlCompletedFlushEx
ready => FLUSH + PgsqlReadyFlushEx
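As background for the mapping above, each regular pgsql wire message is a 1-byte type code followed by a 4-byte big-endian length that includes itself. A minimal Python sketch of splitting the byte stream into messages, with a lookup table that simply restates the proposed frame mapping from the list above:

```python
import struct

# pgsql type codes for the messages discussed above, paired with the
# proposed zilla frame mapping (Query, RowDescription, DataRow,
# CommandComplete, ReadyForQuery)
FRAME_MAP = {
    b"Q": ("DATA",  "PgsqlQueryDataEx"),
    b"T": ("FLUSH", "PgsqlTypeFlushEx"),
    b"D": ("DATA",  "PgsqlRowDataEx"),
    b"C": ("FLUSH", "PgsqlCompletedFlushEx"),
    b"Z": ("FLUSH", "PgsqlReadyFlushEx"),
}

def split_messages(buf: bytes):
    """Yield (type, payload) for each complete pgsql message in buf."""
    offset = 0
    while offset + 5 <= len(buf):
        kind = buf[offset:offset + 1]
        # 4-byte big-endian length, counting itself but not the type byte
        (length,) = struct.unpack_from("!I", buf, offset + 1)
        end = offset + 1 + length
        if end > len(buf):
            break  # incomplete message; wait for more bytes
        yield kind, buf[offset + 5:end]
        offset = end
```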

@jfallows (Contributor, Author)

For pgsql message mappings in the pgsql proxy binding, we need the following.

CREATE TABLE IF NOT EXISTS balances (name VARCHAR, type VARCHAR, description VARCHAR, PRIMARY KEY (name));

becomes

CREATE TOPIC IF NOT EXISTS balances (name VARCHAR, type VARCHAR, description VARCHAR, PRIMARY KEY (name));

and

CREATE TABLE IF NOT EXISTS balances(
    *,
    PRIMARY KEY (name)
)
INCLUDE KEY AS name
WITH (
    connector='kafka',
    topic='balances',
    properties.bootstrap.server='kafka:29092',
    scan.startup.mode='latest',
    scan.startup.timestamp.millis='140000000'
) FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);

Note: the injected kafka bootstrap server and schema registry endpoint would be configured in pgsql proxy options.

The pgsql response to CREATE TABLE is a completion message followed by a ready message, so if we split this into multiple commands, we can propagate the completion and ready messages only after the last split command completes.
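A minimal sketch of that bookkeeping, assuming a per-command counter is enough state: the proxy fans one client command out into N upstream commands and releases the completion and ready messages to the client only when the last one finishes.

```python
class SplitCommandTracker:
    """Track one client command that was split into `count` upstream
    commands; forward completion/ready only after the last completes."""

    def __init__(self, count: int):
        self.remaining = count

    def on_upstream_complete(self) -> bool:
        # Returns True when the client-facing completion and ready
        # messages may now be propagated.
        self.remaining -= 1
        return self.remaining == 0
```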


jfallows commented Aug 14, 2024

We also need to sync back to Kafka.

CREATE MATERIALIZED VIEW request_payments as
SELECT
    generate_guid() as id,
    encode(cmd.owner_id, 'escape') as from_user_id,
    u2.username as from_username,
    cmd.user_id as to_user_id,
    u1.username as to_username,
    amount,
    notes
FROM
    commands as cmd
JOIN
    users u1 ON u1.id = cmd.user_id
JOIN
    users u2 ON u2.id = encode(cmd.owner_id, 'escape')
WHERE
    key IS NOT NULL
    AND type = 'RequestPayment';

becomes

CREATE MATERIALIZED VIEW request_payments as
SELECT
    generate_guid() as id,
    encode(cmd.owner_id, 'escape') as from_user_id,
    u2.username as from_username,
    cmd.user_id as to_user_id,
    u1.username as to_username,
    amount,
    notes
FROM
    commands as cmd
JOIN
    users u1 ON u1.id = cmd.user_id
JOIN
    users u2 ON u2.id = encode(cmd.owner_id, 'escape')
WHERE
    key IS NOT NULL
    AND type = 'RequestPayment';
DESCRIBE MATERIALIZED VIEW request_payments;
CREATE TOPIC request_payments (
    id VARCHAR,
    from_user_id VARCHAR,
    from_username VARCHAR,
    to_user_id VARCHAR,
    to_username VARCHAR,
    amount DOUBLE,
    notes VARCHAR);
CREATE SINK request_payments_sink
FROM request_payments
WITH (
    connector='kafka',
    topic='request-payments',
    properties.bootstrap.server='kafka:29092',
    primary_key='id'
) FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://schema-registry:8081'
);

Note: the injected kafka bootstrap server and schema registry endpoint would be configured in pgsql proxy options.
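A naive, illustrative sketch of that expansion (regex-based, not an actual implementation; a real binding would parse the SQL properly and derive the CREATE TOPIC columns from the DESCRIBE response, which is why no CREATE TOPIC is emitted here):

```python
import re

def expand_materialized_view(sql: str, bootstrap: str, registry: str) -> list[str]:
    """Illustrative only: when a CREATE MATERIALIZED VIEW <name> command is
    seen, append the follow-up commands that sync the view back to Kafka."""
    m = re.match(r"\s*CREATE\s+MATERIALIZED\s+VIEW\s+(\w+)", sql, re.IGNORECASE)
    if not m:
        return [sql]  # pass other commands through unchanged
    view = m.group(1)
    return [
        sql,
        f"DESCRIBE MATERIALIZED VIEW {view};",
        # CREATE TOPIC would be generated from the DESCRIBE response here
        (f"CREATE SINK {view}_sink FROM {view} WITH (connector='kafka', "
         f"topic='{view.replace('_', '-')}', "
         f"properties.bootstrap.server='{bootstrap}', primary_key='id') "
         f"FORMAT UPSERT ENCODE AVRO (schema.registry = '{registry}');"),
    ]
```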
