Browse Source

Working with PG notifications

master
Julio Biason 2 years ago
parent
commit
e6e0759814
  1. 1315
      notifytest/Cargo.lock
  2. 13
      notifytest/Cargo.toml
  3. 3
      notifytest/README.md
  4. 50
      notifytest/migrations/202201281050_initial.sql
  5. 29
      notifytest/migrations/202201281127_fix_function.sql
  6. 28
      notifytest/migrations/202201281128_fix2.sql
  7. 26
      notifytest/migrations/202201281133_fix3.sql
  8. 29
      notifytest/migrations/202201281134_fix4.sql
  9. 57
      notifytest/src/main.rs

1315
notifytest/Cargo.lock generated

File diff suppressed because it is too large Load Diff

13
notifytest/Cargo.toml

@ -0,0 +1,13 @@
[package]
name = "notifytest"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "3.0.13", features = ["derive", "env"] }
env_logger = "0.9.0"
log = "0.4.14"
sqlx = { version = "0.5.10", features = ["runtime-tokio-rustls", "postgres", "migrate", "macros"] }
tokio = { version = "1.16.1", features = ["rt", "net", "macros", "io-util"] }

3
notifytest/README.md

@ -0,0 +1,3 @@
# NotifyTest
Testing Postgres notification features

50
notifytest/migrations/202201281050_initial.sql

@ -0,0 +1,50 @@
CREATE TABLE targets (
level_1 TEXT,
level_2 TEXT,
channel_name TEXT,
UNIQUE (level_1, level_2)
);
CREATE TABLE commands (
id SERIAL PRIMARY KEY,
level_1 TEXT,
level_2 TEXT,
payload TEXT,
channel_name TEXT,
channel_failure BOOLEAN
);
CREATE OR REPLACE FUNCTION notify_channel()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
target_channel TEXT;
BEGIN
SELECT channel_name
INTO target_channel
FROM targets
WHERE level_1 = NEW.level_1
AND level_2 = NEW.level_2;
if found then
SELECT pg_notify(target_channel, NEW.playload);
UPDATE commands
SET channel_name = channel_name, channel_failure = false
WHERE id = NEW.id;
else
UPDATE commands
SET channel_failure = true
WHERE id = NEW.id;
end if;
RETURN NEW;
END;
$$;
CREATE TRIGGER notify_channel
AFTER INSERT
ON commands
FOR EACH ROW
EXECUTE PROCEDURE notify_channel();

29
notifytest/migrations/202201281127_fix_function.sql

@ -0,0 +1,29 @@
CREATE OR REPLACE FUNCTION notify_channel()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
target_channel TEXT;
BEGIN
SELECT channel_name
INTO target_channel
FROM targets
WHERE level_1 = NEW.level_1
AND level_2 = NEW.level_2;
if found then
SELECT pg_notify(target_channel, NEW.payload);
UPDATE commands
SET channel_name = channel_name, channel_failure = false
WHERE id = NEW.id;
else
UPDATE commands
SET channel_failure = true
WHERE id = NEW.id;
end if;
RETURN NEW;
END;
$$;

28
notifytest/migrations/202201281128_fix2.sql

@ -0,0 +1,28 @@
CREATE OR REPLACE FUNCTION notify_channel()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
target_channel TEXT;
BEGIN
SELECT channel_name
INTO target_channel
FROM targets
WHERE level_1 = NEW.level_1
AND level_2 = NEW.level_2;
if found then
SELECT pg_notify(target_channel, NEW.payload);
UPDATE commands
SET channel_name = channel_name, channel_failure = false
WHERE id = NEW.id;
else
UPDATE commands
SET channel_failure = true
WHERE id = NEW.id;
end if;
END;
$$;

26
notifytest/migrations/202201281133_fix3.sql

@ -0,0 +1,26 @@
CREATE OR REPLACE FUNCTION notify_channel()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
target_channel TEXT;
BEGIN
SELECT channel_name
INTO target_channel
FROM targets
WHERE level_1 = NEW.level_1
AND level_2 = NEW.level_2;
if found then
PERFORM pg_notify(target_channel, NEW.payload);
UPDATE commands
SET channel_name = channel_name, channel_failure = false
WHERE id = NEW.id;
else
UPDATE commands
SET channel_failure = true
WHERE id = NEW.id;
end if;
END;
$$;

29
notifytest/migrations/202201281134_fix4.sql

@ -0,0 +1,29 @@
CREATE OR REPLACE FUNCTION notify_channel()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
target_channel TEXT;
BEGIN
SELECT channel_name
INTO target_channel
FROM targets
WHERE level_1 = NEW.level_1
AND level_2 = NEW.level_2;
if found then
PERFORM pg_notify(target_channel, NEW.payload);
UPDATE commands
SET channel_name = channel_name, channel_failure = false
WHERE id = NEW.id;
else
UPDATE commands
SET channel_failure = true
WHERE id = NEW.id;
end if;
RETURN NEW;
END;
$$;

57
notifytest/src/main.rs

@ -0,0 +1,57 @@
use clap::Parser;
use sqlx::postgres::PgListener;
use sqlx::postgres::PgPoolOptions;
use sqlx::postgres::Postgres;
use sqlx::Pool;
async fn connect(url: &str) -> Pool<Postgres> {
PgPoolOptions::new()
.max_connections(3)
.connect(url)
.await
.expect("Failed to connect to the database")
}
async fn create_listener(channel: &str, url: &str) {
let mut listener = PgListener::connect(&url)
.await
.expect("Failed to connect listener {channel}");
listener
.listen(channel)
.await
.expect("Failed to listen to channel {channel}");
log::info!("Listener on {channel} ready...");
loop {
let notification = listener
.recv()
.await
.expect("Failed to receive data from channel {channel}");
log::info!("Notification: {:?}", notification);
}
}
#[derive(Parser)]
#[clap(version, about)]
struct Args {
/// URL for the database connection.
#[clap(short, long, env = "DATABASE_URL")]
database: String,
}
#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();
let pool = connect(&args.database).await;
sqlx::migrate!()
.run(&pool)
.await
.expect("Migration failure");
let listener_type_1 = create_listener("type_1", &args.database);
let listener_type_2 = create_listener("type_2", &args.database);
let listener_type_3 = create_listener("type_3", &args.database);
tokio::join!(listener_type_1, listener_type_2, listener_type_3);
}
Loading…
Cancel
Save