sql >> データベース >  >> NoSQL >> Redis

例によるフラスコ–Redisタスクキューの実装

    チュートリアルのこの部分では、テキスト処理を処理するためにRedisタスクキューを実装する方法について詳しく説明します。

    更新:

    • 2020年2月12日:Pythonバージョン3.8.1と、最新バージョンのRedis、Python Redis、およびRQにアップグレードしました。詳細については、以下を参照してください。最新のRQバージョンのバグに言及し、解決策を提供します。 httpsバグの前にhttpを解決しました。
    • 2016年3月22日:Pythonバージョン3.5.1にアップグレードされ、最新バージョンのRedis、Python Redis、およびRQにアップグレードされました。詳細については、以下をご覧ください。
    • 2015年2月22日:Python3のサポートが追加されました。

    無料ボーナス: ここをクリックして、FlaskWebアプリの構築方法を段階的に説明する無料のFlask+Pythonビデオチュートリアルにアクセスしてください。

    注意:これが私たちが構築しているものです。特定のURLからのテキストに基づいて単語と頻度のペアを計算するFlaskアプリです。

    1. パート1:ローカル開発環境をセットアップしてから、ステージング環境と本番環境の両方をHerokuにデプロイします。
    2. パート2:移行を処理するためにSQLAlchemyおよびAlembicとともにPostgreSQLデータベースをセットアップします。
    3. パート3:バックエンドロジックを追加して、リクエスト、BeautifulSoup、Natural Language Toolkit(NLTK)ライブラリを使用してウェブページから単語数を取得して処理します。
    4. パート4:テキスト処理を処理するためのRedisタスクキューを実装します。 (現在
    5. パート5:フロントエンドにAngularを設定して、バックエンドを継続的にポーリングし、リクエストの処理が完了したかどうかを確認します。
    6. パート6:Herokuのステージングサーバーにプッシュします-Redisをセットアップし、1つのDynoで2つのプロセス(Webとワーカー)を実行する方法を詳しく説明します。
    7. パート7:フロントエンドを更新してユーザーフレンドリーにします。
    8. パート8:JavaScriptとD3を使用して度数分布図を表示するカスタムAngularディレクティブを作成します。

    <マーク>コードが必要ですか?リポジトリから取得します。


    インストール要件

    使用したツール:

    • Redis(5.0.7)
    • Python Redis(3.4.1)
    • RQ(1.2.2)-タスクキューを作成するためのシンプルなライブラリ

    公式サイトまたはHomebrewからRedisをダウンロードしてインストールすることから始めます(brew install redis )。インストールしたら、Redisサーバーを起動します:

    $ redis-server
    

    次に、PythonRedisとRQを新しいターミナルウィンドウにインストールします。

    $ cd flask-by-example
    $ python -m pip install redis==3.4.1 rq==1.2.2
    $ python -m pip freeze > requirements.txt
    


    ワーカーを設定する

    キューに入れられたタスクをリッスンするワーカープロセスを作成することから始めましょう。新しいファイルを作成しますworker.py 、次のコードを追加します:

    import os
    
    import redis
    from rq import Worker, Queue, Connection
    
    listen = ['default']
    
    redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
    
    conn = redis.from_url(redis_url)
    
    if __name__ == '__main__':
        with Connection(conn):
            worker = Worker(list(map(Queue, listen)))
            worker.work()
    

    ここでは、defaultというキューをリッスンしました localhost:6379でRedisサーバーへの接続を確立しました 。

    別のターミナルウィンドウでこれを起動します:

    $ cd flask-by-example
    $ python worker.py
    17:01:29 RQ worker started, version 0.5.6
    17:01:29
    17:01:29 *** Listening on default...
    

    次に、 app.pyを更新する必要があります キューにジョブを送信するには…



    app.pyを更新します

    次のインポートをapp.pyに追加します :

    from rq import Queue
    from rq.job import Job
    from worker import conn
    

    次に、構成セクションを更新します:

    app = Flask(__name__)
    app.config.from_object(os.environ['APP_SETTINGS'])
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    db = SQLAlchemy(app)
    
    q = Queue(connection=conn)
    
    from models import *
    

    q = Queue(connection=conn) Redis接続をセットアップし、その接続に基づいてキューを初期化しました。

    テキスト処理機能をインデックスルートからcount_and_save_words()という新しい関数に移動します 。この関数は、1つの引数であるURLを受け入れます。これは、インデックスルートから呼び出すときに渡されます。

    def count_and_save_words(url):
    
        errors = []
    
        try:
            r = requests.get(url)
        except:
            errors.append(
                "Unable to get URL. Please make sure it's valid and try again."
            )
            return {"error": errors}
    
        # text processing
        raw = BeautifulSoup(r.text).get_text()
        nltk.data.path.append('./nltk_data/')  # set the path
        tokens = nltk.word_tokenize(raw)
        text = nltk.Text(tokens)
    
        # remove punctuation, count raw words
        nonPunct = re.compile('.*[A-Za-z].*')
        raw_words = [w for w in text if nonPunct.match(w)]
        raw_word_count = Counter(raw_words)
    
        # stop words
        no_stop_words = [w for w in raw_words if w.lower() not in stops]
        no_stop_words_count = Counter(no_stop_words)
    
        # save the results
        try:
            result = Result(
                url=url,
                result_all=raw_word_count,
                result_no_stop_words=no_stop_words_count
            )
            db.session.add(result)
            db.session.commit()
            return result.id
        except:
            errors.append("Unable to add item to database.")
            return {"error": errors}
    
    
    @app.route('/', methods=['GET', 'POST'])
    def index():
        results = {}
        if request.method == "POST":
            # this import solves a rq bug which currently exists
            from app import count_and_save_words
    
            # get url that the person has entered
            url = request.form['url']
            if not url[:8].startswith(('https://', 'http://')):
                url = 'http://' + url
            job = q.enqueue_call(
                func=count_and_save_words, args=(url,), result_ttl=5000
            )
            print(job.get_id())
    
        return render_template('index.html', results=results)
    

    次のコードに注意してください:

    job = q.enqueue_call(
        func=count_and_save_words, args=(url,), result_ttl=5000
    )
    print(job.get_id())
    

    注: count_and_save_wordsをインポートする必要があります 関数indexの関数 RQパッケージには現在バグがあり、同じモジュール内の関数が見つからないためです。

    ここでは、以前に初期化してenqueue_call()と呼んだキューを使用しました 働き。これにより、キューに新しいジョブが追加され、そのジョブはcount_and_save_words()を実行しました。 URLを引数として機能します。 result_ttl=5000 line引数は、この場合、ジョブの結果を-5,000秒間保持する時間をRQに指示します。次に、ジョブIDを端末に出力しました。このIDは、ジョブの処理が完了したかどうかを確認するために必要です。

    そのための新しいルートを設定しましょう…



    結果を得る

    @app.route("/results/<job_key>", methods=['GET'])
    def get_results(job_key):
    
        job = Job.fetch(job_key, connection=conn)
    
        if job.is_finished:
            return str(job.result), 200
        else:
            return "Nay!", 202
    

    これをテストしてみましょう。

    サーバーを起動し、http:// localhost:5000 /に移動し、URL https://realpython.comを使用して、ターミナルからジョブIDを取得します。次に、そのIDを「/ results /」エンドポイントで使用します。つまり、http:// localhost:5000 / results/ef600206-3503-4b87-a436-ddd9438f2197です。

    ステータスを確認する前に5,000秒未満が経過している限り、結果をデータベースに追加するときに生成されるID番号が表示されます。

    # save the results
    try:
        from models import Result
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    

    それでは、ルートを少しリファクタリングして、データベースから実際の結果をJSONで返しましょう:

    @app.route("/results/<job_key>", methods=['GET'])
    def get_results(job_key):
    
        job = Job.fetch(job_key, connection=conn)
    
        if job.is_finished:
            result = Result.query.filter_by(id=job.result).first()
            results = sorted(
                result.result_no_stop_words.items(),
                key=operator.itemgetter(1),
                reverse=True
            )[:10]
            return jsonify(results)
        else:
            return "Nay!", 202
    

    必ずインポートを追加してください:

    from flask import jsonify
    

    これをもう一度テストしてください。すべてがうまくいけば、ブラウザに次のようなものが表示されるはずです。

    [
      [
        "Python", 
        315
      ], 
      [
        "intermediate", 
        167
      ], 
      [
        "python", 
        161
      ], 
      [
        "basics", 
        118
      ], 
      [
        "web-dev", 
        108
      ], 
      [
        "data-science", 
        51
      ], 
      [
        "best-practices", 
        49
      ], 
      [
        "advanced", 
        45
      ], 
      [
        "django", 
        43
      ], 
      [
        "flask", 
        41
      ]
    ]
    


    次は何ですか?

    無料ボーナス: ここをクリックして、FlaskWebアプリの構築方法を段階的に説明する無料のFlask+Pythonビデオチュートリアルにアクセスしてください。

    パート5では、Angularをミックスに追加してポーラーを作成し、クライアントとサーバーを統合します。ポーラーは、5秒ごとに/results/<job_key>にリクエストを送信します。 更新を要求するエンドポイント。データが利用可能になったら、それをDOMに追加します。

    乾杯!

    これは、StartupEdmontonの共同創設者であるCamLinkeとRealPythonの人々とのコラボレーション作品です。



    1. SpringRedisエラーハンドル

    2. Hadoopでのラック認識とその利点

    3. Rails + MongoMapper+EmbeddedDocumentフォームヘルプ

    4. シンプルなExpressアプリでNode.jsクラスターを使用するにはどうすればよいですか?