2019年10月10日木曜日

Intel MKLのようなネイティブな数学ライブラリでSpark機械学習アルゴリズムを加速する

  • このエントリーをはてなブックマークに追加


Intel MKLのようなネイティブな数学ライブラリでSpark機械学習アルゴリズムを加速する

最近、仕事上でSpark MLlibのSVD(特異値分解)でMovieLensの推薦を実装するアプリの性能調査をしております。思った以上遅くて困っていました。SparkのDriver, Executorのメモリ、Executor数、OverHeadなどいろんなSparkパラメーターを調整して試してきましたが、なかなか改善できていません。

そして更に調査したところ、そもそもSpark MllibのSVDの実装が古いらしく、性能が出ない情報がネット上で結構上がっています。Workaroundとしては、SVDの代わりにAlternating Least Squares(ALS)などを使うか、あるいはネイティブな数学演算ライブラリを使って高速化するか などがあります。

今回はHDP上でIntel MKLを利用するための設定方法をご紹介します。

まず、ClouderaのBlogを参照しています。CDHのクラスタの場合はMKLのParcelsファイルが提供されているため、Cloudera Managerから簡単にインストールすることができます。
Using Native Math Libraries to Accelerate Spark Machine Learning Applications
https://docs.cloudera.com/documentation/guru-howto/data_science/topics/ght_native_math_libs_to_accelerate_spark_ml.html#using_native_math_libs_with_spark_machine_learning

Native Math Libraries for Spark ML

Spark MLlibは、Breeze線形代数パッケージを使用しています。Breezeは最適化な数値処理を行うにはnetlib-javaというライブラリに依存する。 netlib-javaは、低レベルのBLAS、LAPACK、およびARPACKライ​​ブラリのラッパーです。しかし、ライセンス関連の制限でCDHのSpark,あるいはApache CommunityのSparkに既定でnetlib-javaのネイティブなプロキシが含まれていません。特に手動で何も設定していない場合、netlib-javaはF2JというJavaベースのライブラリを使います。

ネイティブな数学ライブラリを使っているか、JavaベースのF2Jを使っているか Spark-shellで確認できます。

scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS

scala> println(BLAS.getInstance().getClass().getName())
com.github.fommil.netlib.F2jBLAS

com.github.fommil.netlib.F2jBLASが表示されている場合は、F2jを使っていることを確認できます。

Intel MKLのインストールと設定

Intel MKL(Intel’s Math Kernel Library)を利用することで、(Alternating Least Squares (ALS) algorithm),Latent Dirichlet Allocation (LDA), Primary Component Analysis (PCA), Singular Value Decomposition (SVD) といったアルゴリズムを使ったモデル訓練を高速化することができます。

Intel MKLインストール

各Sparkノードで実施
https://software.intel.com/en-us/articles/installing-intel-free-libs-and-python-yum-repo

# curl -O https://yum.repos.intel.com/mkl/setup/intel-mkl.repo > /etc/yum.repos.d/intel-mkl.repo
# sudo yum -y install intel-mkl-2019.3-062
intel-mkl-<VERSION>.<UPDATE>-<BUILD_NUM>の形式で指定

image.png
MKLのインストール先は/opt/intel/mkl になります。

mklWrapperのダウンロード

mklWrapper.jarとmkl_wrapper.soを以下のリンクからダウンロードする
https://github.com/Intel-bigdata/mkl_wrapper_for_non_CDH

Copy mkl_wrapper.jar to /opt/intel/mkl/wrapper/mkl_wrapper.jar
Copy mkl_wrapper.so to /opt/intel/mkl_wrapper/mkl_wrapper.so

Spark Configuration

# echo "/opt/intel/mkl/lib/intel64" > /etc/ld.so.conf.d/mkl_blas.conf
# ldconfig

AmbariでSpark-default設定を追加

propety value
spark.driver.extraClassPath /opt/intel/mkl/wrapper/mkl_wrapper.jar
spark.executor.extraClassPath /opt/intel/mkl/wrapper/mkl_wrapper.jar
spark.driver.extraJavaOptions -Dcom.github.fommil.netlib.BLAS=com.intel.mkl.MKLBLAS -Dcom.github.fommil.netlib.LAPACK=com.intel.mkl.MKLLAPACK
spark.executor.extraJavaOptions -Dcom.github.fommil.netlib.BLAS=com.intel.mkl.MKLBLAS -Dcom.github.fommil.netlib.LAPACK=com.intel.mkl.MKLLAPACK
spark.driverEnv.MKL_VERBOSE 1
spark.executorEnv.MKL_VERBOSE 1

