PostgreSQLは最先端のオープンソースデータベースとしてよく知られており、データセットの大きさ、小ささ、違いに関係なくデータを管理するのに役立ちます。そのため、PostgreSQLを使用してビッグデータを管理または分析できます。もちろん、これを可能にするいくつかの方法、例えばApacheSpark。このブログでは、Apache Sparkとは何か、およびそれを使用してPostgreSQLデータベースを操作する方法を説明します。
ビッグデータ分析には、2つの異なるタイプの分析があります。
- バッチ分析:一定期間に収集されたデータに基づいています。
- リアルタイム(ストリーム)分析:即時データに基づいて即時結果を取得します。
Apache Sparkとは何ですか?
Apache Sparkは、大規模なデータ処理のための統合分析エンジンであり、バッチ分析とリアルタイム分析の両方をより高速かつ簡単に処理できます。
Java、Scala、Python、Rの高レベルAPIと、一般的な実行グラフをサポートする最適化されたエンジンを提供します。
ApacheSparkコンポーネントApacheSparkライブラリ
Apache Sparkにはさまざまなライブラリが含まれています:
- Spark SQL:SQLまたはDataFrameAPIを使用して構造化データを操作するためのモジュールです。これは、Hive、Avro、Parquet、ORC、JSON、JDBCなどのさまざまなデータソースにアクセスするための一般的な方法を提供します。これらのソース間でデータを結合することもできます。
- Spark Streaming:言語統合APIを使用してスケーラブルなフォールトトレラントストリーミングアプリケーションを簡単に構築して処理をストリーミングし、バッチジョブを作成するのと同じ方法でストリーミングジョブを作成できます。 Java、Scala、Pythonをサポートしています。 Spark Streamingは、余分なコードを追加することなく、失われた作業とオペレーターの状態の両方をすぐに回復します。同じコードをバッチ処理に再利用したり、履歴データに対してストリームを結合したり、ストリームの状態でアドホッククエリを実行したりできます。
- MLib(機械学習):スケーラブルな機械学習ライブラリです。 MLlibには、反復を活用する高品質のアルゴリズムが含まれており、MapReduceで時々使用されるワンパス近似よりも優れた結果を得ることができます。
- GraphX:グラフとグラフ並列計算のためのAPIです。 GraphXは、ETL、探索的分析、および反復グラフ計算を単一のシステム内に統合します。グラフとコレクションの両方と同じデータを表示し、グラフをRDDで効率的に変換および結合し、PregelAPIを使用してカスタムの反復グラフアルゴリズムを作成できます。
ApacheSparkの利点
公式ドキュメントによると、ApacheSparkのいくつかの利点は次のとおりです。
- 速度:ワークロードを100倍高速に実行します。 Apache Sparkは、最先端のDAG(Direct Acyclic Graph)スケジューラー、クエリオプティマイザー、および物理実行エンジンを使用して、バッチデータとストリーミングデータの両方で高いパフォーマンスを実現します。
- 使いやすさ:Java、Scala、Python、R、およびSQLでアプリケーションをすばやく作成します。 Sparkは、並列アプリの構築を容易にする80を超える高レベルのオペレーターを提供します。 Scala、Python、R、およびSQLシェルからインタラクティブに使用できます。
- 一般性:SQL、ストリーミング、および複雑な分析を組み合わせます。 Sparkは、SQLとDataFrame、機械学習用のMLlib、GraphX、SparkStreamingなどのライブラリのスタックを強化します。これらのライブラリを同じアプリケーションでシームレスに組み合わせることができます。
- どこでも実行:Sparkは、Hadoop、Apache Mesos、Kubernetes、スタンドアロン、またはクラウドで実行されます。さまざまなデータソースにアクセスできます。 Sparkは、スタンドアロンクラスターモード、EC2、Hadoop YARN、Mesos、またはKubernetesを使用して実行できます。 HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive、およびその他の数百のデータソースのデータにアクセスします。
それでは、これをPostgreSQLデータベースと統合する方法を見てみましょう。
PostgreSQLでApacheSparkを使用する方法
PostgreSQLクラスターが稼働していることを前提としています。このタスクでは、CentOS7で実行されているPostgreSQL11サーバーを使用します。
まず、PostgreSQLサーバー上にテストデータベースを作成しましょう:
postgres=# CREATE DATABASE testing;
CREATE DATABASE
postgres=# \c testing
You are now connected to database "testing" as user "postgres".
次に、t1:
というテーブルを作成します。testing=# CREATE TABLE t1 (id int, name text);
CREATE TABLE
そしてそこにいくつかのデータを挿入します:
testing=# INSERT INTO t1 VALUES (1,'name1');
INSERT 0 1
testing=# INSERT INTO t1 VALUES (2,'name2');
INSERT 0 1
作成されたデータを確認してください:
testing=# SELECT * FROM t1;
id | name
----+-------
1 | name1
2 | name2
(2 rows)
Apache SparkをPostgreSQLデータベースに接続するには、JDBCコネクタを使用します。ここからダウンロードできます。
$ wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar
それでは、ApacheSparkをインストールしましょう。このためには、ここからsparkパッケージをダウンロードする必要があります。
$ wget http://us.mirrors.quenda.co/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
$ tar zxvf spark-2.4.3-bin-hadoop2.7.tgz
$ cd spark-2.4.3-bin-hadoop2.7/
Sparkシェルを実行するには、サーバーにJAVAをインストールする必要があります:
$ yum install java
これで、SparkShellを実行できます:
$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://ApacheSpark1:4040
Spark context available as 'sc' (master = local[*], app id = local-1563907528854).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.3
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
サーバーのポート4040で利用可能なSparkコンテキストWebUIにアクセスできます:
Apache Spark UISparkシェルにPostgreSQLJDBCドライバーを追加する必要があります:
scala> :require /path/to/postgresql-42.2.6.jar
Added '/path/to/postgresql-42.2.6.jar' to classpath.
scala> import java.util.Properties
import java.util.Properties
そして、Sparkで使用するJDBC情報を追加します:
scala> val url = "jdbc:postgresql://localhost:5432/testing"
url: String = jdbc:postgresql://localhost:5432/testing
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
scala> connectionProperties.setProperty("Driver", "org.postgresql.Driver")
res6: Object = null
これで、SQLクエリを実行できます。まず、query1をテストテーブルのSELECT *FROMt1として定義しましょう。
scala> val query1 = "(SELECT * FROM t1) as q1"
query1: String = (SELECT * FROM t1) as q1
そして、DataFrameを作成します:
scala> val query1df = spark.read.jdbc(url, query1, connectionProperties)
query1df: org.apache.spark.sql.DataFrame = [id: int, name: string]
これで、このDataFrameに対してアクションを実行できます:
scala> query1df.show()
+---+-----+
| id| name|
+---+-----+
| 1|name1|
| 2|name2|
+---+-----+
scala> query1df.explain
== Physical Plan ==
*(1) Scan JDBCRelation((SELECT * FROM t1) as q1) [numPartitions=1] [id#19,name#20] PushedFilters: [], ReadSchema: struct<id:int,name:string>
さらに値を追加して再実行し、現在の値が返されることを確認できます。
PostgreSQL
testing=# INSERT INTO t1 VALUES (10,'name10'), (11,'name11'), (12,'name12'), (13,'name13'), (14,'name14'), (15,'name15');
INSERT 0 6
testing=# SELECT * FROM t1;
id | name
----+--------
1 | name1
2 | name2
10 | name10
11 | name11
12 | name12
13 | name13
14 | name14
15 | name15
(8 rows)
スパーク
scala> query1df.show()
+---+------+
| id| name|
+---+------+
| 1| name1|
| 2| name2|
| 10|name10|
| 11|name11|
| 12|name12|
| 13|name13|
| 14|name14|
| 15|name15|
+---+------+
この例では、Apache SparkがPostgreSQLデータベースとどのように連携するかのみを示しており、ビッグデータ情報をどのように管理するかは示していません。
結論
今日、企業でビッグデータを管理するという課題に直面することはかなり一般的です。ご覧のとおり、Apache Sparkを使用してそれに対処し、前述のすべての機能を利用できます。ビッグデータは巨大な世界であるため、Apache SparkとPostgreSQLの使用法の詳細については公式ドキュメントを確認し、要件に適合させることができます。