編集2018-01-27:
この問題はDirectRunnerに関連していることがわかりました。 DataflowRunnerを使用して同じパイプラインを実行する場合、実際には最大1,000レコードのバッチを取得する必要があります。 DirectRunnerは、グループ化操作の後に常にサイズ1のバンドルを作成します。
元の回答:
Apache BeamのJdbcIOを使用してクラウドデータベースに書き込むときに、同じ問題が発生しました。問題は、JdbcIOが1つのバッチで最大1,000レコードの書き込みをサポートしているのに対し、実際に一度に1行を超える書き込みを行ったことがないことです(これは常に開発環境でDirectRunnerを使用していました)。
そのため、JdbcIOに機能を追加しました。この機能では、データをグループ化し、各グループを1つのバッチとして書き込むことで、バッチのサイズを自分で制御できます。以下は、Apache Beamの元のWordCountの例に基づいて、この機能を使用する方法の例です。
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
JdbcIOの通常の書き込みメソッドとの違いは、新しいメソッドwriteIterable()
です。 PCollection<Iterable<RowT>>
が必要です PCollection<RowT>
の代わりに入力として 。各Iterableは、1つのバッチとしてデータベースに書き込まれます。
この追加が追加されたJdbcIOのバージョンは、次の場所にあります。 https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
上記の例を含むプロジェクト全体は、次の場所にあります: https://github.com/ olavloite / spanner-beam-example
(これをプロジェクトに含めるために、Apache Beamで保留中のプルリクエストもあります)