Apache HBaseは、ビッグデータへのランダムなリアルタイムの読み取り/書き込みアクセスを提供することを目的としていますが、そもそもそのデータを効率的にHBaseに取り込むにはどうすればよいでしょうか。直感的には、新しいユーザーはクライアントAPIを介して、またはTableOutputFormatでMapReduceジョブを使用してこれを実行しようとしますが、以下で学習するように、これらのアプローチには問題があります。代わりに、HBaseの一括読み込み機能ははるかに使いやすく、同じ量のデータをより迅速に挿入できます。
このブログ投稿では、一括読み込み機能の基本的な概念を紹介し、2つの使用例を示し、2つの例を提案します。
一括読み込みの概要
これらの症状のいずれかがある場合は、バルクロードがおそらく正しい選択です:
- メモリの大部分を使用するには、MemStoreを微調整する必要がありました。
- より大きなWALを使用するか、完全にバイパスする必要がありました。
- 圧縮キューとフラッシュキューは数百にのぼります。
- インサートの範囲がMBであるため、GCは制御不能です。
- データをインポートすると、レイテンシがSLAから外れます。
これらの症状のほとんどは、一般的に「成長痛」と呼ばれます。一括読み込みを使用すると、それらを回避するのに役立ちます。
HBaseと言えば、一括読み込みは、HFile(HBase独自のファイル形式)を準備してRegionServerに直接読み込むプロセスであるため、書き込みパスをバイパスして、これらの問題を完全に回避します。このプロセスはETLに似ており、次のようになります。
1。ソース(通常はテキストファイルまたは別のデータベース)からデータを抽出します。 HBaseはプロセスのこの部分を管理しません。つまり、MySQLから直接読み取ることでHFileを準備するようにHBaseに指示することはできません。むしろ、独自の方法で準備する必要があります。たとえば、テーブルでmysqldumpを実行して、結果のファイルをHDFSにアップロードしたり、ApacheHTTPログファイルを取得したりできます。いずれの場合も、次のステップの前にデータをHDFSに保存する必要があります。
2。データをHFilesに変換します。 このステップにはMapReduceジョブが必要であり、ほとんどの入力タイプでは、マッパーを自分で作成する必要があります。ジョブは、行キーをキーとして発行し、KeyValue、Put、またはDeleteを値として発行する必要があります。レデューサーはHBaseによって処理されます。 HFileOutputFormat.configureIncrementalLoad()を使用して構成すると、次のようになります。
- テーブルを調べて、全順序パーティショナーを構成します
- パーティションファイルをクラスターにアップロードし、DistributedCacheに追加します
- 現在のリージョン数に一致するようにreduceタスクの数を設定します
- HFileOutputFormatの要件に一致するように出力キー/値クラスを設定します
- 適切な並べ替え(KeyValueSortReducerまたはPutSortReducer)を実行するようにレデューサーを設定します
この段階で、出力フォルダーのリージョンごとに1つのHFileが作成されます。入力データはほぼ完全に書き換えられるため、元のデータセットのサイズの少なくとも2倍のディスク容量が必要になることに注意してください。たとえば、100GBのmysqldumpの場合、HDFSには少なくとも200GBの使用可能なディスクスペースが必要です。プロセスの最後にダンプファイルを削除できます。
3。 RegionServerにファイルの場所を指示して、ファイルをHBaseにロードします。 これが最も簡単な手順です。 LoadIncrementalHFiles(より一般的にはcompletebulkloadツールとして知られています)を使用する必要があり、HDFS内のファイルを見つけるURLを渡すことにより、各ファイルを、それを提供するRegionServerを介して関連するリージョンにロードします。ファイルの作成後にリージョンが分割された場合、ツールは新しい境界に従ってHFileを自動的に分割します。このプロセスはあまり効率的ではないため、現在テーブルが他のプロセスによって書き込まれている場合は、変換ステップが完了したらすぐにファイルをロードするのが最善です。
これがこのプロセスの図です。データフローは元のソースからHDFSに送られ、RegionServerはファイルをリージョンのディレクトリに移動するだけです。
ユースケース
元のデータセットの読み込み: 別のデータストアから移行するすべてのユーザーは、このユースケースを検討する必要があります。まず、テーブルスキーマを設計する演習を行ってから、事前に分割してテーブル自体を作成する必要があります。分割ポイントでは、行キーの分布とRegionServerの数を考慮する必要があります。深刻なユースケースについては、同僚のLarsGeorgeによる高度なスキーマ設計に関するプレゼンテーションを読むことをお勧めします。
ここでの利点は、RegionServerの書き込みパス(MemStoreとWALの両方への書き込み)を経由して、最終的にフラッシュや圧縮などを行うよりも、ファイルを直接書き込む方がはるかに高速であるということです。また、書き込みが多いワークロード用にクラスターを調整してから、通常のワークロード用に再度調整する必要がないことも意味します。
増分負荷: 現在HBaseによって提供されているデータセットがあるが、サードパーティからより多くのデータをバッチでインポートする必要がある場合、または挿入する必要のある数ギガバイトを生成する夜間のジョブがあるとします。おそらく、HBaseがすでに提供しているデータセットほど大きくはありませんが、レイテンシの95パーセンタイルに影響を与える可能性があります。通常の書き込みパスを通過すると、インポート中に通常よりも多くのフラッシュと圧縮がトリガーされるという悪影響があります。この追加のIOストレスは、レイテンシーに敏感なクエリと競合します。
例
独自のHadoopクラスターで次の例を使用できますが、手順は、シングルノードクラスターであるCloudera QuickStart VM、ゲストOS、およびデスクトップ用の仮想マシンアプライアンスに組み込まれたサンプルデータと例について提供されています。
VMを起動したら、自動的に開くWebインターフェイスを介して、CDHをデプロイするように指示し、HBaseサービスも開始されていることを確認します。
組み込みのTSVバルクローダー
HBaseには、区切り文字で区切られた値ファイルを読み取ってHBaseテーブルに直接出力したり、一括読み込み用のHFileを作成したりできるMRジョブが付属しています。ここでは、次のことを行います:
- サンプルデータを取得してHDFSにアップロードします。
- ImportTsvジョブを実行して、事前構成されたテーブルに従ってファイルを複数のHFileに変換します。
- HBaseでファイルを準備してロードします。
最初のステップは、コンソールを開き、次のコマンドを使用してサンプルデータを取得することです。
curl -O https://people.apache.org/~jdcryans/word_count.csv
このファイルは、このブログ投稿の元の原稿で単語数を実行し、結果を列タイトルなしでcsv形式で出力することによって作成しました。次に、ファイルをHDFSにアップロードします。
hdfs dfs -put word_count.csv
バルクロードの抽出部分が完了したので、ファイルを変換する必要があります。まず、テーブルを設計する必要があります。簡単にするために、これを「単語数」と呼びます。行キーは単語自体であり、「f」と呼ぶファミリでは、唯一の列に単語数が含まれます。テーブルを作成する際のベストプラクティスは、行のキー分布に従ってテーブルを分割することですが、この例では、分割ポイントがキースペース全体に均等に分散された5つの領域を作成するだけです。 hbaseシェルを開きます:
hbase shell
そして、次のコマンドを実行してテーブルを作成します。
create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}
4つの分割点は5つの領域を生成し、最初の領域は空の行キーで始まります。より良い分割点を取得するために、単語が実際にどのように分布しているかを簡単に分析することもできますが、それはあなたに任せます。
VMのブラウザでhttp:// localhost:60010 /を指定すると、新しく作成されたテーブルとその5つのリージョンがすべてRegionServerに割り当てられていることがわかります。
さあ、重い物を持ち上げる時が来ました。 「hadoop」スクリプトを使用してコマンドラインでHBasejarを呼び出すと、使用可能なツールのリストが表示されます。必要なものはimporttsvと呼ばれ、次の使用法があります。
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv ERROR: Wrong number of arguments: 0 Usage: importtsv -Dimporttsv.columns=a,b,c>
使用するコマンドラインは次のとおりです。
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0- security.jar importtsv -Dimporttsv.separator=, -Dimporttsv.bulk.output=output -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
さまざまな構成要素の概要は次のとおりです。
- -Dimporttsv.separator =、 区切り文字がコンマであることを指定します。
- -Dimporttsv.bulk.output =output HFilesが書き込まれる場所への相対パスです。 VM上のユーザーはデフォルトで「cloudera」であるため、ファイルが/ user / cloudera/outputにあることを意味します。このオプションをスキップすると、ジョブはHBaseに直接書き込まれます。
- -Dimporttsv.columns =HBASE_ROW_KEY、f:count このファイルに含まれるすべての列のリストです。行キーは、すべて大文字のHBASE_ROW_KEY文字列を使用して識別する必要があります。そうしないと、ジョブが開始されません。 (修飾子「count」を使用することにしましたが、それ以外の場合もあります。)
入力サイズが小さい場合、ジョブは1分以内に完了するはずです。リージョンごとに1つずつ、5つのレデューサーが実行されていることに注意してください。 HDFSでの結果は次のとおりです。
-rw-r--r-- 3 cloudera cloudera 4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b -rw-r--r-- 3 cloudera cloudera 3163 2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36 -rw-r--r-- 3 cloudera cloudera 2487 2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2 -rw-r--r-- 3 cloudera cloudera 2961 2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914 -rw-r--r-- 3 cloudera cloudera 1336 2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f
ご覧のとおり、ファイルは現在ユーザー「cloudera」に属しています。それらをロードするには、所有者を「hbase」に変更する必要があります。そうしないと、HBaseにファイルを移動する権限がありません。次のコマンドを実行します:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output
最後のステップでは、completebulkloadツールを使用して、ファイルの場所とロード先のテーブルを指定する必要があります。
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
HBaseシェルに戻ると、ロードされた行数を表示するcountコマンドを実行できます。 chownを忘れると、コマンドがハングします。
カスタムMRジョブ
TSVバルクローダーはプロトタイピングには適していますが、すべてを文字列として解釈し、変換時のフィールドの操作をサポートしていないため、独自のMRジョブを作成する必要があります。ヨーロッパでソリューションアーキテクトとして働いている同僚のJamesKinleyが、次の例で使用するような仕事を書きました。この仕事のデータには、レイカーズとセルティックスの間の2010 NBAファイナル(ゲーム1)に関連するFacebookとTwitterの公開メッセージが含まれています。コードはここにあります。 (クイックスタートVMにはgitとmavenがインストールされているため、リポジトリのクローンを作成できます。)
Driverクラスを見ると、最も重要なビットは次のとおりです。
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); … // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable);
まず、マッパーは行キーを含むImmutableBytesWritableを出力する必要があり、出力値はKeyValue、Put、またはDeleteのいずれかになります。 2番目のスニペットは、Reducerを構成する方法を示しています。実際には、HFileOutputFormatによって完全に処理されます。前の「変換」セクションで説明したconfigureIncrementalLoad()。
HBaseKVMapperクラスには、構成された出力キーと値を尊重するマッパーのみが含まれます。
public class HBaseKVMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
実行するには、Mavenを使用してプロジェクトをコンパイルし、READMEのリンクをたどってデータファイルを取得する必要があります。 (テーブルを作成するためのシェルスクリプトも含まれています。)ジョブを開始する前に、ファイルをHDFSにアップロードし、今回はjarを使用しないため、クラスパスがHBaseを認識するように設定することを忘れないでください。 :
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*
次のようなコマンドラインを使用してジョブを開始できます:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar com.cloudera.examples.hbase.bulkimport.Driver -libjars /home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar, /home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010>
ご覧のとおり、ジョブの依存関係は個別に追加する必要があります。最後に、最初に所有者を変更してから、completebulkloadツールを実行することにより、ファイルをロードできます。
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010
潜在的な問題
最近削除されたデータが再表示されます。 この問題は、Deleteがバルクロードを介して挿入され、対応するPutがまだMemStoreにある間にメジャー圧縮された場合に発生します。 DeleteがHFileにある場合、データは削除されたと見なされますが、圧縮中に削除されると、Putが再び表示されます。このようなユースケースがある場合は、削除されたセルをシェルのKEEP_DELETED_CELLSまたはHColumnDescriptor.setKeepDeletedCells()で保持するように列ファミリーを構成することを検討してください。
バルクロードされたデータを別のバルクロードで上書きすることはできません。 この問題は、異なる時間にロードされた2つのバルクロードされたHFileが同じセルに異なる値を書き込もうとした場合に発生します。つまり、同じ行キー、ファミリ、修飾子、およびタイムスタンプがあります。その結果、2番目の値ではなく、最初に挿入された値が返されます。このバグはHBase0.96.0およびCDH5(次のCDHメジャーバージョン)で修正され、0.94ブランチおよびCDH4のHBASE-8521で作業が行われています。
一括読み込みは主要な圧縮をトリガーします。 この問題は、増分バルクロードを実行していて、マイナー圧縮をトリガーするのに十分なバルクロードファイルがある場合に発生します(デフォルトのしきい値は3です)。 HFilesはシーケンス番号が0に設定されてロードされるため、RegionServerが圧縮するファイルを選択するときに最初に取得され、バグのために残りのすべてのファイルも選択されます。この問題は、すでに大きなリージョン(複数のGB)を持っている人や、大量のデータが圧縮されるため、頻繁に(数時間以内に)大量にロードする人に深刻な影響を及ぼします。 HBase 0.96.0には適切な修正があり、CDH5も同様です。 HBASE-8521は、バルクロードされたHFileに適切なシーケンス番号が割り当てられるようになったため、0.94の問題を修正します。 HBASE-8283は、0.94.9以降のhbase.hstore.useExploringCompationおよびCDH 4.4.0で有効にでき、よりスマートな圧縮選択アルゴリズムであるだけでこの問題を軽減できます。
一括で読み込まれたデータは複製されません 。一括読み込みは書き込みパスをバイパスするため、WALはプロセスの一部として書き込まれません。レプリケーションはWALファイルを読み取ることで機能するため、一括で読み込まれたデータは表示されません。Put.setWriteToWAL(true)を使用する編集についても同じことが言えます。これを処理する1つの方法は、rawファイルまたはHFileを他のクラスターに送信し、そこで他の処理を実行することです。
結論
このブログ投稿の目的は、ApacheHBaseの一括読み込みの基本的な概念を紹介することでした。このプロセスはETLの実行にどのように似ているか、また、書き込みパスをバイパスするため、通常のAPIを使用するよりもビッグデータセットの方がはるかに優れていることを説明しました。 2つの例は、単純なTSVファイルをHBaseに一括ロードする方法と、他のデータ形式用に独自のマッパーを作成する方法を示すために含まれています。
これで、Hueを介してグラフィカルユーザーインターフェイスを使用して同じことを試すことができます。