sql >> データベース >  >> RDS >> PostgreSQL

PostgreSQL-信頼できるキューを実装する

    複数のリーダーを考慮すると、各リーダーがすでに受信したレコードを制御する必要があります。

    また、この注文は、リーダーにもレコードを送信するための条件であると言われています。したがって、以前のトランザクションの前にさらにトランザクションがコミットされた場合は、リーダーに送信されるレコードの順序を維持するために、コミットしたときに「停止」してレコードを再度送信する必要があります。

    そうは言っても、実装を確認してください:

    -- lets create our queue table 
    drop table if exists queue_records cascade;
    create table if not exists queue_records 
    (
      cod serial primary key,
      date_posted timestamp default timeofday()::timestamp,
      message text
    );
    
    
    -- lets create a table to save "checkpoints" per reader_id
    drop table if exists queue_reader_checkpoint cascade;
    create table if not exists queue_reader_checkpoint 
    (
      reader_id text primary key,
      last_checkpoint numeric
    );
    
    
    
    CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
    RETURNS SETOF queue_records AS
    $BODY$
    DECLARE
        vLAST_CHECKPOINT    numeric;
        vCHECKPOINT_EXISTS  integer;
        vRECORD         queue_records%rowtype;
    BEGIN
    
        -- let's get the last record sent to the reader 
        SELECT  last_checkpoint
        INTO    vLAST_CHECKPOINT
        FROM    queue_reader_checkpoint
        WHERE   reader_id = pREADER_ID;
    
        -- if vLAST_CHECKPOINT is null (this is the very first time of reader_id), 
        -- sets it to the last cod from queue. It means that reader will get records from now on.
        if (vLAST_CHECKPOINT is null) then
            -- sets the flag indicating the reader does not have any checkpoint recorded
            vCHECKPOINT_EXISTS = 0;
            -- gets the very last commited record
            SELECT  MAX(cod)
            INTO    vLAST_CHECKPOINT
            FROM    queue_records;
        else
            -- sets the flag indicating the reader already have a checkpoint recorded
            vCHECKPOINT_EXISTS = 1; 
        end if;
    
        -- now let's get the records from the queue one-by-one 
        FOR vRECORD IN 
                SELECT  *
                FROM    queue_records
                WHERE   COD > vLAST_CHECKPOINT 
                ORDER   BY COD
        LOOP
    
            -- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order
            if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then
    
                -- let's save the last record read
                vLAST_CHECKPOINT = vRECORD.COD;
    
                -- and return it
                RETURN NEXT vRECORD;
    
            -- but, if it is not, then is out of order
            else
                -- the reason is some transaction did not commit yet, but there's another further transaction that alread did.
                -- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already;
                exit;
            end if;
        END LOOP;
    
    
        -- now we have to persist the last record read to be retrieved on next call
        if (vCHECKPOINT_EXISTS = 0) then
            INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
        else        
            UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
        end if; 
    end;
    $BODY$ LANGUAGE plpgsql VOLATILE;
    



    1. Oracleでのレコードベースの挿入と更新

    2. mysqlを使用して1つのテーブルから別のテーブルに欠落しているレコードを挿入します

    3. Spring Batch ORA-08177:単一のジョブを実行している場合、このトランザクションへのアクセスをシリアル化できません、SERIALIZED分離レベル

    4. SQL Serverの内部:問題のあるオペレーターPt。 I –スキャン