sql >> データベース >  >> NoSQL >> HBase

Spark-on-HBase:DataFrameベースのHBaseコネクター

    このブログ投稿は、Clouderaとの統合前にHortonworks.comで公開されていました。一部のリンク、リソース、または参照は、正確でなくなる可能性があります。

    HortonworksがBloombergと協力して開発したSpark-HBaseConnectorのテクニカルプレビューを発表できることを誇りに思います。

    Spark-HBaseコネクターは、Spark-1.2.0で導入されたデータソースAPI(SPARK-3247)を利用します。単純なHBaseKeyValueストアと複雑なリレーショナルSQLクエリの間のギャップを埋め、ユーザーがSparkを使用してHBase上で複雑なデータ分析を実行できるようにします。 HBaseDataFrameは標準のSparkDataFrameであり、Hive、ORC、Parquet、JSONなどの他のデータソースと対話できます。

    背景

    Sparkパッケージ、独立したプロジェクト、またはHBaseトランクのいずれかで利用できるオープンソースのSparkHBaseコネクタがいくつかあります。

    Sparkは、組み込みのクエリプラン最適化を提供するDataset /DataFrameAPIに移行しました。現在、エンドユーザーはDataFrames/Datasetsベースのインターフェースを使用することを好みます。

    HBaseトランクのHBaseコネクタは、RDDレベルで豊富なサポートを提供します。 BulkPutなどですが、そのDataFrameサポートはそれほど豊富ではありません。 HBaseトランクコネクタは、HBaseが組み込まれたTableInputFormatを備えた標準のHadoopRDDに依存しており、パフォーマンスにいくつかの制限があります。さらに、ドライバーで実行されるBulkGetは、単一障害点である可能性があります。

    他にもいくつかの代替実装があります。 Spark-SQL-on-HBaseを利用する 例として。標準のSparkCatalystエンジン内に独自のクエリ最適化プランを組み込むことにより、非常に高度なカスタム最適化手法を適用し、RDDをHBaseに出荷し、HBaseコプロセッサー内で部分的な集約などの複雑なタスクを実行します。このアプローチは高いパフォーマンスを実現できますが、その複雑さとSparkの急速な進化のために、維持するのは困難です。また、コプロセッサ内で任意のコードを実行できるようにすると、セキュリティ上のリスクが生じる可能性があります。

    Spark-on-HBaseコネクタ(SHC)は、これらの潜在的なボトルネックと弱点を克服するために開発されました。標準のSparkDatasourceAPIを実装し、SparkCatalystエンジンを利用してクエリを最適化します。並行して、RDDは、 TableInputFormat を使用する代わりに、ゼロから構築されます。 高性能を実現するために。このカスタマイズされたRDDを使用すると、パーティションのプルーニング、列のプルーニング、述語のプッシュダウン、データのローカリティなど、すべての重要な手法を適用して完全に実装できます。この設計により、パフォーマンスとシンプルさの間の適切なトレードオフを実現しながら、メンテナンスが非常に簡単になります。

    アーキテクチャ

    次の図に示すように、SparkとHBaseが同じクラスターにデプロイされ、Sparkエグゼキューターがリージョンサーバーと同じ場所に配置されていると想定しています。

    図1.Spark-on-HBaseコネクタアーキテクチャ

    大まかに言えば、コネクタはスキャンと取得の両方を同様の方法で処理し、両方のアクションがエグゼキュータで実行されます。ドライバーはクエリを処理し、リージョンのメタデータに基づいてスキャン/取得を集約し、リージョンごとにタスクを生成します。タスクは、リージョンサーバーと同じ場所に配置された優先エグゼキューターに送信され、エグゼキューター内で並行して実行されて、データの局所性と同時実行性が向上します。リージョンが必要なデータを保持していない場合、そのリージョンサーバーにはタスクが割り当てられません。タスクは複数のスキャンとBulkGetで構成されている場合があり、タスクによるデータ要求は1つのリージョンサーバーからのみ取得されます。このリージョンサーバーは、タスクのローカリティ設定にもなります。ドライバーは、タスクのスケジューリングを除いて、実際のジョブの実行には関与しないことに注意してください。これにより、ドライバーがボトルネックになるのを防ぎます。

    テーブルカタログ

    HBaseテーブルをリレーショナルテーブルとしてSparkに取り込むために、テーブルカタログと呼ばれるHBaseテーブルとSparkテーブルの間のマッピングを定義します。このカタログには2つの重要な部分があります。 1つは行キーの定義で、もう1つはSparkのテーブル列とH​​Baseの列ファミリーおよび列修飾子の間のマッピングです。詳細については、使用法のセクションを参照してください。

    ネイティブAvroサポート

    構造化データをバイト配列としてHBaseに永続化することは非常に一般的な方法であるため、コネクターはAvro形式をネイティブにサポートします。ユーザーはAvroレコードをHBaseに直接永続化できます。内部的には、AvroスキーマはネイティブのSparkCatalystデータ型に自動的に変換されます。 HBaseテーブルの両方のキー値部分はAvro形式で定義できることに注意してください。正確な使用法については、リポジトリの例/テストケースを参照してください。

    述語プッシュダウン

    コネクタは、ネットワークオーバーヘッドを削減し、Spark Catalystエンジンでの冗長な処理を回避するために、リージョンサーバーから必要な列のみを取得します。既存の標準HBaseフィルターは、コプロセッサー機能を利用せずに述語プッシュダウンを実行するために使用されます。 HBaseは、バイト配列以外のデータ型、およびJavaプリミティブ型とバイト配列間の順序の不一致を認識しないため、データの損失を回避するために、スキャン操作でフィルターを設定する前にフィルター条件を前処理する必要があります。リージョンサーバー内では、クエリ条件に一致しないレコードが除外されます。

    パーティションの剪定

    述語から行キーを抽出することにより、Scan / BulkGetを複数の重複しない範囲に分割し、要求されたデータを持つリージョンサーバーのみがScan/BulkGetを実行します。現在、パーティションのプルーニングは、行キーの最初の次元で実行されます。たとえば、行キーが「key1:key2:key3」の場合、パーティションのプルーニングは「key1」のみに基づいて行われます。 WHERE条件は注意深く定義する必要があることに注意してください。そうしないと、パーティションのプルーニングが有効にならない場合があります。たとえば、WHERE rowkey1>“ abc” OR column =“ xyz”(rowkey1はrowkeyの最初の次元であり、columnは通常のhbase列です)は、すべての範囲をカバーする必要があるため、フルスキャンになります。 またはの ロジック。

    データの局所性

    SparkエグゼキューターがHBaseリージョンサーバーと同じ場所に配置されている場合、データのローカリティはリージョンサーバーの場所を特定することで実現され、タスクをリージョンサーバーと同じ場所に配置するために最善を尽くします。各エグゼキュータは、同じホスト上に同じ場所に配置されたデータの一部に対してScan/BulkGetを実行します。

    スキャンとBulkGet

    これらの2つの演算子は、WHERE CLAUSEを指定することでユーザーに公開されます(例: WHEREcolumn>xおよびcolumn) スキャンおよびWHEREcolumn =xの場合 忘れる。操作はエグゼキュータで実行され、ドライバはこれらの操作のみを構築します。内部的にはスキャンや取得に変換され、Iterator[Row]は上位層の処理のために触媒エンジンに返されます。

    使用法

    以下に、コネクタの基本的な使用方法を示します。 Avroや複合キーのサポートなどの詳細と高度なユースケースについては、リポジトリの例を参照してください。

    1)スキーママッピングのカタログを定義します:

    [code language="scala"]def catalog = s"""{
             |"table":{"namespace":"default", "name":"table1"},
             |"rowkey":"key",
             |"columns":{
               |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
               |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
               |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
               |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
               |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
               |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
               |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
               |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
               |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
             |}
           |}""".stripMargin
    [/code]

    2)データを準備し、HBaseテーブルにデータを入力します:
    case class HBaseRecord(col0:String、col1:Boolean、col2:Double、col3:Float、col4:Int、col5:Long、col6:Short、col7:String、col8:Byte)

    object HBaseRecord {def apply(i:Int、t:String):HBaseRecord ={val s =s””” row $ {“%03d” .format(i)}””” HBaseRecord(s、i%2 ==0、i.toDouble、i.toFloat、i、i.toLong、i.toShort、s” String $ i:$ t”、i.toByte)}}

    val data =(0〜255).map {i => HBaseRecord(i、“ extra”)}

    sc.parallelize(data).toDF.write.options(
    Map(HBaseTableCatalog.tableCatalog-> catalog、HBaseTableCatalog.newTable->“ 5”))
    .format(“org.apache.spark。 sql.execution.datasources.hbase”)
    .save()

    3)DataFrameをロードします:
    def withCatalog(cat:String):DataFrame ={
    sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog-> cat))
    .format( “ org.apache.spark.sql.execution.datasources.hbase”)
    .load()
    }

    val df =withCatalog(catalog)

    4)統合言語クエリ:
    val s =df.filter((($” col0” <=“ row050” &&$” col0”>“ row040”)||
    $” col0” ===“ row005” ||
    $” col0” ===“ row020” ||
    $” col0” ===“ r20” ||
    $” col0” <=“ row005”)&&
    ($” col4″ ===1 ||
    $” col4″ ===42))
    .select(“ col0”、“ col1”、“ col4”)
    s .show

    5)SQLクエリ:
    df.registerTempTable( "table")
    sqlContext.sql( "select count(col1)from table")。show

    Spark-Packageの構成

    ユーザーは、Spark-on-HBaseコネクターを標準のSparkパッケージとして使用できます。パッケージをSparkアプリケーションに含めるには、次を使用します。

    spark-shell、pyspark、またはspark-submit

    > $ SPARK_HOME / bin / spark-shell –packages zhzhan:shc:0.0.11-1.6.1-s_2.10

    ユーザーは、パッケージを依存関係としてSBTファイルに含めることもできます。形式はspark-package-name:version

    です。

    spDependencies + =“ zhzhan / shc:0.0.11-1.6.1-s_2.10”

    セキュアクラスターでの実行

    Kerberos対応のクラスターで実行するには、HBaseトークンの取得と更新がSparkによって行われ、コネクターから独立しているため、ユーザーはHBase関連のjarファイルをクラスパスに含める必要があります。つまり、ユーザーは、kinitを使用するか、principal / keytabを指定することにより、通常の方法で環境を開始する必要があります。次の例は、yarn-clientモードとyarn-clusterモードの両方を使用して安全なクラスターで実行する方法を示しています。 SPARK_CLASSPATHは両方のモードに設定する必要があり、サンプルjarはSparkの単なるプレースホルダーであることに注意してください。

    export SPARK_CLASSPATH =/ usr / hdp / current / hbase-client / lib / hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client / lib / hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

    hrt_qaがヘッドレスアカウントであるとすると、ユーザーはkinitに対して次のコマンドを使用できます。

    kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

    /usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –マスターyarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores1/usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2。 0-106-hadoop2.7.1.2.4.2.0-106.jar

    /usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –masteryarn-cluster –files / etc / hbase / conf / hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 / usr / hdp / current / spark- client / lib / spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

    すべてをまとめる

    HBaseがDataFrameレベルでSparkをサポートする方法の概要を説明しました。 DataFrame APIを使用すると、Sparkアプリケーションは、他のデータソースに格納されているデータと同じくらい簡単にHBaseテーブルに格納されているデータを処理できます。この新機能により、HBaseテーブルのデータは、Sparkアプリケーションやその他のインタラクティブツール(例:ユーザーは、Spark内のHBaseテーブルに対して複雑なSQLクエリを実行したり、Dataframeに対してテーブル結合を実行したり、SparkStreamingと統合してより複雑なシステムを実装したりできます。

    次は何ですか?

    現在、コネクタはHortonworksリポジトリでホストされており、Sparkパッケージとして公開されています。現在、ApacheHBaseトランクに移行中です。移行中に、HBaseトランクのいくつかの重大なバグを特定しました。これらのバグは、マージとともに修正される予定です。コミュニティの作業は、ScanおよびBulkGetの基盤となるコンピューティングアーキテクチャを最適化するためのHBASE-14795およびHBASE-14796を含む包括的なHBase JIRA HBASE-14789、使いやすさのためのJSONユーザーインターフェイスを提供するためのHBASE-14801、 DataFrame書き込みパス、Avroサポート用のHBASE-15334、short、int、long、float、doubleなどのJavaプリミティブタイプをサポートするためのHBASE-15333、複合行キーをサポートするためのHBASE-15335、およびHBASE-15572オプションのタイムスタンプセマンティクスを追加します。コネクタの操作をさらに簡単にするコネクタの将来のバージョンを作成することを楽しみにしています。

    謝辞

    Hamel Kothari、Sudarshan Kadambi、Bloombergの各チームがこの作業を指導し、この作業の検証を支援してくれたことに感謝します。また、フィードバックを提供し、これを改善してくれたHBaseコミュニティにも感謝します。最後に、この作業は以前のSpark HBase統合からの教訓を活用しており、開発者の道を開いてくれたことに感謝します。

    参照:

    SHC:https://github.com/hortonworks/shc-release

    Spark-package:http://spark-packages.org/package/zhzhan/shc

    Apache HBase:https://hbase.apache.org/

    Apache Spark:http://spark.apache.org/


    1. MongoDBのすべてのドキュメントのフィールドの名前を変更するにはどうすればよいですか?

    2. Node.jsとRedis/hget synchronize

    3. MongoDB:パスで見つかった位置(つまり'$')要素が多すぎます

    4. MongoDB $ arrayToObject