sql >> データベース >  >> NoSQL >> Redis

futures.rsとRedisPubSubを使用して、ブロッキング呼び出しのfuturesストリームを実装するにはどうすればよいですか?

    重大な警告 私はこれまでこのライブラリを使用したことがなく、いくつかの概念に関する私の低レベルの知識は少し...不足しています。ほとんどの場合、私はチュートリアルを読んでいます。非同期作業をしたことがある人なら誰でもこれを読んで笑うことは間違いありませんが、他の人にとっては便利な出発点になるかもしれません。エンプターに警告!

    Streamがどのように行われるかを示す、もう少し簡単なものから始めましょう。 動作します。 Resultのイテレータを変換できます sストリームに:

    extern crate futures;
    
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
        let payloads = stream::iter(payloads.into_iter());
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
    }
    

    これは、ストリームを消費する1つの方法を示しています。 and_thenを使用します 各ペイロードに何かを実行し(ここではそれを印刷するだけです)、次にfor_each Streamを変換します Futureに戻る 。次に、奇妙な名前のforgetを呼び出すことで、futureを実行できます。 メソッド。

    次は、Redisライブラリをミックスに結び付けて、1つのメッセージだけを処理することです。 get_message()以降 メソッドがブロックしているので、いくつかのスレッドをミックスに導入する必要があります。他のすべてがブロックされるため、このタイプの非同期システムで大量の作業を実行することはお勧めできません。例:

    特に明記されていない限り、この関数の実装は非常に迅速に終了することを確認する必要があります。 。

    理想的な世界では、redisクレートはfuturesのようなライブラリの上に構築され、これらすべてをネイティブに公開します。

    extern crate redis;
    extern crate futures;
    
    use std::thread;
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
    
        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
    
        let (tx, payloads) = stream::channel();
    
        let redis_thread = thread::spawn(move || {
            let msg = pubsub.get_message().expect("Unable to get message");
            let payload: Result<String, _> = msg.get_payload();
            tx.send(payload).forget();
        });
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
    

    私の理解はここで曖昧になります。別のスレッドで、メッセージをブロックし、メッセージを取得したらチャネルにプッシュします。私が理解していないのは、なぜスレッドのハンドルを握る必要があるのか​​ということです。 foo.forget ストリームが空になるまで待機して、自分自身をブロックします。

    RedisサーバーへのTelnet接続で、これを送信します:

    publish rust awesome
    

    そして、あなたはそれが機能するのを見るでしょう。 printステートメントを追加すると、(私にとっては)foo.forget ステートメントは、スレッドが生成される前に実行されます。

    複数のメッセージは扱いにくいです。 Sender 生成側が消費側よりもはるかに先に進むのを防ぐために、それ自体を消費します。これは、sendから別の未来を返すことによって達成されます !ループの次の反復で再利用するには、そこからシャトルで戻す必要があります。

    extern crate redis;
    extern crate futures;
    
    use std::thread;
    use std::sync::mpsc;
    
    use futures::Future;
    use futures::stream::{self, Stream};
    
    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
    
        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
    
        let (tx, payloads) = stream::channel();
    
        let redis_thread = thread::spawn(move || {
            let mut tx = tx;
    
            while let Ok(msg) = pubsub.get_message() {
                let payload: Result<String, _> = msg.get_payload();
    
                let (next_tx_tx, next_tx_rx) = mpsc::channel();
    
                tx.send(payload).and_then(move |new_tx| {
                    next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                    futures::finished(())
                }).forget();
    
                tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
            }
        });
    
        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));
    
        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
    

    時間が経つにつれて、このタイプの相互運用のためのより多くのエコシステムがあると確信しています。たとえば、futures-cpupoolcrateはおそらく これと同様のユースケースをサポートするように拡張されます。




    1. マングース、入力されたフィールドでクエリを並べ替える

    2. PythonRedisの相互作用

    3. redisサーバーを実行し続ける方法

    4. NodeJs-redisを使用して、expressでredisを接続します