チュートリアルのこの部分では、テキスト処理を処理するためにRedisタスクキューを実装する方法について詳しく説明します。
更新:
- 2020年2月12日:Pythonバージョン3.8.1と、最新バージョンのRedis、Python Redis、およびRQにアップグレードしました。詳細については、以下を参照してください。最新のRQバージョンのバグに言及し、解決策を提供します。 httpsバグの前にhttpを解決しました。
- 2016年3月22日:Pythonバージョン3.5.1にアップグレードされ、最新バージョンのRedis、Python Redis、およびRQにアップグレードされました。詳細については、以下をご覧ください。
- 2015年2月22日:Python3のサポートが追加されました。
注意:これが私たちが構築しているものです。特定のURLからのテキストに基づいて単語と頻度のペアを計算するFlaskアプリです。
- パート1:ローカル開発環境をセットアップしてから、ステージング環境と本番環境の両方をHerokuにデプロイします。
- パート2:移行を処理するためにSQLAlchemyおよびAlembicとともにPostgreSQLデータベースをセットアップします。
- パート3:バックエンドロジックを追加して、リクエスト、BeautifulSoup、Natural Language Toolkit(NLTK)ライブラリを使用してウェブページから単語数を取得して処理します。
- パート4:テキスト処理を処理するためのRedisタスクキューを実装します。 (現在 )
- パート5:フロントエンドにAngularを設定して、バックエンドを継続的にポーリングし、リクエストの処理が完了したかどうかを確認します。
- パート6:Herokuのステージングサーバーにプッシュします-Redisをセットアップし、1つのDynoで2つのプロセス(Webとワーカー)を実行する方法を詳しく説明します。
- パート7:フロントエンドを更新してユーザーフレンドリーにします。
- パート8:JavaScriptとD3を使用して度数分布図を表示するカスタムAngularディレクティブを作成します。
<マーク>コードが必要ですか?リポジトリから取得します。
インストール要件
使用したツール:
- Redis(5.0.7)
- Python Redis(3.4.1)
- RQ(1.2.2)-タスクキューを作成するためのシンプルなライブラリ
公式サイトまたはHomebrewからRedisをダウンロードしてインストールすることから始めます(brew install redis
)。インストールしたら、Redisサーバーを起動します:
$ redis-server
次に、PythonRedisとRQを新しいターミナルウィンドウにインストールします。
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
ワーカーを設定する
キューに入れられたタスクをリッスンするワーカープロセスを作成することから始めましょう。新しいファイルを作成しますworker.py 、次のコードを追加します:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
ここでは、default
というキューをリッスンしました localhost:6379
でRedisサーバーへの接続を確立しました 。
別のターミナルウィンドウでこれを起動します:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
次に、 app.pyを更新する必要があります キューにジョブを送信するには…
app.pyを更新します
次のインポートをapp.pyに追加します :
from rq import Queue
from rq.job import Job
from worker import conn
次に、構成セクションを更新します:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
Redis接続をセットアップし、その接続に基づいてキューを初期化しました。
テキスト処理機能をインデックスルートからcount_and_save_words()
という新しい関数に移動します 。この関数は、1つの引数であるURLを受け入れます。これは、インデックスルートから呼び出すときに渡されます。
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
次のコードに注意してください:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
注:
count_and_save_words
をインポートする必要があります 関数index
の関数 RQパッケージには現在バグがあり、同じモジュール内の関数が見つからないためです。
ここでは、以前に初期化してenqueue_call()
と呼んだキューを使用しました 働き。これにより、キューに新しいジョブが追加され、そのジョブはcount_and_save_words()
を実行しました。 URLを引数として機能します。 result_ttl=5000
line引数は、この場合、ジョブの結果を-5,000秒間保持する時間をRQに指示します。次に、ジョブIDを端末に出力しました。このIDは、ジョブの処理が完了したかどうかを確認するために必要です。
そのための新しいルートを設定しましょう…
結果を得る
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
これをテストしてみましょう。
サーバーを起動し、http:// localhost:5000 /に移動し、URL https://realpython.comを使用して、ターミナルからジョブIDを取得します。次に、そのIDを「/ results /」エンドポイントで使用します。つまり、http:// localhost:5000 / results/ef600206-3503-4b87-a436-ddd9438f2197です。
ステータスを確認する前に5,000秒未満が経過している限り、結果をデータベースに追加するときに生成されるID番号が表示されます。
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
それでは、ルートを少しリファクタリングして、データベースから実際の結果をJSONで返しましょう:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
必ずインポートを追加してください:
from flask import jsonify
これをもう一度テストしてください。すべてがうまくいけば、ブラウザに次のようなものが表示されるはずです。
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
次は何ですか?
パート5では、Angularをミックスに追加してポーラーを作成し、クライアントとサーバーを統合します。ポーラーは、5秒ごとに/results/<job_key>
にリクエストを送信します。 更新を要求するエンドポイント。データが利用可能になったら、それをDOMに追加します。
乾杯!
これは、StartupEdmontonの共同創設者であるCamLinkeとRealPythonの人々とのコラボレーション作品です。