apeescape2.com
  • メイン
  • 設計プロセス
  • 人とチーム
  • ツールとチュートリアル
  • バックエンド
バックエンド

Python用Celeryでのバックグラウンドジョブワークフローの調整

最新のWebアプリケーションとその基盤となるシステムは、これまでになく高速で応答性が高くなっています。ただし、重いタスクの実行を、メインスレッドで取り組むのではなく、システムアーキテクチャ全体の他の部分にオフロードしたい場合がまだ多くあります。このようなタスクの識別は、タスクが次のカテゴリのいずれかに属しているかどうかを確認するのと同じくらい簡単です。

  • 定期的なタスク—特定の時間または間隔の後に実行するようにスケジュールするジョブ。たとえば、月次レポートの生成や1日2回実行されるWebスクレイパー。
  • サードパーティのタスク— Webアプリは、ページの読み込み中に他のアクションが完了するのを待たずに、ユーザーに迅速にサービスを提供する必要があります。たとえば、メールや通知の送信、内部ツールへの更新の伝達(A / Bテストやシステムロギングのためのデータの収集など) )。
  • 長時間実行ジョブ—リソースが高価で、ユーザーが結果を計算するまで待つ必要があるジョブ。たとえば、複雑なワークフローの実行(DAGワークフロー)、グラフの生成、Map-Reduceのようなタスク、メディアコンテンツの提供(ビデオ、オーディオ)。

バックグラウンドタスクを実行する簡単なソリューションは、別のスレッドまたはプロセス内でタスクを実行することです。 Pythonは高水準チューリング完全プログラミング言語ですが、残念ながら、Erlang、Go、Java、Scala、またはAkkaに匹敵する規模の組み込み同時実行性は提供されていません。これらは、TonyHoareのCommunicatingSequential Processes( CSP )。一方、Pythonスレッドは、グローバルインタープリターロックによって調整およびスケジュールされます( GIL )。これにより、複数のネイティブスレッドがPythonバイトコードを一度に実行できなくなります。 GILを取り除くことは、多くの議論のトピックです Python開発者 、しかしそれはこの記事の焦点では​​ありません。 Pythonでの並行プログラミングは古風ですが、でそれについて読むことを歓迎します Pythonマルチスレッドチュートリアル 仲間のApeeScapeerマーカスマッカーディによる。したがって、プロセス間の通信を一貫して設計することはエラーが発生しやすいプロセスであり、スケーラビリティに悪影響を与えることは言うまでもなく、コードの結合とシステムの保守性の低下につながります。さらに、Pythonプロセスはオペレーティングシステム(OS)での通常のプロセスであり、Python標準ライブラリ全体を使用すると非常に重要になります。アプリ内のプロセスの数が増えると、そのようなプロセスから別のプロセスへの切り替えは時間のかかる操作になります。

Pythonとの並行性をよりよく理解するには、DavidBeazleyによるこの素晴らしいスピーチをご覧ください。 PyCon’15 。



はるかに良い解決策は、 分散キュー または、そのよく知られた兄弟パラダイムと呼ばれる パブリッシュ/サブスクライブ 。図1に示すように、アプリケーションには2つのタイプがあり、そのうちの1つは 出版社 、メッセージとその他のメッセージを送信します。 加入者 、メッセージを受信します。これらの2つのエージェントは互いに直接対話せず、互いに気づいていません。パブリッシャーは中央キューにメッセージを送信します。 ブローカ 、およびサブスクライバーは、このブローカーから関心のあるメッセージを受信します。この方法には2つの主な利点があります。

  • スケーラビリティ—エージェントはネットワーク内でお互いについて知る必要はありません。それらはトピックごとに焦点を合わせています。つまり、それぞれが非同期的に他方に関係なく正常に動作し続けることができるということです。
  • 疎結合—各エージェントはシステムのその部分(サービス、モジュール)を表します。それらは疎結合であるため、それぞれがデータセンターを超えて個別にスケーリングできます。

このようなパラダイムをサポートし、TCPまたはHTTPプロトコルのいずれかによって駆動されるきちんとしたAPIを提供するメッセージングシステムがたくさんあります。たとえば、JMS、RabbitMQ、Redis Pub / Sub、ApacheActiveMQなどです。

