はじめに
Pythonは、ETL / ELTパイプラインから機械学習モデルの構築まで、あらゆる種類の問題を解決するために、データエンジニアとデータサイエンティストの間で広く使用されています。 Apache HBaseは、多くのワークフローに効果的なデータストレージシステムですが、特にPythonを介してこのデータにアクセスするのは困難な場合があります。 HBaseに保存されているデータを利用したいデータ専門家の場合、最近のアップストリームプロジェクト「hbase-connectors」をPySparkで基本的な操作に使用できます。
このブログシリーズでは、基本的なSparkの使用とCDSWで維持されるジョブのために、PySparkとHBaseを一緒に構成する方法について説明します。 CDSWに慣れていない人にとっては、データサイエンティストが独自の分析パイプラインを管理するための、安全なセルフサービスのエンタープライズデータサイエンスプラットフォームであるため、機械学習プロジェクトを探索から本番まで加速できます。 CDSWの詳細については、Cloudera DataScienceWorkbenchの製品ページをご覧ください。
この投稿では、いくつかの操作を出力例とともに説明およびデモンストレーションします。コンテキストとして、この特定のブログ投稿のすべての操作例は、CDSWデプロイメントで実行されます。
前提条件:
- HBaseとSparkを備えたCDPクラスターを用意する
- CDSWを介して例に従う場合は、CDSWをインストールする必要があります– Cloudera DataScienceWorkbenchのインストール
- Python3は同じパスの各ノードにインストールされます
構成:
まず、Spark SQLクエリが正しく機能するように、HBaseとSparkを一緒に構成する必要があります。これを行うには、2つの部分があります。1つは、ClouderaManagerを介してHBaseリージョンサーバーを構成することです。次に、SparkランタイムにHBaseバインディングがあることを確認します。ただし、Cloudera Managerは、SparkをHBaseに自動的にポイントするように、いくつかの構成変数と環境変数を既に設定していることに注意してください。それでも、Spark SQLクエリを構成する最初のステップは、CDPクラスターでのすべてのタイプのデプロイメントに共通ですが、2番目のステップはデプロイメントのタイプによってわずかに異なります。
HBaseリージョンサーバーの構成
- Cloudera Managerに移動し、HBaseサービスを選択します。
- 「regionserverenvironment」を検索
- RegionServer環境の高度な構成スニペット(安全弁)を使用して、新しい環境変数を追加します。
- キー:HBASE_CLASSPATH
- 値:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
適切なバージョン番号を使用していることを確認してください。
- リージョンサーバーを再起動します。
上記の手順を実行したら、CDSWまたは非CDSWのどちらの展開が必要かによって、以下の手順を実行します。
非CDSWデプロイメントでのSparkランタイムへのHBaseバインディングの追加
シェルをデプロイするか、spark-submitを正しく使用するには、次のコマンドを使用して、sparkに適切なHBaseバインディングがあることを確認します。
pyspark –jars/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded。 jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shadowed.jar
CDSWデプロイメントでのSparkランタイムへのHBaseバインディングの追加
HBaseとPySparkを使用してCDSWを構成するには、いくつかの手順を実行する必要があります。
1)Python 3が各クラスターノードにインストールされていることを確認し、そのノードへのパスをメモします
2)CDSWで新しいプロジェクトを作成し、PySparkテンプレートを使用します
3)プロジェクトを開き、[設定]->[エンジン]->[環境変数]に移動します。
4) PYSPARK3_DRIVER_PYTHONを設定します およびPYSPARK3_PYTHON Pythonがクラスターノードにインストールされているパスへのパス(ステップ1で示したパス)。
以下は、それがどのように見えるべきかのサンプルです。
5)プロジェクトで、[ファイル]-> [spark-defaults.conf]に移動し、Workbenchで開きます
6)以下の行をコピーしてそのファイルに貼り付け、新しいセッションを開始する前に保存されていることを確認します。
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
この時点で、CDSWはHBaseでPySparkジョブを実行するように構成されています。このブログ投稿の残りの部分では、CDSW展開でのいくつかのサンプル操作について説明しています。
操作例
プットオペレーション
HBaseに行を挿入および更新する方法は2つあります。最初の最も推奨される方法は、カタログを作成することです。これは、テーブル名と名前空間を指定しながら、HBaseテーブルの列をPySparkデータフレームにマップするスキーマです。このユーザー定義のJSON形式を作成することは、他の操作でも使用できるため、最も推奨される方法です。カタログの詳細については、このドキュメントhttp://hbase.apache.org/book.html#_define_catalogを参照してください。 2番目の方法は、「hbase.columns.mapping」と呼ばれる特定のマッピングパラメータを使用することです。これは、キーと値のペアの文字列を取得するだけです。
- カタログの使用
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
HBaseシェルを開き、次のコマンドを実行するだけで、「tblEmployee」という新しいテーブルがHBaseに作成されていることを確認します。
スキャン‘tblEmployee’、{‘LIMIT’ => 2}
カタログを使用すると、HBaseテーブルを簡単にロードすることもできます。これについては、今後の記事で説明します。
- hbase.columns.mappingの使用
PySparkデータフレームの作成中に、「hbase.columns.mapping」というオプションを追加して、列を正しくマップする文字列を含めることができます。 このオプションでは、既存のテーブルにのみ行を挿入できます。
HBaseシェルで、最初にテーブルを作成して「tblEmployee2」、「personal」を作成しましょう
PySparkで、「hbase.columns.mapping」を使用して2行を挿入しましょう
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
繰り返しになりますが、「tblEmployee2」という新しいテーブルにこれらの新しい行があることを確認してください。
スキャン‘tblEmployee2’、{‘LIMIT’ => 2}
これで、PySparkを介してHBaseテーブルに行を挿入する方法の例が完了しました。次回の記事では、Get and Scan Operations、PySpark SQL、およびいくつかのトラブルシューティングについて説明します。それまでは、CDPクラスターを入手して、これらの例を実行する必要があります。