sql >> データベース >  >> NoSQL >> Redis

Railsスレッド内の変数へのアクセス

    最後に更新された編集:動作中のコードを表示します。デバッグコードを除いて、メインモジュールは変更されていません。注:終了前に登録を解除する必要性に関して、すでに述べた問題を経験しました。

    コードは正しいようです。どのようにインスタンス化しているかを確認したいと思います。

    config / application.rbには、おそらく少なくとも次のようなものがあります:

    require 'ws_communication'
    config.middleware.use WsCommunication
    

    次に、JavaScriptクライアントで、次のようなものが必要です。

    var ws = new WebSocket(uri);
    

    WsCommunicationの別のインスタンスをインスタンス化しますか?これにより、@ clientが空の配列に設定され、症状が現れる可能性があります。このようなものは正しくありません:

    var ws = new WsCommunication;
    

    クライアントを表示し、この投稿が役に立たない場合はconfig/application.rbを表示すると便利です。

    ちなみに、私は@clientsが更新時にミューテックスによって保護されるべきであるというコメントに同意します。これは動的な構造であり、イベント駆動型システムではいつでも変更される可能性があります。 redis-mutexは良いオプションです。 (Githubは現時点ですべてに500エラーをスローしているように見えるので、そのリンクが正しいことを願っています。)

    $redis.publishがメッセージを受信したクライアントの数の整数値を返すことにも注意してください。

    最後に、終了する前にチャンネルの登録が解除されていることを確認する必要がある場合があります。クリーンアップされていない同じチャネルへの以前のサブスクリプションのために、各メッセージを複数回、さらには何度も送信することになった状況がありました。スレッド内のチャネルにサブスクライブしているため、同じスレッド内でサブスクライブを解除する必要があります。そうしないと、プロセスは「ハング」して、適切なスレッドが魔法のように表示されるのを待ちます。 「登録解除」フラグを設定してからメッセージを送信することで、この状況に対処します。次に、on.messageブロック内で、購読解除フラグをテストし、そこで購読解除を発行します。

    提供したモジュールで、デバッグにわずかな変更を加えただけです:

    require 'faye/websocket'
    require 'redis'
    
    class WsCommunication
      KEEPALIVE_TIME = 15 #seconds
      CHANNEL = 'vip-deck'
    
      def initialize(app)
        @app = app
        @clients = []
        uri = URI.parse(ENV['REDISCLOUD_URL'])
        $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
        Thread.new do
          redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
          redis_sub.subscribe(CHANNEL) do |on|
            on.message do |channel, msg|
              puts "Message event. Clients receiving:#{@clients.count};"
              @clients.each { |ws| ws.send(msg) }
            end
          end
        end
      end
    
      def call(env)
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
    
          ws.on :open do |event|
            @clients << ws
            puts "Open event. Clients open:#{@clients.count};"
          end
    
          ws.on :message do |event|
            receivers = $redis.publish(CHANNEL, event.data)
            puts "Message published:#{event.data}; Receivers:#{receivers};"
          end
    
          ws.on :close do |event|
            @clients.delete(ws)
            puts "Close event. Clients open:#{@clients.count};"
            ws = nil
          end
    
          ws.rack_response
        else
          @app.call(env)
        end
      end
    end
    

    私が提供したテストサブスクライバーコード:

    # encoding: UTF-8
    puts "Starting client-subscriber.rb"
    $:.unshift File.expand_path '../lib', File.dirname(__FILE__)
    require 'rubygems'
    require 'eventmachine'
    require 'websocket-client-simple'
    
    puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
    
    url = ARGV.shift || 'ws://localhost:3000'
    
    EM.run do
    
      ws = WebSocket::Client::Simple.connect url
    
      ws.on :message do |msg|
        puts msg
      end
    
      ws.on :open do
        puts "-- Subscriber open (#{ws.url})"
      end
    
      ws.on :close do |e|
        puts "-- Subscriber close (#{e.inspect})"
        exit 1
      end
    
      ws.on :error do |e|
        puts "-- Subscriber error (#{e.inspect})"
      end
    
    end
    

    私が提供したテスト発行者コード。パブリッシャーとサブスクライバーは単なるテストであるため、簡単に組み合わせることができます。

    # encoding: UTF-8
    puts "Starting client-publisher.rb"
    $:.unshift File.expand_path '../lib', File.dirname(__FILE__)
    require 'rubygems'
    require 'eventmachine'
    require 'json'
    require 'websocket-client-simple'
    
    puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
    
    url = ARGV.shift || 'ws://localhost:3000'
    
    EM.run do
      count ||= 0
      timer = EventMachine.add_periodic_timer(5+rand(5)) do
        count += 1
        send({"MESSAGE": "COUNT:#{count};"})
      end
    
      @ws = WebSocket::Client::Simple.connect url
    
      @ws.on :message do |msg|
        puts msg
      end
    
      @ws.on :open do
        puts "-- Publisher open"
      end
    
      @ws.on :close do |e|
        puts "-- Publisher close (#{e.inspect})"
        exit 1
      end
    
      @ws.on :error do |e|
        puts "-- Publisher error (#{e.inspect})"
        @ws.close
      end
    
      def self.send message
        payload = message.is_a?(Hash) ? message : {payload: message}
        @ws.send(payload.to_json)
      end
    end
    

    これらすべてをラックミドルウェアレイヤーで実行するサンプルconfig.ru:

    require './controllers/main'
    require './middlewares/ws_communication'
    use WsCommunication
    run Main.new
    

    これがメインです。実行中のバージョンから削除したので、使用する場合は微調整が必​​要になる可能性があります:

    %w(rubygems bundler sinatra/base json erb).each { |m| require m }
    ENV['RACK_ENV'] ||= 'development'
    Bundler.require
    $: << File.expand_path('../', __FILE__)
    $: << File.expand_path('../lib', __FILE__)
    
    Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    
      class Main < Sinatra::Base
    
        env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
        get "/" do
          erb :"index.html"
        end
    
        get "/assets/js/application.js" do
          content_type :js
          @scheme = env == "production" ? "wss://" : "ws://"
          erb :"application.js"
        end
      end
    


    1. MongoCredentialおよび未分類のMongoDb例外の認証の例外

    2. Redisキャッシュを使用して大きなオブジェクトをキャッシュする方法

    3. Node.jsとRedis/hget synchronize

    4. CDH 6.2リリース:HBaseの新機能