CeleryPythonを使用したパブリッシュ/サブスクライブパラダイム
図1:パブリッシュ/サブスクライブパラダイム

セロリとは?

セロリ Pythonの世界で最も人気のあるバックグラウンドジョブマネージャーの1つです。 Celeryは、RabbitMQやRedisなどのいくつかのメッセージブローカーと互換性があり、プロデューサーとコンシューマーの両方として機能できます。

Celeryは、分散メッセージパッシングに基づく非同期タスクキュー/ジョブキューです。リアルタイム操作に重点を置いていますが、スケジューリングもサポートしています。タスクと呼ばれる実行ユニットは、マルチプロセッシングを使用して1つ以上のワーカーサーバーで同時に実行されます。 イベントレット 、または ベント 。タスクは、非同期(バックグラウンドで)または同期(準備ができるまで待つ)で実行できます。 – セロリプロジェクト

セロリの使用を開始するには、次のステップバイステップガイドに従ってください。 公式ドキュメント 。

この記事の焦点は、Celeryがカバーできるユースケースをよく理解することです。この記事では、興味深い例を示すだけでなく、バ​​ックグラウンドメーリング、レポート生成、ロギング、エラーレポートなどの実際のタスクにCeleryを適用する方法についても学びます。エミュレーションを超えてタスクをテストする方法を共有し、最後に、公式ドキュメントに(十分に)文書化されていないいくつかのトリックを提供します。これは、自分で発見するのに何時間もの調査を要しました。

Webアニメーションの作り方

Celeryの使用経験がない場合は、公式チュートリアルに従って最初に試してみることをお勧めします。

食欲をそそる

この記事に興味をそそられ、すぐにコードに飛び込みたくなる場合は、これに従ってください GitHubリポジトリ この記事で使用されているコードについて。 READMEそこにあるファイルは、サンプルアプリケーションを実行および操作するための迅速で汚いアプローチを提供します。

セロリの最初のステップ

手始めに、Celeryが一見重要なタスクをいかにシンプルかつエレガントに解決するかを読者に示す一連の実用的な例を紹介します。すべての例は、Djangoフレームワーク内で提示されます。ただし、それらのほとんどは他のPythonフレームワーク(Flask、Pyramid)に簡単に移植できます。

プロジェクトのレイアウトはによって生成されました ダルマザメDjango ;ただし、私の意見では、これらのユースケースの開発と準備を容易にするいくつかの依存関係のみを保持しました。また、ノイズを減らし、コードを理解しやすくするために、この投稿とアプリケーションに不要なモジュールを削除しました。

- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
  • celery_uncovered/{toyex,tricks,advex}この投稿で取り上げるさまざまなアプリケーションが含まれています。各アプリケーションには、必要なCeleryの理解レベル別に整理された一連の例が含まれています。
  • celery_uncovered/celery.py Celeryインスタンスを定義します。

ファイル: celery_uncovered/celery.py:

from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()

次に、CeleryがDjangoと一緒に起動することを確認する必要があります。そのため、アプリをcelery_uncovered/__init__.pyにインポートします。

ファイル: celery_uncovered/__init__.py:

from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])

config/settingsアプリとCeleryの構成のソースです。実行環境に応じて、Djangoは対応する設定を起動します:local.py開発用またはtest.py検査用の。必要に応じて、新しいpythonモジュール(prod.pyなど)を作成して、独自の環境を定義することもできます。セロリ構成の接頭辞はCELERY_です。この投稿では、RabbitMQをブローカーとして構成し、SQLiteを結果のbac-endとして構成しました。

ファイル: config/local.py:

CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest: [email protected] :5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'

シナリオ1-レポートの生成とエクスポート

最初に取り上げるのは、レポートの生成とエクスポートです。この例では、CSVレポートを生成するタスクを定義し、それを定期的にスケジュールする方法を学習します。 celerybeat 。

ユースケースの説明: 選択した期間(日、週、月)ごとにGitHubから最もホットな500のリポジトリをフェッチし、トピックごとにグループ化して、結果をCSVファイルにエクスポートします。

