重大な警告 私はこれまでこのライブラリを使用したことがなく、いくつかの概念に関する私の低レベルの知識は少し...不足しています。ほとんどの場合、私はチュートリアルを読んでいます。非同期作業をしたことがある人なら誰でもこれを読んで笑うことは間違いありませんが、他の人にとっては便利な出発点になるかもしれません。エンプターに警告!
Stream
がどのように行われるかを示す、もう少し簡単なものから始めましょう。 動作します。 Result
のイテレータを変換できます sストリームに:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
これは、ストリームを消費する1つの方法を示しています。 and_then
を使用します 各ペイロードに何かを実行し(ここではそれを印刷するだけです)、次にfor_each
Stream
を変換します Future
に戻る 。次に、奇妙な名前のforget
を呼び出すことで、futureを実行できます。 メソッド。
次は、Redisライブラリをミックスに結び付けて、1つのメッセージだけを処理することです。 get_message()
以降 メソッドがブロックしているので、いくつかのスレッドをミックスに導入する必要があります。他のすべてがブロックされるため、このタイプの非同期システムで大量の作業を実行することはお勧めできません。例:
特に明記されていない限り、この関数の実装は非常に迅速に終了することを確認する必要があります。 。
理想的な世界では、redisクレートはfuturesのようなライブラリの上に構築され、これらすべてをネイティブに公開します。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
私の理解はここで曖昧になります。別のスレッドで、メッセージをブロックし、メッセージを取得したらチャネルにプッシュします。私が理解していないのは、なぜスレッドのハンドルを握る必要があるのかということです。 foo.forget
ストリームが空になるまで待機して、自分自身をブロックします。
RedisサーバーへのTelnet接続で、これを送信します:
publish rust awesome
そして、あなたはそれが機能するのを見るでしょう。 printステートメントを追加すると、(私にとっては)foo.forget
ステートメントは、スレッドが生成される前に実行されます。
複数のメッセージは扱いにくいです。 Sender
生成側が消費側よりもはるかに先に進むのを防ぐために、それ自体を消費します。これは、send
から別の未来を返すことによって達成されます !ループの次の反復で再利用するには、そこからシャトルで戻す必要があります。
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
時間が経つにつれて、このタイプの相互運用のためのより多くのエコシステムがあると確信しています。たとえば、futures-cpupoolcrateはおそらく これと同様のユースケースをサポートするように拡張されます。