この投稿の再公開を許可してくれたFINRAのソフトウェア開発者であるPengyuWangに感謝します。
事前分割されたソルト化されたApacheHBaseテーブルは、RegionServer全体に均一なワークロード分散を提供し、一括書き込み中のホットスポットを防ぐための効果的なHBaseソリューションであることが証明されています。この設計では、行キーは最初に論理キーとソルトを使用して作成されます。ソルトを生成する1つの方法は、論理行キー(日付など)のハッシュコードを法としてn(領域の数)を計算することです。
行キーの塩漬け
たとえば、毎日データの読み込みを受け入れるテーブルでは、日付で始まる論理行キーを使用する場合があり、このテーブルを1,000のリージョンに事前に分割する必要があります。この場合、1,000種類の塩が生成されると予想されます。塩は、たとえば次のように生成できます。
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey = 2015-04-26|abc rowKey = 893|2015-04-26|abc
hashCode()からの出力 モジュロを使用すると、「000」から「999」までのソルト値にランダム性が提供されます。このキー変換を使用すると、テーブルは作成時にソルト境界で事前に分割されます。これにより、MapReduceバルクロードを使用してHFilesをロードするときに、行ボリュームが均一に分散されます。同じソルトの行キーが同じ領域に分類されることが保証されます。
データアーカイブなどの多くのユースケースでは、MapReduceジョブを使用して、特定の論理キー範囲(日付範囲)でデータをスキャンまたはコピーする必要があります。標準テーブルのMapReduceジョブは、Scanを提供することで設定されます キー範囲属性を持つインスタンス。
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));
/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job,
true,
TableInputFormat.class
);
…
ただし、そのようなジョブのセットアップは、ソルトされた事前分割テーブルでは困難になります。開始行と停止行のキーは、それぞれに固有のソルトがあるため、リージョンごとに異なります。また、1つのScanに複数の範囲を指定することはできません インスタンス。
この問題を解決するには、テーブルMapReduceがどのように機能するかを調べる必要があります。通常、MapReduceフレームワークは、各入力分割を読み取って処理するための1つのマップタスクを作成します。各分割はInputFormatで生成されます クラスベース、メソッドgetSplits() 。
HBaseテーブルのMapReduceジョブで、TableInputFormat InputFormatとして使用されます 。実装内では、getSplits() Scanから開始行と停止行のキーを取得するために、メソッドがオーバーライドされます 実例。開始行と停止行のキーが複数のリージョンにまたがっているため、範囲はリージョンの境界で分割され、TableSplitのリストが返されます。 スキャンキー範囲をカバーするオブジェクト。 HDFSブロックに基づく代わりに、TableSplit sは地域に基づいています。 getSplits()を上書きする メソッドでは、TableSplitを制御できます 。
カスタムTableInputFormatの構築
getSplits()の動作を変更するには メソッド、TableInputFormatを拡張するカスタムクラス 必要とされている。 getSplits()の目的 ここでは、各リージョンの論理キー範囲をカバーし、固有のソルトを使用して行キー範囲を構築します。 HTableクラスは、メソッドgetStartEndKeys()を提供します。 これは、各リージョンの開始行キーと終了行キーを返します。各開始キーから、その領域に対応するソルトを解析します。
Pair keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {
// The first 3 bytes is the salt, for the first region, start key is empty, so apply “000”
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
…
}
ジョブ構成が論理キー範囲を渡す
TableInputFormat Scanから開始キーと停止キーを取得します 実例。 Scanは使用できないため MapReduceジョブでは、Configurationを使用できます 代わりに、これら2つの変数を渡し、論理的な開始キーと停止キーだけで十分です(変数は日付またはその他のビジネス情報である可能性があります)。 getSplits() メソッドにはJobContextがあります 引数、構成インスタンスはcontext.getConfiguration()として読み取ることができます 。
MapReduceドライバーの場合:
Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");
Custom TableInputFormat :
@Override
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
String scanStart = conf.get("logical.scan.start");
String scanStop = conf.get("logical.scan.stop");
…
}
地域ごとにソルトキー範囲を再構築する
各リージョンのソルトおよび論理開始/停止キーが用意できたので、実際の行キー範囲を再構築できます。
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
リージョンごとにTableSplitを作成する
行キー範囲を使用して、TableSplitを初期化できるようになりました 地域のインスタンス。
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
…
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
もう1つ注目すべきことは、データの局所性です。フレームワークは、各入力分割の位置情報を使用して、ローカルホストにマップタスクを割り当てます。 TableInputFormatの場合 、メソッドgetTableRegionLocation()を使用します 行キーを提供するリージョンの場所を取得します。
次に、この場所がTableSplitに渡されます コンストラクタ。これにより、テーブル分割を処理するマッパーが同じリージョンサーバー上にあることが保証されます。 DNS.reverseDns()と呼ばれる1つのメソッド 、HBaseネームサーバーのアドレスが必要です。この属性は、構成「hbase.nameserver.address」に保存されます 「。
this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…
public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
regionLocation = regionServerAddress.getHostname();
}
return regionLocation;
}
protected String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
this.reverseDNSCacheMap.put(ipAddress, hostName);
}
return hostName;
}
getSplitsの完全なコード 次のようになります:
@Override
public List getSplits(JobContext context) throws IOException {
conf = context.getConfiguration();
table = getHTable(conf);
if (table == null) {
throw new IOException("No table was provided.");
}
// Get the name server address and the default value is null.
this.nameServer = conf.get("hbase.nameserver.address", null);
String scanStart = conf.get("region.scan.start");
String scanStop = conf.get("region.scan.stop");
Pair keys = table.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new RuntimeException("At least one region is expected");
}
List splits = new ArrayList(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);
String regionSalt = null;
if (keys.getFirst()[i].length == 0) {
regionSalt = "000";
} else {
regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
}
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
splits.add(split);
}
log.info("Total table splits: " + splits.size());
return splits;
}
MapReduceドライバーでカスタムTableInoutFormatを使用する
次に、TableInputFormatを置き換える必要があります テーブルMapReduceジョブのセットアップに使用したカスタムビルドを使用したクラス。
Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);
conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
tablename,
scan,
DataScanMapper.class,
ImmutableBytesWritable.class,
KeyValue.class,
job,
true,
MultiRangeTableInputFormat.class
);
カスタムTableInputFormatのアプローチ バランスの取れたデータロードにsaltを使用するように設計されたHBaseテーブルに効率的でスケーラブルなスキャン機能を提供します。スキャンは、テーブルの大きさに関係なく、関連のない行キーをバイパスできるため、スキャンの複雑さはターゲットデータのサイズのみに制限されます。ほとんどのユースケースでは、これにより、テーブルが大きくなるにつれて比較的一貫した処理時間が保証されます。