「レポートの生成」というラベルの付いたボタンをクリックしてトリガーされるこの機能を実行するHTTPサービスを提供する場合、アプリケーションは停止し、タスクが完了するのを待ってからHTTP応答を送り返します。これは悪いです。ウェブアプリケーションを高速にし、バックエンドが結果を計算する間、ユーザーが待たされることを望んでいません。結果が生成されるのを待つ代わりに、Celeryの登録済みキューを介してワーカープロセスにタスクをキューに入れ、task_idで応答します。フロントエンドに。次に、フロントエンドはtask_idを使用しますタスクの結果を非同期的にクエリし(AJAXなど)、タスクの進行状況をユーザーに知らせます。最後に、プロセスが終了すると、結果をファイルとして提供してHTTP経由でダウンロードできます。

実装の詳細

まず、プロセスを可能な限り最小の単位に分解し、パイプラインを作成しましょう。

  1. フェッチャー GitHubサービスからリポジトリを取得する責任があるワーカーです。
  2. ザ・ アグリゲーター 結果を1つのリストに統合する責任があるワーカーです。
  3. ザ・ 輸入業者 GitHubで最もホットなリポジトリのCSVレポートを作成しているワーカーです。
セロリPythonワーカーのパイプライン
図2:CeleryとPythonを使用するワーカーのパイプライン

リポジトリのフェッチは、を使用したHTTPリクエストです。 GitHub検索API GET /search/repositories。ただし、処理する必要のあるGitHub APIサービスには制限があります。APIはリクエストごとに500ではなく最大100のリポジトリを返します。一度に5つのリクエストを送信できますが、ユーザーを待たせたくありません。 HTTPリクエストはI / Oバウンド操作であるため、5つの個別のリクエストに対して。代わりに、適切なページパラメータを使用して5つの同時HTTPリクエストを実行できます。したがって、ページは[1..5]の範囲になります。 fetch_hot_repos/3 -> listというタスクを定義しましょうtoyex/tasks.pyでモジュール:

ファイル: celery_uncovered/toyex/local.py

@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items

だからfetch_hot_repos GitHub APIへのリクエストを作成し、リポジトリのリストでユーザーに応答します。リクエストペイロードを定義する3つのパラメータを受け取ります。

  • since —作成日にリポジトリをフィルタリングします。
  • per_page —リクエストごとに返す結果の数(100に制限)。
  • page -要求されたページ番号([1..5]の範囲内)。

注意: GitHub Search APIを使用するには、認証チェックに合格するためのOAuthトークンが必要です。この場合、GITHUB_OAUTHの下の設定に保存されます。

次に、結果を集計してCSVファイルにエクスポートするマスタータスクを定義する必要があります。produce_hot_repo_report_task/2->filepath:

ファイル: celery_uncovered/toyex/local.py

@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)

このタスクはcelery.canvas.groupを使用しますfetch_hot_repos/3の5つの同時呼び出しを実行します。これらの結果は待機されてから、リポジトリオブジェクトのリストにまとめられます。次に、結果セットがトピックごとにグループ化され、最終的にMEDIA_ROOT/の下で生成されたCSVファイルにエクスポートされます。ディレクトリ。

タスクを定期的にスケジュールするために、構成ファイルのスケジュールリストにエントリを追加することをお勧めします。

ファイル: config/local.py

from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }

試してみる

タスクがどのように機能しているかを起動してテストするには、最初にCeleryプロセスを開始する必要があります。

$ celery -A celery_uncovered worker -l info

次に、celery_uncovered/media/を作成する必要がありますディレクトリ。次に、ShellまたはCelerybeatのいずれかを介してその機能をテストできます。

c#不和ボットチュートリアル

シェル :

from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)

セレリービート :

# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info

MEDIA_ROOT/で結果を見ることができますディレクトリ。

シナリオ2-レポート サーバー500 電子メールによるエラー

Celeryの最も一般的な使用例の1つは、電子メール通知の送信です。電子メール通知は、ローカルSMTPサーバーまたはサードパーティのSESのいずれかを利用するオフラインI / Oバウンド操作です。メールの送信を伴う多くのユースケースがあり、ほとんどの場合、ユーザーはこのプロセスが終了するまで待ってからHTTP応答を受信する必要はありません。そのため、このようなタスクをバックグラウンドで実行し、ユーザーにすぐに応答することが望ましいのです。

