sql >> データベース >  >> RDS >> Mysql

Airflowを使用してmysqlレコードを取得して処理する方法は?

    私は過去90分間、これに本当に苦労していました。これは、新規参入者をフォローするためのより宣言的な方法です:

    from airflow.hooks.mysql_hook import MySqlHook
    
    def fetch_records():
      request = "SELECT * FROM your_table"
      mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
      connection = mysql_hook.get_conn()
      cursor = connection.cursor()
      cursor.execute(request)
      sources = cursor.fetchall()
      print(sources)
    
    ...your DAG() as dag: code
    
    task = PythonOperator(
      task_id = 'fetch_records',
      python_callable = fetch_records
    )
    

    これにより、DBクエリの内容がログに戻ります。

    これが他の誰かに役立つことを願っています。



    1. pySparkを使用してDataFrameをmysqlテーブルに書き込みます

    2. 単一の大きなクエリといくつかの小さなクエリのどちらが高速ですか?

    3. mysqldumpからの出力をより小さなファイルに分割するにはどうすればよいですか?

    4. MySQL Workbench 5.2を使用してテーブルでupdateコマンドを実行中にエラー(エラーコード:1175)