Cloudera LabsのSparkOnHBaseプロジェクトは、最近ApacheHBaseトランクに統合されました。この投稿では、プロジェクトの歴史と、新しいHBase-Sparkモジュールの将来がどのようになるかを学びます。
SparkOnHBaseは2014年7月に最初にGithubにプッシュされました。これは、Spark Summit 2013からわずか6か月後、ApacheSparkが最初にCDHで出荷されてから5か月後です。その会議は私にとって大きなターニングポイントでした。なぜなら、MapReduceエンジンには非常に強力な競合他社があることに初めて気づいたからです。 Sparkは、オープンソースのライフサイクルでエキサイティングな新しいフェーズに突入しようとしていました。わずか1年後、数千ではないにしても数百の企業で大規模に使用されています(Clouderaのプラットフォームでは200以上の企業が使用しています)。
>SparkOnHBaseは、HBaseとMapReduceの間ですでに利用可能なものと同様のレベルのHBaseとSparkの間の対話を行うという単純な顧客の要求から生まれました。対象となった機能の概要は次のとおりです。
- マップまたはリデュースステージでのHBaseへのフルアクセス
- 一括読み込みを実行する機能
- get、put、deleteなどの一括操作を実行する機能
- SQLエンジンのデータソースになる機能
SparkOnHBaseの最初のリリースは、作業の公開を許可することに同意したClouderaの顧客向けに構築されました。ありがたいことに、ClouderansとHBasePMCの仲間であるJonHsiehとMatteoBertozzi、およびSparkPMCのメンバーであるTathagataDasから早い段階で助けを得て、ベースのApacheSparkとSparkStreamingの両方で設計が機能することを確認しました。
他の顧客がSparkOnHBaseを使い始めるのはそう長くはありませんでした。特に、スーパーボウルサンデー用のリアルタイムSparkStreamingアプリケーションを備えたEdmunds.comです。他の企業が参加したとき、1人のプロジェクトメンテナ(つまり、私)が拡張できないことがすぐに明らかになりました。幸いなことに、当時、Clouderaは最近Cloudera Labsを発表しました。これは、プロジェクトの完璧な拠点であることが判明しました。簡単に言えば、Cloudera Labsは、企業の準備、開発、野心の点では若いが、最新のテクノロジーを試したいユーザーからの需要が高い、新しいエコシステムプロジェクトの仮想コンテナーです。 SparkOnHBaseはやがてClouderaLabsプロジェクトになりました。
本日、SparkOnHBaseが最近HBaseトランク(HBASE-13992)にコミットされたことを報告できてうれしいです。 HBASE-13992は、新しいモニカであるHBase-Sparkモジュールの下でHBaseコアにSparkOnHBaseを追加します。 HBaseのVPであるAndrewPurtellの励ましと、HBASE-13992の「扉を開く」こととPMCのメンバーであるSeanBusbeyの指導と指導に感謝します。また、コードレビューをしてくれたElliott Clark、Enis Soztutar、Michael Stack、Nicolas Liochon、Kostas Sakellis、Ted Yu、Lars Hofhansl、SteveLoughranに感謝します。 (ご覧のとおり、SparkOnHBaseは本物のコミュニティの取り組みでした。)
特に、HBASE-13992を使用して、SparkとScalaのコードをApacheHBaseプロジェクトに初めて追加することができました。 HBaseの歴史の中で最初のScalaユニットテストを構築する特権を持っていることはとても楽しかったです!
それでは、技術的な詳細を詳しく見ていきましょう。
HBASE-13992の内部
HBASE-13992では、SparkOnHBaseの元のコードとデザインのほとんどが変更されていないことがわかります。コードのコア部分は、すべてのSparkExecutorでHBase接続オブジェクトを取得するように設計されているという点で基本的なアーキテクチャは引き続き維持されます。
基本は変わりませんが、HBASE-13992パッチとClouderaLabsSparkOnHBaseプロジェクトには3つの大きな違いがあります。
- HBase API: HBASE-13992は、すべての新しいHBase1.0+APIを使用します。
- RDDおよびDStream関数: 関数の実行方法に関するSparkOnHBaseに関する最大の不満の1つ。 Spark愛好家は、RDDまたはDStreamから直接HBaseアクションを作成したいと考えていました。 HBASE-13992では、その機能は単体テストと例を介して組み込まれています。さらに、この投稿の後半にRDDから直接HBase関数のコード例があるので、APIがどのように見えるかを感じることができます。
- 簡単な
foreach
およびmap
機能:foreachPartition
を実行するのがさらに簡単になりました sおよびmapPartition
■HBase接続を使用します。この投稿の後半で例を示します。
それでは、少し時間を取って、SparkOnHBaseコードベースとHBASE-13992パッチの違いを見ていきましょう。これがbulkDelete
の簡単な例です SparkOnHBaseから:
val hbaseContext =new HBaseContext(sc、config); hbaseContext.bulkDelete [Array [Byte]](rdd、tableName、putRecord => new Delete(putRecord)、> 4);この例では、操作が実際にはRDDで実行されていたとしても、HBaseContextオブジェクトから直接関数を呼び出していることに注意してください。それでは、同じコードのHBase-Sparkモジュールを見てみましょう:
val hbaseContext =new HBaseContext(sc、config)rdd.hbaseBulkDelete(hbaseContext、tableName、putRecord => new Delete(putRecord)、> 4)大きな違いは、
hbaseBulkDelete
メソッドはRDDから直接出てきます。また、このアプローチでは、将来のJIRAで次のオプションへの扉が開かれたままになります。val hbaseContext =new HBaseContext(sc、config)rdd.hbaseBulkDelete(tableName)これは今のところ私が手に入れることができる限りきれいですが、目標はそれをさらにもっとすることです シンプルでクリーン。
また、HBASE-13992のforeach関数とmap関数についても簡単に見てみましょう。
ForeachPartition
で確認できます 以下の例では、イテレータとHBaseConnection
があります。 物体。これにより、値を反復処理するときに、HBaseで何でもできるようになります。val hbaseContext =new HBaseContext(sc、config)rdd.hbaseForeachPartition(hbaseContext、(it、conn)=> {val bufferedMutator =conn.getBufferedMutator(TableName.valueOf( "t1"))... bufferedMutator.flush( )bufferedMutator.close()})最後に、値を反復処理するときに接続オブジェクトを取得できるマップパーティション関数の例を次に示します。
val getRdd =rdd.hbaseMapPartitions(hbaseContext、(it、conn)=> {val table =conn.getTable(TableName.valueOf( "t1"))var res =mutable.MutableList [String]()..。 })今後の作業
次のJIRAが私のTODOリストにあります:
HBASE-14150 –
BulkLoad
を追加 HBase-Sparkモジュールの機能間もなく、次のような単純なコードを使用して、RDDから直接一括読み込みを実行できるようになります。
rdd.hbaseBulkLoad(tableName、t => {Seq((new KeyFamilyQualifier(t.rowKey、t.family、t.qualifier)、t.value))。HBASE-14181 –SparkDataFrameデータソースをHBase-Sparkモジュールに追加
このパッチを使用すると、Spark SQLをHBaseと直接統合し、スキャン範囲のプッシュダウンに加えて、フィルターや列選択のプッシュダウンなどの優れた機能を実行できるようになります。 Spark SQLとHBaseの相互作用を取得する目的は、次のように単純です。
val df =sqlContext.load( "org.apache.hadoop.hbase.spark"、Map( "hbase.columns.mapping"-> "KEY_FIELD STRING:key、A_FIELD STRING c:a、B_FIELD STRING c:b 、"、" hbase.table "->" t1 "))df.registerTempTable(" hbaseTmp ")sqlContext.sql(" SELECT KEY_FIELD FROM hbaseTmp "+" WHERE "+"(KEY_FIELD ='get1' and B_FIELD <'3 ')または "+"(KEY_FIELD <=' get3'and B_FIELD ='8') ")。foreach(r => println("-" + r))コードを使いやすくし、単体テストをより包括的にするように設計された他のJIRAがあります。私の個人的な目標は、フォローアップのブログ投稿で、私たちが行っているすべての大きな進歩について報告できるようにすることです。目的は、SparkをHBaseに関してふさわしい一級市民に変え、業界でのMapReduceの代替としてさらに強固にすることです。 MapReduceをSparkに置き換えると、ディスクIOの競合が増えることを心配することなく、HBaseクラスターでさらに多くの処理を実行できるようになります。
HBase-SparkモジュールがHBaseのリリースになるまでには時間がかかります。当面の間、HBase-SparkモジュールからClouderaLabsのSparkOnHBaseにコードの一部をバックポートする計画があります。現在、SparkOnHBaseはCDH 5.3および5.4で動作します。目標は、2015年後半に予定されているCDHマイナーリリースに向けて、HBase-Sparkモジュールの進歩でSparkOnHBaseを更新することです。
Ted Malaskaは、Clouderaのソリューションアーキテクトであり、Spark、Apache Flume、Apache HBaseの寄稿者であり、O’Reillyの本の共著者です。 Hadoopアプリケーションアーキテクチャ。