ユースケースの説明: Celeryを介して管理者の電子メールに50Xエラーを報告します。

PythonとDjangoには、実行するために必要な背景があります システムロギング 。 Pythonのロギングが実際にどのように機能するかについては詳しく説明しません。ただし、これまでに試したことがない場合や、復習が必要な場合は、組み込みのドキュメントをお読みください。 ロギング モジュール。実稼働環境では間違いなくこれが必要です。 Djangoには特別なロガーハンドラーがあります AdminEmailHandler 受信したログメッセージごとに管理者にメールを送信します。

実装の詳細

主なアイデアはsend_mailを拡張することですAdminEmailHandlerのメソッドセロリ経由でメールを送信できるような方法でクラス。これは、次の図に示すように実行できます。

CeleryとPythonを使用した管理者メールの処理
図3:CeleryとPythonを使用した管理者メールの処理

まず、report_error_taskというタスクを設定する必要がありますmail_adminsを呼び出します提供された件名とメッセージ:

ファイル: celery_uncovered/toyex/tasks.py

@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)

次に、実際にAdminEmailHandlerを拡張して、定義されたCeleryタスクのみを内部的に呼び出すようにします。

ファイル: celery_uncovered/toyex/admin_email.py

ビジュアルコミュニケーションデザインとグラフィックデザイン
from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)

最後に、ロギングを設定する必要があります。 Djangoでのロギングの設定はかなり簡単です。必要なのはLOGGINGをオーバーライドすることですロギングエンジンが新しく定義されたハンドラーの使用を開始するように:

ファイル config/settings/local.py

LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }

ハンドラーフィルターを意図的に設定していることに注意してくださいrequire_debug_trueアプリケーションがデバッグモードで実行されている間にこの機能をテストするため。

試してみる

それをテストするために、localhost:8000/report-errorで「ゼロ除算」操作を提供するDjangoビューを準備しました。また、MailHog Dockerコンテナを起動して、電子メールが実際に送信されることをテストする必要があります。

$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)

追加の詳細

メールテストツールとして、MailHogをセットアップし、SMTP配信に使用するようにDjangoメーリングを構成しました。する方法はたくさんあります デプロイして実行 MailHog。 Dockerコンテナーを使用することにしました。詳細は、対応するREADMEファイルに記載されています。

ファイル: docker/mailhog/README.md

$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025

MailHogを使用するようにアプリケーションを構成するには、構成に次の行を追加する必要があります。

ファイル: config/settings/local.py

EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')

デフォルトのセロリタスクを超えて

セロリタスクは、呼び出し可能な関数から作成できます。デフォルトでは、ユーザー定義のタスクにはcelery.app.task.Taskが挿入されます。親(抽象)クラスとして。このクラスには、タスクを非同期で実行する(ネットワーク経由でCeleryワーカーに渡す)または同期的に(テスト目的で)実行し、署名や他の多くのユーティリティを作成する機能が含まれています。次の例では、Celery.app.task.Taskを拡張しようとします。次に、それを基本クラスとして使用して、タスクにいくつかの便利な動作を追加します。

シナリオ3-タスクごとのファイルロギング

私のプロジェクトの1つでは、大量の階層データを取り込んでフィルタリングできる抽出、変換、読み込み(ETL)のようなツールをエンドユーザーに提供するアプリを開発していました。バックエンドは2つのモジュールに分割されました。

  • Celeryによるデータ処理パイプラインのオーケストレーション
  • Goによるデータ処理

Celeryは、1つのCelerybeatインスタンスと40を超えるワーカーでデプロイされました。パイプラインとオーケストレーションのアクティビティを構成する20以上の異なるタスクがありました。このような各タスクは、ある時点で失敗する可能性があります。これらの障害はすべて、各ワーカーのシステムログにダンプされました。ある時点で、Celeryレイヤーのデバッグと保守が不便になり始めました。最終的に、タスクログをタスク固有のファイルに分離することにしました。

ユースケースの説明: 各タスクが標準出力とエラーをファイルに記録するようにCeleryを拡張します

