sql >> データベース >  >> NoSQL >> Redis

Flask-SocketIOredisサブスクライブ

    アプリを引数としてクラスに渡し、エラーの説明で示唆されているようにコンテキストを使用して解決しましたが、名前空間も必要です:

    class Listener(threading.Thread):
        def __init__(self, r, channels, app):
        threading.Thread.__init__(self)
        self.daemon = True
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.psubscribe(channels)
        self.app = app
    
        def work(self, item):
            with app.app_context():
                if isinstance(item['data'], bytes):
                    try:
                        msg = item['data'].decode('utf-8')
                        decode_msg = json.loads(msg)                
                        if decode_msg['type'] == 'UPDATE_TASK':
                            send(json.dumps({"type":"UPDATE_TASK"}), room='home', namespace='/')
                        #_send_task_message()
                    except ValueError as e:
                        log.error("Error decoding msg to microservice: %s", str(e))
    
        def run(self):
            for item in self.pubsub.listen():
                self.work(item)
    
    if __name__ == '__main__':
    
        r = redis.Redis()
        client = Listener(r, ['/bobguarana/socketio'], app)
        client.start()
    
        socketio.run(debug=True, app=app, port=8080)
    


    1. MongoDBにファイルをインポートするときにドキュメントをマージする方法

    2. TCP接続で発生するすべてのバイトをどのように読み取りますか?

    3. mongoの結果から_idを削除します

    4. flashdbを使用したRedisキースペース通知