Apache Kafkaでは、プロデューサーと呼ばれるJavaアプリケーションが構造化メッセージをKafkaクラスター(ブローカーで構成)に書き込みます。同様に、コンシューマーと呼ばれるJavaアプリケーションは、同じクラスターからこれらのメッセージを読み取ります。一部の組織では、プロデューサーとコンシューマーの作成と管理を担当するさまざまなグループがあります。このような場合、1つの大きな問題点は、プロデューサーとコンシューマーの間で合意されたメッセージ形式の調整にある可能性があります。
この例は、Apache Avroを使用して、スキーマの進化とプロデューサーおよびコンシューマーアプリケーションの非同期更新を可能にしながら、ApacheKafkaに生成されたレコードをシリアル化する方法を示しています。
シリアル化と逆シリアル化
Kafkaレコード(以前はメッセージと呼ばれていました)は、キー、値、およびヘッダーで構成されています。 Kafkaは、レコードのキーと値のデータの構造を認識していません。それらをバイト配列として処理します。しかし、Kafkaからレコードを読み取るシステムは、それらのレコードのデータを考慮します。したがって、読み取り可能な形式でデータを生成する必要があります。使用するデータ形式は
- コンパクトにする
- エンコードとデコードを高速化
- 進化を許可する
- アップストリームシステム(Kafkaクラスターに書き込むシステム)とダウンストリームシステム(同じKafkaクラスターから読み取るシステム)が異なる時間に新しいスキーマにアップグレードできるようにします
たとえば、JSONは自明ですが、コンパクトなデータ形式ではなく、解析に時間がかかります。 Avroは、比較的コンパクトな出力を作成する高速シリアル化フレームワークです。ただし、Avroレコードを読み取るには、データがシリアル化されたスキーマが必要です。
1つのオプションは、レコード自体とともにスキーマを保存および転送することです。これは、スキーマを一度保存して多数のレコードに使用するファイルでは問題ありません。ただし、すべてのKafkaレコードにスキーマを保存すると、ストレージスペースとネットワーク使用率の点でかなりのオーバーヘッドが追加されます。もう1つのオプションは、合意された一連の識別子とスキーマのマッピングを用意し、レコード内の識別子によってスキーマを参照することです。
オブジェクトからKafkaレコードへそして戻る
プロデューサーアプリケーションは、データを直接バイト配列に変換する必要はありません。 KafkaProducerは、ユーザーがキーと値の型を指定する必要がある汎用クラスです。次に、プロデューサーはProducerRecord
のインスタンスを受け入れます 同じタイプのパラメータを持っています。オブジェクトからバイト配列への変換は、シリアライザーによって行われます。 Kafkaは、いくつかのプリミティブシリアライザーを提供します。たとえば、IntegerSerializer
、ByteArraySerializer
、StringSerializer
。コンシューマー側では、同様のデシリアライザーがバイト配列をアプリケーションが処理できるオブジェクトに変換します。
したがって、シリアライザーおよびデシリアライザーレベルで接続し、プロデューサーおよびコンシューマーアプリケーションの開発者がKafkaによって提供される便利なインターフェイスを使用できるようにすることは理にかなっています。 Kafkaの最新バージョンでは、ExtendedSerializers
が許可されていますが およびExtendedDeserializers
ヘッダーにアクセスするために、レコードヘッダーを追加する代わりに、Kafkaレコードのキーと値にスキーマ識別子を含めることにしました。
Avro Essentials
Avroは、データシリアル化(およびリモートプロシージャコール)フレームワークです。スキーマと呼ばれるJSONドキュメントを使用してデータ構造を記述します。ほとんどのAvroの使用は、GenericRecordまたはSpecificRecordのサブクラスのいずれかを介して行われます。 Avroスキーマから生成されたJavaクラスは後者のサブクラスですが、前者は、使用されるデータ構造の事前知識がなくても使用できます。
2つのスキーマが一連の互換性ルールを満たす場合、一方のスキーマ(ライタースキーマと呼ばれる)で書き込まれたデータは、もう一方のスキーマ(リーダースキーマと呼ばれる)で書き込まれたかのように読み取ることができます。スキーマには、同等性チェックを支援するために削除されたコメントなど、シリアル化に関係のないすべての詳細が含まれる標準形があります。
VersionedSchemaとSchemaProvider
前述のように、スキーマとその識別子の間には1対1のマッピングが必要です。スキーマを名前で参照する方が簡単な場合があります。互換性のあるスキーマが作成されると、それはスキーマの次のバージョンと見なすことができます。したがって、名前とバージョンのペアでスキーマを参照できます。スキーマ、その識別子、名前、バージョンをまとめてVersionedSchema
と呼びましょう。 。このオブジェクトは、アプリケーションが必要とする追加のメタデータを保持している可能性があります。
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
オブジェクトはVersionedSchema
のインスタンスを検索できます 。
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
このインターフェイスの実装方法については、今後のブログ投稿の「スキーマストアの実装」で説明します。
一般的なデータのシリアル化
レコードをシリアル化するときは、最初に使用するスキーマを把握する必要があります。各レコードにはgetSchema
があります 方法。ただし、スキーマから識別子を見つけるには時間がかかる場合があります。一般に、初期化時にスキーマを設定する方が効率的です。これは、識別子または名前とバージョンによって直接実行できます。さらに、複数のトピックを作成する場合は、トピックごとに異なるスキーマを設定し、パラメーターとして指定されたトピック名からメソッドserialize(T, String)
にスキーマを見つけたい場合があります。 。このロジックは、簡潔さと単純さのために、この例では省略されています。
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
スキーマが手元にあるので、それをメッセージに保存する必要があります。メッセージの一部としてIDをシリアル化すると、すべての魔法がシリアライザー/デシリアライザーで発生するため、コンパクトなソリューションが得られます。また、すでにKafkaをサポートしている他のフレームワークやライブラリとの統合が非常に簡単になり、ユーザーは独自のシリアライザー(Sparkなど)を使用できるようになります。
このアプローチを使用して、最初に最初の4バイトにスキーマ識別子を書き込みます。
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
次に、DatumWriter
を作成できます オブジェクトをシリアル化します。
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
これをすべてまとめて、汎用データシリアライザーを実装しました。
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
一般的なデータの逆シリアル化
デシリアライズは単一のスキーマ(スキーマデータが書き込まれた)で機能しますが、別のリーダースキーマを指定することもできます。リーダースキーマは、データがシリアル化されたスキーマと互換性がある必要がありますが、同等である必要はありません。このため、スキーマ名を導入しました。これで、特定のバージョンのスキーマでデータを読み取るように指定できます。初期化時に、スキーマ名ごとに必要なスキーマバージョンを読み取り、メタデータをreaderSchemasByName
に保存します。 すばやくアクセスできます。これで、互換性のあるバージョンのスキーマで書き込まれたすべてのレコードを、指定されたバージョンで書き込まれたかのように読み取ることができます。
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
レコードを逆シリアル化する必要がある場合、最初にライタースキーマの識別子を読み取ります。これにより、リーダースキーマを名前で検索できます。両方のスキーマが利用可能であるため、GeneralDatumReader
を作成できます。 記録を読んでください。
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
SpecificRecordsの処理
多くの場合、レコードに使用したいクラスが1つあります。このクラスは通常、Avroスキーマから生成されます。 Apache Avroは、スキーマからJavaコードを生成するためのツールを提供します。そのようなツールの1つがAvroMavenプラグインです。生成されたクラスには、実行時に使用可能に生成されたスキーマがあります。これにより、シリアル化と逆シリアル化がより簡単かつ効果的になります。シリアル化の場合、クラスを使用して、使用するスキーマ識別子を確認できます。
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
したがって、トピックとデータからスキーマを決定するロジックは必要ありません。レコードクラスで使用可能なスキーマを使用してレコードを書き込みます。
同様に、逆シリアル化の場合、リーダースキーマはクラス自体から見つけることができます。リーダースキーマは構成時に固定され、スキーマ名で検索する必要がないため、逆シリアル化ロジックが単純になります。
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }を返します
追加の読み物
スキーマの互換性の詳細については、スキーマ解決に関するAvro仕様を参照してください。
標準形の詳細については、スキーマの標準形の解析に関するAvro仕様を参照してください。
次回…
パート2では、Avroスキーマ定義を格納するシステムの実装を示します。