Celeryは、Pythonアプリケーションに内部での動作を細かく制御できるようにします。おなじみのシグナルフレームワークが付属しています。 Celeryを使用しているアプリケーションは、特定のアクションの動作を強化するために、それらのいくつかをサブスクライブできます。タスクレベルのシグナルを活用して、個々のタスクのライフサイクルを詳細に追跡します。セロリには常にロギングバックエンドが付属しており、目標を達成するためにいくつかの場所でわずかにオーバーライドするだけで、それを利用する予定です。

実装の詳細

Celeryはすでにタスクごとのロギングをサポートしています。ファイルに保存するには、ログ出力を適切な場所にディスパッチする必要があります。この場合、タスクの適切な場所は、タスクの名前と一致するファイルです。 Celeryインスタンスでは、組み込みのロギング構成を動的に推測されるロギングハンドラーでオーバーライドします。 celeryd_after_setupを購読することが可能です信号を送り、そこでシステムロギングを設定します。

ファイル: celery_uncovered/toyex/celery_conf.py

@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + '.log')) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)

Celeryアプリに登録されているタスクごとに、ハンドラーを使用して対応するロガーを構築していることに注意してください。各ハンドラーのタイプはlogging.FileHandlerであるため、そのような各インスタンスは入力としてファイル名を受け取ります。これを実行するために必要なのは、このモジュールをcelery_uncovered/celery.pyにインポートすることだけです。ファイルの最後に:

import celery_uncovered.tricks.celery_conf

get_task_logger(task_name)を呼び出すことにより、特定のタスクロガーを受信できます。タスクごとにこのような動作を一般化するには、celery.current_app.Taskをわずかに拡張する必要があります。いくつかのユーティリティメソッドを使用:

ファイル: celery_uncovered/tricks/celery_ext.py

class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)

これで、task.log_msg('Hello, my name is: %s', task.request.id)が呼び出された場合、ログ出力はタスク名で対応するファイルにルーティングされます。

試してみる

このタスクがどのように機能するかを起動してテストするには、最初にCeleryプロセスを開始します。

$ celery -A celery_uncovered worker -l info

次に、シェルを介して機能をテストできるようになります。

from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)

最後に、結果を確認するには、celery_uncovered/logsに移動します。ディレクトリを作成し、celery_uncovered.tricks.tasks.add.logという対応するログファイルを開きます。このタスクを複数回実行すると、次のようなものが表示される場合があります。

Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...

シナリオ4-スコープ対応タスク

CeleryとDjango上に構築された海外ユーザー向けのPythonアプリケーションを想像してみましょう。ユーザーは、アプリケーションを使用する言語(ロケール)を設定できます。

多言語のロケール対応の電子メール通知システムを設計する必要があります。メール通知を送信するには、特定のキューによって処理される特別なCeleryタスクを登録しました。このタスクは、入力としていくつかの重要な引数と現在のユーザーロケールを受け取り、ユーザーが選択した言語でメールが送信されるようにします。

ここで、そのようなタスクが多数あると想像してください。ただし、これらの各タスクはロケール引数を受け入れます。この場合、より高いレベルの抽象化でそれを解決する方が良いのではないでしょうか。ここでは、その方法を説明します。

ユースケースの説明: 1つの実行コンテキストからスコープを自動的に継承し、パラメーターとして現在の実行コンテキストに挿入します。

実装の詳細

繰り返しますが、タスクロギングで行ったように、基本タスククラスを拡張したいと思いますcelery.current_app.Taskタスクの呼び出しを担当するいくつかのメソッドをオーバーライドします。このデモンストレーションの目的で、celery.current_app.Task::apply_asyncをオーバーライドします方法。このモジュールには、完全に機能する代替品を作成するのに役立つ追加のタスクがあります。

ファイル: celery_uncovered/tricks/celery_ext.py

class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)

重要な手がかりは、デフォルトで現在のロケールをKey-Value引数として呼び出し元のタスクに渡すことです。タスクが特定のロケールを引数として呼び出された場合、そのタスクは変更されません。

試してみる

この機能をテストするために、タイプScopeBasedTaskのダミータスクを定義しましょう。ロケールIDでファイルを検索し、そのコンテンツをJSONとして読み取ります。

ユーザビリティテストの実施方法

ファイル: celery_uncovered/tricks/tasks.py

@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ['locales', 'sc_%i.json' % kwargs['scenario_id']] return read_fixture(*fixture_parts)