実際の設定 Custom Spark2-defaults
image.png

spark.executor.extraJavaOptionsは Advanced spark2-defaultsにあります。すでにあった値の後ろに追加する
image.png

確認

image.png

com.intel.mkl.MKLBLASが表示されることを確認できます。

そして再度SVDを実行してみたところ、前に比べて30%〜35%の時間短縮することができました。
アルゴリズムによって加速する度合いが違いますので、ALSの場合は4.3倍も速いという結果が出ています。ぜひ試してみてください。

HDP3 Hive Warehouse connectorを使ってSparkからHiveテーブルにアクセスする

  • このエントリーをはてなブックマークに追加


HDP3 Hive Warehouse connectorを使ってSparkからHiveテーブルにアクセスする

HDP3の環境でSparkからHiveテーブルにアクセスする方法をご紹介します。

HDP以前のバージョンはSpark HiveContext/SparkSession を使ってHiveテーブルにアクセスしていますが、HDP3はHortonworksが開発したHive Warehouse Connector(HWC)を使ってアクセスすることができます。
以下の図の通り、HDP3でSparkとHiveそれぞれMetadataを持っています。お互いへのアクセスはHWC経由になります。
image.png

Hive Warehouse Connector

Hive LLAPを使ってSparkのDataFrameをHiveテーブルにWrite, HiveテーブルデータをDataFrameにReadするためのライブラリになっています。
Hive LLAPを有効にする必要があります。

HWCは以下のアプリケーションをサポートしています。

  • Spark-shell
  • Pyspark
  • Spark-submit

使い方としてはこちらをご参照ください。
HiveWarehouseSession API operations
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_hivewarehousesession_api_operations.html

HWC使うための設定

設定手順は以下のリンクに詳しく書いてあります。
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_configure_a_spark_hive_connection.html

基本的にAmbariでCustom spark-2-defaultsに以下のプロパティを設定する

Property Description Comments
spark.sql.hive.hiveserver2.jdbc.url URL for HiveServer2 Interactive In Ambari, copy the value from Services > Hive > Summary > HIVESERVER2 INTERACTIVE JDBC URL.
spark.datasource.hive.warehouse.metastoreUri URI for metastore Copy the value from hive.metastore.uris. For example, thrift://mycluster-1.com:9083.
spark.datasource.hive.warehouse.load.staging.dir HDFS temp directory for batch writes to Hive For example, /tmp.
spark.hadoop.hive.llap.daemon.service.hosts Application name for LLAP service Copy value from Advanced hive-interactive-site > hive.llap.daemon.service.hosts.
spark.hadoop.hive.zookeeper.quorum Zookeeper hosts used by LLAP Copy value from Advanced hive-sitehive.zookeeper.quorum.

実際の設定画面です
image.png

で、注意する必要があるのが、spark.sql.hive.hiveserver2.jdbc.urlです。
手順で直接Ambari上のHIVESERVER2 INTERACTIVE JDBC URLをコピーするってかいてありますが、実行時にPermission Errorのエラーが表示される場合があります。
例えばPysparkの場合

pyspark --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.1.0.0-78.zip


Error
py4j.protocol.Py4JJavaError: An error occurred while calling o72.executeQuery.
: java.lang.RuntimeException: java.io.IOException: shadehive.org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: Failed to compile query: org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException: Permission denied: user [anonymous] does not have [USE] privilege on [default]
    at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader.readSchema(HiveWarehouseDataSourceReader.java:130)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.apply(DataSourceV2Relation.scala:56)

image.png

解決策2つがあります。
1, HIVESERVER2 INTERACTIVE JDBC URLの後ろにユーザーを指定する。例えばuser=hive

jdbc:hive2://hdp-srv2.demotest.com:2181,hdp-srv1.demotest.com:2181,hdp-srv4.demotest.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive;user=hive

2, コードの中にHiveWarehouseSession作成時にユーザーを指定する。例:.userPassword(‘hive’,‘hive’)
ソースコードからヒントを得ました。
https://github.com/hortonworks/hive-warehouse-connector/blob/HDP-3.1.0.158/python/pyspark_llap/sql/session.py

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).userPassword('hive','hive').build()
hive.setDatabase("default")
hive.executeQuery("select * from test_table").show()
hive.showTables().show()
hive.showDatabases().show()

