このブログ投稿は、Clouderaとの統合前にHortonworks.comで公開されていました。一部のリンク、リソース、または参照は、正確でなくなる可能性があります。
2016年に、Spark HBase Connector(SHC)の2番目のバージョンv1.0.1を公開しました。このブログでは、今年実装した主な機能について説明します。
フェニックスコーダーをサポート
SHCを使用して、データをHBaseクラスターに書き込み、さらにダウンストリーム処理することができます。入力および出力データのAvroシリアル化をサポートし、デフォルトでは、単純なネイティブエンコーディングメカニズムを使用したカスタムシリアル化になります。入力データを読み取るとき、SHCはデータの効率的なスキャンのためにフィルターをHBaseにプッシュダウンします。 HBaseでのPhoenixデータの人気を考えると、Avroデータに加えてHBaseへの入力としてPhoenixデータをサポートするのは自然なことのようです。また、デフォルトで単純なネイティブバイナリエンコーディングを使用すると、将来の変更の影響を受けやすくなり、SHCからHBaseにデータを書き込むユーザーにとってリスクがあります。たとえば、SHCが進むにつれ、下位互換性を適切に処理する必要があります。したがって、デフォルトのSHCは、Phoenixのようなより標準的で十分にテストされた形式に変更する必要があります。
複合キーのサポートでは、この機能の前は、複合キーの最後のディメンションを除いて、各ディメンションの値の長さを固定する必要がありました。この制限は、Phoenixコーダーによって削除されました。現在、ユーザーがデータコーダーとしてPhoenixを選択した場合、カタログ内の複合キーの各部分の長さを指定する必要はありません。
フェニックスはデフォルトのコーダーであるため、ユーザーにとっての唯一の変更は、データコーダーとしてPrimitiveTypeを使用する場合、カタログで「tableCoder」:「PrimitiveType」を指定して、代わりにPrimitiveTypeを使用することをSHCに通知する必要があることです。フェニックスの「tableCoder」として。
def catalog =s””” {
|” table”:{“ namespace”:” default”、“ name”:” table1”、“ tableCoder”:” PrimitiveType”}、
|” rowkey ”:” key”、
|” columns”:{
|” col0”:{“ cf”:” rowkey”、“ col”:” key”、“ type”:” string”} 、
|” col1”:{“ cf”:” cf1”、“ col”:” col1”、“ type”:” boolean”}、
|” col2”:{“ cf”: ” cf2”、“ col”:” col2”、“ type”:” double”}、
|” col3”:{“ cf”:” cf3”、“ col”:” col3”、“ type” :” float”}、
|” col4”:{“ cf”:” cf4”、“ col”:” col4”、“ type”:” int”}、
|” col5”: {“ cf”:” cf5”、“ col”:” col5”、“ type”:” bigint”}、
|” col6”:{“ cf”:” cf6”、“ col”:” col6 ″、“ type”:” smallint”}、
|” col7”:{“ cf”:” cf7”、“ col”:” col7”、“ type”:” string”}、
|” col8”:{“ cf”:” cf8”、“ col”:” col8”、“ type”:” tinyint”}
|}
|}”””。stripMargin
キャッシュSparkHBase接続
SHCは、以前は接続オブジェクトをHBaseにキャッシュしていませんでした。具体的には、SHCがHBaseテーブルとリージョンにアクセスする必要があるたびに、「ConnectionFactory.createConnection」の呼び出しが行われました。ユーザーは、エグゼキュータログを確認し、リクエストごとに確立されているzookeeper接続を確認するだけで、これを確認できます。インターフェイスConnectionのドキュメントでは、接続の作成は重い操作であり、接続の実装はスレッドセーフであると記載されています。したがって、長期間有効なプロセスの場合、SHCが接続をキャッシュしておくと非常に便利です。この機能により、SHCは作成される接続の数を大幅に減らし、プロセスでのパフォーマンスを大幅に向上させます。
重複する列ファミリをサポート
SHCは、重複列ファミリーのサポートをサポートしています。これで、ユーザーは次のようにカタログを定義できます。
def catalog =s””” {
|” table”:{“ namespace”:” default”、“ name”:” table1”、“ tableCoder”:” PrimitiveType”}、
|” rowkey ”:” key”、
|” columns”:{
|” col0”:{“ cf”:” rowkey”、“ col”:” key”、“ type”:” string”} 、
|” col1”:{“ cf”:” cf1”、“ col”:” col1”、“ type”:” boolean”}、
|” col2”:{“ cf”: ” cf1”、“ col”:” col2”、“ type”:” double”}、
|” col3”:{“ cf”:” cf1”、“ col”:” col3”、“ type” :” float”}、
|” col4”:{“ cf”:” cf2”、“ col”:” col4”、“ type”:” int”}、
|” col5”: {“ cf”:” cf2”、“ col”:” col5”、“ type”:” bigint”}、
|” col6”:{“ cf”:” cf3”、“ col”:” col6 ″、“ type”:” smallint”}、
|” col7”:{“ cf”:” cf3”、“ col”:” col7”、“ type”:” string”}、
|” col8”:{“ cf”:” cf3”、“ col”:” col8”、“ type”:” tinyint”}
|}
|}”””。stripMargin
上記のカタログ定義では、列「col0」、「col1」、および「col2」は同じ列ファミリー「cf1」を持っています。
SparkUnhandledFiltersAPIを使用する
SHCは、効果的な最適化であるSparkAPIunhandledFiltersも実装しています。このAPIは、すべてのフィルターを返すのではなく、SHCが実装していないフィルターについてSparkに通知します。この場合の以前の動作は、データがSparkにプルされたら、すべてのフィルターを再適用することでした。これはべき等である必要があるため、データは変更されませんが、フィルターが複雑な場合はコストがかかる可能性があります。
SHCコミュニティ
SHCコミュニティは、1年前よりも大きく、影響力があります。 2016年には、HadoopサミットとHBase / Sparkミートアップで講演を行い、詳細なブログを作成しました。 SHCユーザーの数が増えるにつれ、ユーザーからの質問も増えています。 SHCの採用が増えていることを大変うれしく思います。さらに改善する方法についてご意見がございましたら、HortonworksCommunityConnectionからフィードバックをお寄せください。
謝辞
ブルームバーグチームがこの作業を指導し、この作業の検証を支援してくれたことに感謝します。また、フィードバックを提供し、これを改善してくれたHBaseコミュニティにも感謝します。最後に、この作業は以前のSpark HBase統合からの教訓を活用しており、開発者の道を開いてくれたことに感謝します。
参照:
SHC:https://github.com/hortonworks-spark/shc
Apache HBase:https://hbase.apache.org/
Apache Spark:http://spark.apache.org/
Apache Phoenix:https://phoenix.apache.org/