ここで行う必要があるのは、Celeryを起動し、シェルを起動し、さまざまなシナリオでこのタスクの実行をテストする手順を繰り返すことです。備品はcelery_uncovered/tricks/fixtures/locales/の下にありますディレクトリ。

結論

この投稿は、さまざまな視点からセロリを探索することを目的としています。セロリは、郵送やレポートの生成などの従来の例と、いくつかの興味深いニッチなビジネスユースケースへの共有トリックで説明しました。 Celeryはデータ駆動型の哲学に基づいて構築されており、チームはシステムスタックの一部としてCeleryを導入することで、生活をはるかにシンプルにすることができます。基本的なPythonの経験があれば、Celeryベースのサービスの開発はそれほど複雑ではなく、かなり迅速に習得できるはずです。デフォルトの構成はほとんどの用途に十分ですが、必要に応じて非常に柔軟にできます。

私たちのチームは、バックグラウンドジョブと長時間実行タスクのオーケストレーションバックエンドとしてCeleryを使用することを選択しました。さまざまなユースケースで幅広く使用していますが、この投稿ではその一部についてのみ言及しました。私たちは毎日ギガバイトのデータを取り込んで分析していますが、これは水平スケーリング技術の始まりにすぎません。

基本を理解する

Celery for Pythonとは何ですか?

セロリは、Pythonの世界で最も人気のあるバックグラウンドジョブマネージャーの1人です。 Celeryは、RabbitMQやRedisなどのいくつかのメッセージブローカーと互換性があり、プロデューサーとコンシューマーの両方として機能できます。

Pub-Subとは何ですか?

パブリッシュ/サブスクライブ(またはプロデューサー-コンシューマー)パターンは、パブリッシャーがメッセージブローカーを介してメッセージをブロードキャストし、サブスクライバーがメッセージをリッスンするコンピューターシステムの分散メッセージングパターンです。どちらもシステムの分離されたコンポーネントであり、他のコンポーネントを認識したり、直接通信したりすることはできません。

ブートストラップを使用して.NETプロジェクトを作成する方法

バックエンド

ブートストラップを使用して.NETプロジェクトを作成する方法
そのスリルに見えます-急成長している美容業界の内部

そのスリルに見えます-急成長している美容業界の内部

収益と成長

人気の投稿
ネットプロモータースコアが十分ではありません:ユーザー調査が必要です
ネットプロモータースコアが十分ではありません:ユーザー調査が必要です
シニアクライアントパートナー、ヘルスケア&ライフサイエンス
シニアクライアントパートナー、ヘルスケア&ライフサイエンス
Gulp:サイトの速度を最大化するためのWeb開発者の秘密兵器
Gulp:サイトの速度を最大化するためのWeb開発者の秘密兵器
JSONとXMLの詳細、パート1:各標準の歴史
JSONとXMLの詳細、パート1:各標準の歴史
Aho-Corasickアルゴリズムで文字列検索を征服する
Aho-Corasickアルゴリズムで文字列検索を征服する
 
アプリを収益性の高いものにする、パート2 –モバイルファネルの活用
アプリを収益性の高いものにする、パート2 –モバイルファネルの活用
オープンソースソフトウェア-ビジネスモデルは収益性があるかどうか?
オープンソースソフトウェア-ビジネスモデルは収益性があるかどうか?
製品設計における人間中心の設計の重要性
製品設計における人間中心の設計の重要性
トッププロジェクトマネージャーの5つの不可欠な資質
トッププロジェクトマネージャーの5つの不可欠な資質
ソーシャルネットワークAPI:インターネットの実世界へのポータル
ソーシャルネットワークAPI:インターネットの実世界へのポータル
人気の投稿
  • AWS認定を行う方法
  • C ++またはCを学ぶ
  • Rubyはどの言語で書かれていますか
  • クレジットカードの暗証番号を解読する方法
  • クレジットカード番号を解読する方法
  • phpmysql出力を個別の配列に
  • javascriptは時間なしで日付を取得します
カテゴリー
その他 ブランドデザイン モバイルデザイン アジャイル 人とチーム Uxデザイン 収益性と効率性 Kpiと分析 Uiデザイン ライフスタイル

© 2021 | 全著作権所有

apeescape2.com