Zeppelin Spark Interpreterから接続する際の設定

ZeppelinからHiveにアクセスする場合もZeppelin側での設定が必要です。
詳細の設定はこちら
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_zeppelin_configuration_hivewarehouseconnector.html

設定例
image.png

ただし、実際にコード実行したら、以下のように pyspark_llapモジュールが見つかりません というエラーが出力されます。
image.png

調べたところ、Zeppelin側でspark.submit.pyfilesの設定がうまく動作しないバグがあるらしい。
回避策として、コートの中に直接モジュールファイルをインポートする。

image.png

これでspark-shell, pyspark, zeppelinから使えるようになります。

2019年7月19日金曜日

Druidインストール後、Druid Coordinatorが起動失敗

  • このエントリーをはてなブックマークに追加


Druidインストール後、Druid Coordinatorが起動失敗

HDP 3.1にDruidを追加し、起動したところ、Druid Coordinatorが以下のエラーにより起動失敗しています。

エラーログ

2019-07-19T14:35:13,842 WARN [main] io.druid.metadata.SQLMetadataConnector - Exception creating table
org.skife.jdbi.v2.exceptions.CallbackFailedException: io.druid.java.util.common.ISE: Database default character set is not UTF-8.
  Druid requires its MySQL database to be created using UTF-8 as default character set.
        at org.skife.jdbi.v2.DBI.withHandle(DBI.java:284) ~[jdbi-2.63.1.jar:2.63.1]
        at io.druid.metadata.SQLMetadataConnector$2.call(SQLMetadataConnector.java:135) ~[druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.java.util.common.RetryUtils.retry(RetryUtils.java:63) ~[java-util-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.java.util.common.RetryUtils.retry(RetryUtils.java:81) ~[java-util-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector.retryWithHandle(SQLMetadataConnector.java:139) ~[druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector.retryWithHandle(SQLMetadataConnector.java:148) ~[druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector.createTable(SQLMetadataConnector.java:189) [druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector.createRulesTable(SQLMetadataConnector.java:287) [druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector.createRulesTable(SQLMetadataConnector.java:553) [druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataRuleManagerProvider$1.start(SQLMetadataRuleManagerProvider.java:71) [druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.java.util.common.lifecycle.Lifecycle.start(Lifecycle.java:311) [java-util-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.guice.LifecycleModule$2.start(LifecycleModule.java:134) [druid-api-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:101) [druid-services-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.cli.ServerRunnable.run(ServerRunnable.java:50) [druid-services-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.cli.Main.main(Main.java:116) [druid-services-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
Caused by: io.druid.java.util.common.ISE: Database default character set is not UTF-8.
  Druid requires its MySQL database to be created using UTF-8 as default character set.
        at io.druid.metadata.storage.mysql.MySQLConnector.tableExists(MySQLConnector.java:174) ~[?:?]
        at io.druid.metadata.SQLMetadataConnector$4.withHandle(SQLMetadataConnector.java:195) ~[druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at io.druid.metadata.SQLMetadataConnector$4.withHandle(SQLMetadataConnector.java:191) ~[druid-server-0.12.1.3.1.0.0-78.jar:0.12.1.3.1.0.0-78]
        at org.skife.jdbi.v2.DBI.withHandle(DBI.java:281) ~[jdbi-2.63.1.jar:2.63.1]
        ... 14 more
2019-07-19T14:35:13,846 ERROR [main] io.druid.cli.CliCoordinator - Error when starting up.  Failing.
org.skife.jdbi.v2.exceptions.CallbackFailedException: org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'druid.druid_rules' doesn
't exist [statement:"SELECT id from druid_rules where datasource=:dataSource", located:"SELECT id from druid_rules where datasource=:dataSource", rewritten:"SELECT id from druid_rules where datasource=?", arg
uments:{ positional:{}, named:{dataSource:'_default'}, finder:[]}]

MySqlの文字コードがUTF-8ではないため、テーブルが見つからない というようなエラー

早速MySqlでdatabase(druid)の文字コードを変更してみます。

alter database druid character set utf8 collate utf8_general_ci;
use druid;
show variables like "character_set_database";
+------------------------+-------+
| Variable_name          | Value |
+------------------------+-------+
| character_set_database | utf8  |
+------------------------+-------+
1 row in set (0.00 sec)

これで起動成功しました。

2019年6月27日木曜日

NiFi1.8からNiFi1.9にアップグレードのメモ

  • このエントリーをはてなブックマークに追加


NiFi1.8からNiFi1.9にアップグレードのメモ

一つのAmbariでHDP3.1とHDF3.3(NiFiだけを使っている)のクラスタを管理しています。NiFi1.8のPutHive3Streaming関連のメモリリークのバグがあるらしく、一定時間経つとをJVM OOMエラーを起こしてしまいます。

There are unfortunately 2 memory leaks around this processor:
one in the NiFi processor that was fixed in 1.9.0:
https://issues.apache.org/jira/browse/NIFI-5841
and one in the Hive client library,
https://issues.apache.org/jira/browse/HIVE-20979

そこでAmbariとHDP3.1のコンポーネントのバージョンはそのまま、HDF3.3.0(NiFi1.8)からHDF3.4.1.1(NiFi1.9)へのアップグレードを試してみました。

手順は概ねこちらになります

NiFiなど関連コンポーネント停止
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.4.1.1/ambari-managed-hdf-upgrade/content/hdf-stop-hdf-services.html

HDFのManagement Packをアップグレード
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.4.1.1/ambari-managed-hdf-upgrade/content/upgrade-mpack-hdf-on-hdp.html
image.png
手順7の中のHDF-3.4が書いてありますが、実際のところはHDF-3.4が存在していない、設定できないことになっています。
image.png

新しいバージョンを作る事も考えたのですが、バージョンの名前が共存できないため、新しい作ると、HDPのバージョンも合わせて変わってくるため、ちょっと危険の匂いがするので、やめておきました。
image.png

そこで新しいバージョンのNiFiを手動で各サーバーでインストールしました。
上の手順8のところ、各ノードに以下を追加

path=/
enabled=1
gpgcheck=0
[HDF-3.4-repo-1]
name=HDF-3.4-repo-1
baseurl=http://public-repo-1.hortonworks.com/HDF/centos7/3.x/updates/3.4.1.1

次に、インストール。今回はNiFiとNiFI-toolkitだけ

yum install -y nifi_3_4_1_1_4
yum install -y nifi_3_4_1_1_4-toolkit

インストールされていることを確認
image.png
image.png

/usr/hdf/の下に2つバージョンのHDFが存在しています
シンボリックリンクは古いバージョンのままになっています。
以下を実行して、シンボリックリンクを変更します(直接変更してもいいですが。。)

hdf-select set nifi 3.4.1.1-4
hdf-select set nifi-toolkit 3.4.1.1-4

image.png

最後にAmbariからnifi.propertiesに以下のプロパティを追加する

nifi.nar.library.autoload.directory = {{nifi_internal_dir}}/work/extensions

この状態でAmbariからNiFiを起動する

image.png

成功した!!

2019年6月21日金曜日

MiNiFiでセンサーデータを取得し、NiFiに転送してHDFS、Hiveに書き込む

  • このエントリーをはてなブックマークに追加


MiNiFiでセンサーデータを取得し、NiFiに転送してHDFS、Hiveに書き込む

「センサーをラズパイに接続し、MiNiFiでセンサーデータをTailする。そしてNiFiに転送して、NiFiでHDFS, Hiveに書き込む」の手順をご紹介します。

実現したいこと

  • MiNiFiでセンサーデータを取得し、NiFiに転送する
  • NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
  • NiFiでセンサーデータをHiveテーブルに保存する
  • NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

環境情報

  • センサーをラズパイに接続
  • ラズパイでセンサーデータを取るPythonスクリプト
  • HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理

構成

構成は以下の様な感じになります。パース処理のところは一つしか書いてないですが、複数のパース処理の集合だと思ってください。
enter image description here

全体データフロー

  • RemoteMiNiFiというInput portとProcessDataというPrecess Groupで構成されています。
    enter image description here
  • ProcessDataグループ内の詳細フロー
    enter image description here

やってみよう

1、MiNiFiのセットアップ

ラズパイ自体のOSインストールや、センサーとの接続ができた状態(ここでは割愛します)でMiNiFiのインストールをやります。
MiNiFiのTarファイルはHortonworksサイトにあります。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html

sudo su -
cd /home/pi

#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz

#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi

#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する

2、センサーデータをPythonスクリプトで取得

Pythonスクリプトは/home/pi/bme280-dataフォルダに2019-06-04.csvのようなファイルが生成されます。

中身は:「センサーID, 日付, 時間, 気圧, 温度, 湿度」のカンマ区切りのデータになります。
1,2019-06-04,23:59:51,1011.54,24.86,50.67

#coding: utf-8
import bme280_custom
import datetime
import os

dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
  os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write('1,'+filename +","+label+","+csv+"\n")
f.close()

このスクリプトをCronとかで定期的に実行するように設定する(例えば10秒ごとに実行)

3、NiFiでデータフローを作成し、MiNiFiに配布

MiNiFiで直接データフローを作成するのが難しいので、NiFiで作成して、テンプレート(.xml)としてエクスポート、MiNiFi toolkitで.xmlを.ymlに変換するのが一般的です。
今回はTailFileプロセッサとRemoteProcessGroupを使います。

  • NiFiのTop画面でInput portを追加する。名前をRemoteMiNiFiに設定する
    enter image description here
  • TailFileプロセッサでフォルダの.csvファイルを取り込みます。以下のプロパティを設定します。
    Tailing mode:  Multiple files
    File(s) to Tail:  .*.csv
    Base Directory: /home/pi/bme280-data
    enter image description here
  • Remote Process Groupを以下のように設定して、RemoteMiNiFiをInput Portとして選択する
    URLsにNiFiのURL(http://hdp-srv4.demotest.com:9090/nifi)を指定する。複数ある場合は、カンマ区切りで入力する。
    Transport ProtocolでHTTPを選択する。
    enter image description here
  • 作ったデータフローをテンプレートにエクスポートする
    TailFileプロセッサとRemoteProcessGroupを繋いで、全部選択して、右クリック、「Create template」でテンプレートを作成する。
    作成したら、ダウンロードする
    enter image description here
  • MiNiFiでテンプレートファイル(.xml)から.ymlファイルに変換する
    ラズパイにログインして、以下のコマンドでファイルを変換して、MiNiFiを起動する
sudo su -
cd /home/pi

#MiNiFi toolkitでxmlからymlに変換する
./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml

#既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
cp -p sensor_minifi4.yml minifi/conf/config.yml

#MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
./minifi/bin/minifi.sh start

ここまできたら、MiNiFiからセンサーデータをNiFiの方に転送できるようになります。

4、NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する

ここからは、NiFiでセンサーデータをKafka経由でHDFSに保存するデータフローを作成していきます。
全体のデータフローはこんな感じです。
enter image description here

  • NiFi画面でUser1というProcess Groupをドラッグ&ドロップする。すでに作成済みのRemoteMiNiFi input portと繋ぐ
    enter image description here
    User1グループをダブルクリックで入って後続のフロー作成に入ります。

  • FromMinifiというInput portとPublishKafka_2_0プロセッサを作成して、繋ぐ
    enter image description here
    PublishKafka_2_0のPROPERTIESタブで必要なプロパティを設定する。
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Delivery Guarantee: Guarantee Replicated Delivery を選択
    enter image description here
    PublishKafka_2_0のSETTINGSタブでfailureとsuccess両方をチェックする。
    enter image description here

  • ConsumeKafka_2_0プロセッサを追加する
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Group ID: group1_user1
    enter image description here

  • MergeContentプロセッサを追加する
    Minimum Number of Entries: 10 に変更
    enter image description here

  • PutHDFSプロセッサを追加する
    Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Directory: /tmp/sensor_data/user1
    enter image description here
    SETTINGSタブでsuccessをチェックする
    enter image description here

  • 最後にLogAttributeプロセッサを追加する
    PROPERTIESタブは既定のままで、SETTINGSタブでsuccessをチェックする
    enter image description here
    MergeContentのoriginal, failure, PutHDFSのfailureと繋ぐ

  • HDFS上のファイルの中身を見てみる
    enter image description here

5、NiFiでセンサーデータをHiveテーブルに保存する

ここでHive Streamingを使ってセンサーデータをリアルタイムにHiveテーブルに追加します。
これを実現するには、Hiveテーブルがいくつか要件を満たす必要があります。
詳細はこちらをご参照くださいStreamingDataIngest-StreamingRequirements

1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 
hive.compactor.initiator.on = true 
hive.compactor.worker.threads > 0

2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO <num> BUCKETS

下記の通りでテーブル(sensor_data_user1)を作成します。

CREATE TABLE sensor_data_user1(id int,time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string) 
CLUSTERED BY (id) INTO 5 BUCKETS 
STORED AS ORC 
tblproperties("transactional" = "true");

この部分のデータフローは図の通りになります。
enter image description here

これからプロセッサを説明していきます。

  • UpdateAttributeプロセッサを追加し、FromMinifi input portと接続する
    enter image description here
  • UpdateAttributeのPROPERTIESタブでschema.name=sensor_data_schema1を追加する
    enter image description here
  • ConvertRecordプロセッサを追加する。CSVをAvroに変換
    Record Reader: CSVReaderを選択
    Record Writer: AvroRecordSetWriterを選択
    enter image description here
    CSVReaderの→をクリックする。
    Controller ServicesタブでAvroRecordSetWriterとCSVReaderが追加される。
    enter image description here
    ⚙をクリックする。
    Schema Access Strategy: Use ‘Schema Name’ Propertyを選択
    Schema Registryで Create new service… を選択する
    enter image description here
    AvroSchemaRegistry… を選択してCreateをクリックする
    enter image description here
    Schema Registryで AvroSchemaRegistryが表示される。→をクリックする。Save changes before going to Controller Service?が表示され、Yesクリックする
    enter image description here
    Controller ServicesタブでAvoSchemaRegistryが追加される
    ⚙をクリックする
    enter image description here
    Propertiesタブでプロパティ sensor_data_schema1を追加する
    image.png

Avroスキーマは下記の通り

	{
	     "type": "record",
	     "namespace": "sensor_data_schema1",
	     "name": "sensor_data_schema1",
	     "fields": [
           { "name": "id", "type": "int" },
	       { "name": "date_str", "type": "string" },
	       { "name": "time_str", "type": "string" },
	       { "name": "pressure", "type": "double" },
	       { "name": "temperature", "type": "double" },
	       { "name": "humidity", "type": "double" }
	     ]
	} 

終わったら、CSVReaderがInvalid、AvroSchemaRegistryがDisabledの状態。
赤枠のアイコンをクリックしてAvroSchemaRegistryを有効にする(Enable→Closeをクリックする)。
CSVReaderも有効にする
enter image description here
最後にAvroRecordSetWriterも既定のままで有効にする
enter image description here
ConvertRecordのfailureをLogAttributeに繋ぐ

  • PutHive3Streamingプロセッサを追加する
    enter image description here
    PutHive3StreamingのPropertiesタブで以下の値を設定する
    Record Reader: AvroReaderを選択
    Hive Metastore URL: thrift://hdp-srv3.demotest.com:9083を入力
    Hive Configuration Resources: /etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xmlを入力
    Database Name: defaultを入力
    Table Name: sensor_data_user1を入力
    enter image description here
    Record Readerの→をクリックし、AvroReaderを有効にする(Enable→Close)
    enter image description here
    PutHive3StreamingのSettingsタブでsuccessをチェックする
    enter image description here
  • PutHive3StreamingのfaiureとretryをLogAttributeに接続する
  • Zeppelinでクエリを実行して確認
    時間軸で温度の変化を表しています。
    enter image description here

6、NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

CSV形式のデータ「1,2019-06-04,23:59:51,1011.54,24.86,50.67」から温度を取得して、閾値(ここでは30度)を超えたらSlackにアラート通知します。
全体のデータフロー
enter image description here

  • ExtractTextプロセッサを追加する
    Propertiesタブでfieldプロパティを追加、値を(.*),(.*),(.*),(.*),(.*),(.*)に設定
    image.png

Settingsタブでunmatchedをチェックする
enter image description here

  • RouteOnAttributeプロセッサを追加する
    Propertiesタブでhighプロパティを追加、値を${field.5:gt(40)} に設定する
    image.png

Settingsタブでunmatchedをチェックする
enter image description here

Settingsタブでsuccessをチェックする
enter image description here

  • PutSlackのfailureをLogAttributeに接続する
  • Slackメッセージ確認
    enter image description here

最後のメモ: 事前準備

# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1

# HDFS folder作成
sudo su - hdfs 
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1

# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1