あなたが与えた例と擬似コードで、それを想像してみましょう:
-
recipient.user1
1分間に60通のメッセージを受信しています - および
perform_task()
メソッドの実行には2秒かかります。
ここで何が起こるかは明らかです。新しいメッセージが届いてから処理されるまでの待ち時間は時間の経過とともに大きくなり、「リアルタイム処理」からどんどんドリフトしていきます。
system throughput = 30 messages/minute
これを回避するには、user1
のコンシューマーグループを作成することをお勧めします 。ここでは、4つの異なるPythonプロセスを並行して実行し、4つすべてをuser1
の同じグループに参加させることができます。 。 user1
にメッセージが届いたら 4人のワーカーの1人がそれを受け取り、perform_task()
。
system throughput = 120 message/minute
あなたの例では、message.acknowledge()
ストリームリーダーが単独であるため(XREADコマンド)、実際には存在しません。
グループの場合、メッセージの確認が不可欠になります。これにより、グループメンバーの1人が実際にそのメッセージを処理したことをredisが認識し、「先に進む」ことができます(そのメッセージが確認待ちであったことを忘れる可能性があります) 。グループを使用している場合は、すべてのメッセージがコンシューマーグループのワーカーの1つに一度配信されるようにするために、サーバー側のロジックが少し用意されています。 (XGROUPREADコマンド)。クライアントが終了すると、そのメッセージの確認応答(XACKコマンド)を発行して、サーバー側の「コンシューマーグループバッファー」がメッセージを削除して先に進むことができるようにします。
労働者が死亡し、メッセージを確認しなかった場合を想像してみてください。コンシューマーグループを使用すると、(XPENDINGコマンドを使用して)この状況に注意し、たとえば別のコンシューマーで同じメッセージの処理を再試行することで、それらに対処できます。
グループを使用していない場合、redisサーバーは「先に進む」必要はなく、「確認応答」は100%クライアント側/ビジネスロジックになります。