私はあなたが解決策を見つけたかどうかわからない同じ問題に取り組んでいましたが、私は次のことをすることによって同様のことを達成することができました。まず、テーブルにトリガーを追加しました
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
これにより、行が更新、削除、または挿入されるたびに、テーブルにトリガーが設定されます。次に、設定したトリガー関数を呼び出します。これは次のようになります。
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
これにより、Spring Bootプロジェクトからのこれらの更新のいずれかを「リッスン」できるようになり、行全体がペイロードとして送信されます。次に、Spring Bootプロジェクトで、データベースへの接続を構成しました。
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
これで、サービスクラスのコンストラクターに自動配線(依存性注入)し、次のようにr2dbcPostgressqlConnectionクラスにキャストします。
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
ここで、テーブルを「リッスン」し、テーブルの更新を実行したときに通知を受け取ります。これを行うには、@PostContructアノテーションを使用して依存性注入後に実行される初期化メソッドを設定します
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
pg_notifyメソッド内に付けた名前をリッスンしていることに注意してください。また、次のように、Beanが破棄されようとしているときに接続を閉じるメソッドを設定する必要があります。
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
今、私は単に現在テーブルにあるもののフラックスを返すメソッドを作成し、通知がjsonとして届く前に言ったように、それを通知とマージするので、それを逆シリアル化する必要があり、使用することにしましたObjectMapper。したがって、次のようになります。
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
これがお役に立てば幸いです。乾杯!