sql >> データベース >  >> RDS >> Mysql

Google Dataflow(Apache Beam)JdbcIOをmysqlデータベースに一括挿入

    編集2018-01-27:

    この問題はDirectRunnerに関連していることがわかりました。 DataflowRunnerを使用して同じパイプラインを実行する場合、実際には最大1,000レコードのバッチを取得する必要があります。 DirectRunnerは、グループ化操作の後に常にサイズ1のバンドルを作成します。

    元の回答:

    Apache BeamのJdbcIOを使用してクラウドデータベースに書き込むときに、同じ問題が発生しました。問題は、JdbcIOが1つのバッチで最大1,000レコードの書き込みをサポートしているのに対し、実際に一度に1行を超える書き込みを行ったことがないことです(これは常に開発環境でDirectRunnerを使用していました)。

    そのため、JdbcIOに機能を追加しました。この機能では、データをグループ化し、各グループを1つのバッチとして書き込むことで、バッチのサイズを自分で制御できます。以下は、Apache Beamの元のWordCountの例に基づいて、この機能を使用する方法の例です。

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        // Count words in input file(s)
        .apply(new CountWords())
        // Format as text
        .apply(MapElements.via(new FormatAsTextFn()))
        // Make key-value pairs with the first letter as the key
        .apply(ParDo.of(new FirstLetterAsKey()))
        // Group the words by first letter
        .apply(GroupByKey.<String, String> create())
        // Get a PCollection of only the values, discarding the keys
        .apply(ParDo.of(new GetValues()))
        // Write the words to the database
        .apply(JdbcIO.<String> writeIterable()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
                .withStatement(INSERT_OR_UPDATE_SQL)
                .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
    

    JdbcIOの通常の書き込みメソッドとの違いは、新しいメソッドwriteIterable()です。 PCollection<Iterable<RowT>>が必要です PCollection<RowT>の代わりに入力として 。各Iterableは、1つのバッチとしてデータベースに書き込まれます。

    この追加が追加されたJdbcIOのバージョンは、次の場所にあります。 https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

    上記の例を含むプロジェクト全体は、次の場所にあります: https://github.com/ olavloite / spanner-beam-example

    (これをプロジェクトに含めるために、Apache Beamで保留中のプルリクエストもあります)




    1. PowerBIDesktopビジュアライゼーションでのデータのフォーマット

    2. MySQLレコードのリストで次のアイテムを取得するにはどうすればよいですか?

    3. Node.jsでのPostgreSQLの複数行の更新

    4. SQLLIKE句でのSqlParameterの使用が機能しない