「センサーをラズパイに接続し、MiNiFiでセンサーデータをTailする。そしてNiFiに転送して、NiFiでHDFS, Hiveに書き込む」の手順をご紹介します。
実現したいこと
- MiNiFiでセンサーデータを取得し、NiFiに転送する
- NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
- NiFiでセンサーデータをHiveテーブルに保存する
- NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する
環境情報
- センサーをラズパイに接続
- ラズパイでセンサーデータを取るPythonスクリプト
- HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理
構成
構成は以下の様な感じになります。パース処理のところは一つしか書いてないですが、複数のパース処理の集合だと思ってください。
data:image/s3,"s3://crabby-images/d1b84/d1b84dc1e4d3ae69fd1cba3c803e92483f5c2250" alt="enter image description here"
全体データフロー
- RemoteMiNiFiというInput portとProcessDataというPrecess Groupで構成されています。
data:image/s3,"s3://crabby-images/d4e8c/d4e8cd803dacc455870fe52e3af6ab1e4d6efb1d" alt="enter image description here"
- ProcessDataグループ内の詳細フロー
data:image/s3,"s3://crabby-images/119e6/119e6c9ca6f69a5c17c2a6a0007f79052de30096" alt="enter image description here"
やってみよう
1、MiNiFiのセットアップ
ラズパイ自体のOSインストールや、センサーとの接続ができた状態(ここでは割愛します)でMiNiFiのインストールをやります。
MiNiFiのTarファイルはHortonworksサイトにあります。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html
sudo su -
cd /home/pi
#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi
#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する
2、センサーデータをPythonスクリプトで取得
Pythonスクリプトは/home/pi/bme280-dataフォルダに2019-06-04.csvのようなファイルが生成されます。
中身は:「センサーID, 日付, 時間, 気圧, 温度, 湿度」のカンマ区切りのデータになります。
1,2019-06-04,23:59:51,1011.54,24.86,50.67
#coding: utf-8
import bme280_custom
import datetime
import os
dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write('1,'+filename +","+label+","+csv+"\n")
f.close()
このスクリプトをCronとかで定期的に実行するように設定する(例えば10秒ごとに実行)
3、NiFiでデータフローを作成し、MiNiFiに配布
MiNiFiで直接データフローを作成するのが難しいので、NiFiで作成して、テンプレート(.xml)としてエクスポート、MiNiFi toolkitで.xmlを.ymlに変換するのが一般的です。
今回はTailFileプロセッサとRemoteProcessGroupを使います。
- NiFiのTop画面でInput portを追加する。名前をRemoteMiNiFiに設定する
data:image/s3,"s3://crabby-images/bf48f/bf48f75c3cc7a73b67112639d0cdb3065f10dcfc" alt="enter image description here"
- TailFileプロセッサでフォルダの.csvファイルを取り込みます。以下のプロパティを設定します。
Tailing mode: Multiple files
File(s) to Tail: .*.csv
Base Directory: /home/pi/bme280-data
data:image/s3,"s3://crabby-images/53c3d/53c3d064202a2a62f23c8fc9e1806789bd10bc6c" alt="enter image description here"
- Remote Process Groupを以下のように設定して、RemoteMiNiFiをInput Portとして選択する
URLsにNiFiのURL(http://hdp-srv4.demotest.com:9090/nifi
)を指定する。複数ある場合は、カンマ区切りで入力する。
Transport ProtocolでHTTPを選択する。
data:image/s3,"s3://crabby-images/8d600/8d6000af06f8dc10f9ad6cef67a8c5e16e54c729" alt="enter image description here"
- 作ったデータフローをテンプレートにエクスポートする
TailFileプロセッサとRemoteProcessGroupを繋いで、全部選択して、右クリック、「Create template」でテンプレートを作成する。
作成したら、ダウンロードする
data:image/s3,"s3://crabby-images/d9f2b/d9f2b6519b189a0c9407a038f7e38c74bddf05b9" alt="enter image description here"
- MiNiFiでテンプレートファイル(.xml)から.ymlファイルに変換する
ラズパイにログインして、以下のコマンドでファイルを変換して、MiNiFiを起動する
sudo su -
cd /home/pi
#MiNiFi toolkitでxmlからymlに変換する
./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml
#既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
cp -p sensor_minifi4.yml minifi/conf/config.yml
#MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
./minifi/bin/minifi.sh start
ここまできたら、MiNiFiからセンサーデータをNiFiの方に転送できるようになります。
4、NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
ここからは、NiFiでセンサーデータをKafka経由でHDFSに保存するデータフローを作成していきます。
全体のデータフローはこんな感じです。
data:image/s3,"s3://crabby-images/adc18/adc18eaf224fbc9c1dbe0376fb39a0a442c5931c" alt="enter image description here"
-
NiFi画面でUser1というProcess Groupをドラッグ&ドロップする。すでに作成済みのRemoteMiNiFi input portと繋ぐ
data:image/s3,"s3://crabby-images/df097/df0974942e320fa42c6d3cc81a0d712e22dbd5ef" alt="enter image description here"
User1グループをダブルクリックで入って後続のフロー作成に入ります。
-
FromMinifiというInput portとPublishKafka_2_0プロセッサを作成して、繋ぐ
data:image/s3,"s3://crabby-images/60932/60932019eb307d8ca812ccab3c840830b0ab3294" alt="enter image description here"
PublishKafka_2_0のPROPERTIESタブで必要なプロパティを設定する。
Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667
を入力
Topic Name: sensor_data_user1
Delivery Guarantee: Guarantee Replicated Delivery を選択
data:image/s3,"s3://crabby-images/c0478/c0478d2806a64a7d4fa5c556ef8a3b7861de5db9" alt="enter image description here"
PublishKafka_2_0のSETTINGSタブでfailureとsuccess両方をチェックする。
data:image/s3,"s3://crabby-images/5bf4e/5bf4eddf7e6f787af60472033b150cdbe1ac1f35" alt="enter image description here"
-
ConsumeKafka_2_0プロセッサを追加する
Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667
を入力
Topic Name: sensor_data_user1
Group ID: group1_user1
data:image/s3,"s3://crabby-images/9e444/9e4448de2ab0575344414f33c8ea350f4895ffe6" alt="enter image description here"
-
MergeContentプロセッサを追加する
Minimum Number of Entries: 10 に変更
data:image/s3,"s3://crabby-images/fee16/fee16588b399ab82ecff554d2be50d71cf465c75" alt="enter image description here"
-
PutHDFSプロセッサを追加する
Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Directory: /tmp/sensor_data/user1
data:image/s3,"s3://crabby-images/e2b7a/e2b7aaadf2f7d9bc1388a23194e35a6ce805285f" alt="enter image description here"
SETTINGSタブでsuccessをチェックする
data:image/s3,"s3://crabby-images/a5e66/a5e66c6ca73d875d71f6fb87de35cc33b1c12d32" alt="enter image description here"
-
最後にLogAttributeプロセッサを追加する
PROPERTIESタブは既定のままで、SETTINGSタブでsuccessをチェックする
data:image/s3,"s3://crabby-images/4bdd7/4bdd718b75563fadcab9412ba28bfc61a1aa9d4b" alt="enter image description here"
MergeContentのoriginal, failure, PutHDFSのfailureと繋ぐ
-
HDFS上のファイルの中身を見てみる
data:image/s3,"s3://crabby-images/dab5a/dab5a4fa53ddcd15ca7a2be016cf0d05ceafca43" alt="enter image description here"
5、NiFiでセンサーデータをHiveテーブルに保存する
ここでHive Streamingを使ってセンサーデータをリアルタイムにHiveテーブルに追加します。
これを実現するには、Hiveテーブルがいくつか要件を満たす必要があります。
詳細はこちらをご参照くださいStreamingDataIngest-StreamingRequirements
1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0
2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO <num> BUCKETS
下記の通りでテーブル(sensor_data_user1)を作成します。
CREATE TABLE sensor_data_user1(id int,time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string)
CLUSTERED BY (id) INTO 5 BUCKETS
STORED AS ORC
tblproperties("transactional" = "true");
この部分のデータフローは図の通りになります。
data:image/s3,"s3://crabby-images/21aee/21aeef0cfb6bd842203d1cd21d4918d17db5a0a3" alt="enter image description here"
これからプロセッサを説明していきます。
- UpdateAttributeプロセッサを追加し、FromMinifi input portと接続する
data:image/s3,"s3://crabby-images/f579e/f579e0292cc68f01fc03e899b3e235b5005ab1bf" alt="enter image description here"
- UpdateAttributeのPROPERTIESタブでschema.name=sensor_data_schema1を追加する
data:image/s3,"s3://crabby-images/37a29/37a295e813eff124b5bea3da3bb44662329b8064" alt="enter image description here"
- ConvertRecordプロセッサを追加する。CSVをAvroに変換
Record Reader: CSVReaderを選択
Record Writer: AvroRecordSetWriterを選択
data:image/s3,"s3://crabby-images/64ff4/64ff4f83f327f38ceb645663168f9949460ef914" alt="enter image description here"
CSVReaderの→をクリックする。
Controller ServicesタブでAvroRecordSetWriterとCSVReaderが追加される。
data:image/s3,"s3://crabby-images/ad455/ad455c27ced8a77ca74f04383c3b3f00b4f296d1" alt="enter image description here"
⚙をクリックする。
Schema Access Strategy: Use ‘Schema Name’ Propertyを選択
Schema Registryで Create new service… を選択する
data:image/s3,"s3://crabby-images/01ee2/01ee2e5dbc2dac9bd31dd2be134983bb3f039b40" alt="enter image description here"
AvroSchemaRegistry… を選択してCreateをクリックする
data:image/s3,"s3://crabby-images/7c6a0/7c6a063fb6a4dc9b0101a127a71502fb11674b12" alt="enter image description here"
Schema Registryで AvroSchemaRegistryが表示される。→をクリックする。Save changes before going to Controller Service?が表示され、Yesクリックする
data:image/s3,"s3://crabby-images/25349/25349f4d19c7d267689bf9b0e4ccaad62afd9eb4" alt="enter image description here"
Controller ServicesタブでAvoSchemaRegistryが追加される
⚙をクリックする
data:image/s3,"s3://crabby-images/3008d/3008d8315498433d0fbbb22b05473c0cdc69c1b3" alt="enter image description here"
Propertiesタブでプロパティ sensor_data_schema1を追加する
data:image/s3,"s3://crabby-images/a4105/a41054e4724a29314072c4a4768456b6b5c11ba0" alt="image.png"
Avroスキーマは下記の通り
{
"type": "record",
"namespace": "sensor_data_schema1",
"name": "sensor_data_schema1",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "date_str", "type": "string" },
{ "name": "time_str", "type": "string" },
{ "name": "pressure", "type": "double" },
{ "name": "temperature", "type": "double" },
{ "name": "humidity", "type": "double" }
]
}
終わったら、CSVReaderがInvalid、AvroSchemaRegistryがDisabledの状態。
赤枠のアイコンをクリックしてAvroSchemaRegistryを有効にする(Enable→Closeをクリックする)。
CSVReaderも有効にする
data:image/s3,"s3://crabby-images/5f0ed/5f0edfed19e275816a5674aa86973631c7d03f6c" alt="enter image description here"
最後にAvroRecordSetWriterも既定のままで有効にする
data:image/s3,"s3://crabby-images/e5b68/e5b684b04f15ba4281b0428f90369c7df8fb0977" alt="enter image description here"
ConvertRecordのfailureをLogAttributeに繋ぐ
- PutHive3Streamingプロセッサを追加する
data:image/s3,"s3://crabby-images/358e5/358e5ab4ffdef06b74db133de5332f0cfa959573" alt="enter image description here"
PutHive3StreamingのPropertiesタブで以下の値を設定する
Record Reader: AvroReaderを選択
Hive Metastore URL: thrift://hdp-srv3.demotest.com:9083を入力
Hive Configuration Resources: /etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xmlを入力
Database Name: defaultを入力
Table Name: sensor_data_user1を入力
data:image/s3,"s3://crabby-images/33541/33541b189bc2f9910c4a9235cdc7e2566f6b9ff3" alt="enter image description here"
Record Readerの→をクリックし、AvroReaderを有効にする(Enable→Close)
data:image/s3,"s3://crabby-images/8c296/8c296e8a8763da989fccf0be85a96c2e51b294c9" alt="enter image description here"
PutHive3StreamingのSettingsタブでsuccessをチェックする
data:image/s3,"s3://crabby-images/a6523/a65239ff4d3b3d0fdd6e801473742ebbc10917f9" alt="enter image description here"
- PutHive3StreamingのfaiureとretryをLogAttributeに接続する
- Zeppelinでクエリを実行して確認
時間軸で温度の変化を表しています。
data:image/s3,"s3://crabby-images/d6bca/d6bca35a19a971fd51afb6c9bcb2d64c105ea512" alt="enter image description here"
6、NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する
CSV形式のデータ「1,2019-06-04,23:59:51,1011.54,24.86,50.67」から温度を取得して、閾値(ここでは30度)を超えたらSlackにアラート通知します。
全体のデータフロー
data:image/s3,"s3://crabby-images/5b24c/5b24c8f783b5abd84a87ec1553f23f1ffac11a8f" alt="enter image description here"
- ExtractTextプロセッサを追加する
Propertiesタブでfieldプロパティを追加、値を(.*),(.*),(.*),(.*),(.*),(.*)に設定
data:image/s3,"s3://crabby-images/1dc41/1dc41404826e026fadaef0a87c334dfcb3c75ccf" alt="image.png"
Settingsタブでunmatchedをチェックする
data:image/s3,"s3://crabby-images/ee8c4/ee8c46aca7a4e09247a2350e872ee4e27e590806" alt="enter image description here"
- RouteOnAttributeプロセッサを追加する
Propertiesタブでhighプロパティを追加、値を${field.5:gt(40)} に設定する
data:image/s3,"s3://crabby-images/6ec5c/6ec5cf4044ff1b7968ddba6a7f200c25f4ddcda0" alt="image.png"
Settingsタブでunmatchedをチェックする
data:image/s3,"s3://crabby-images/bddd6/bddd605f24a6da076bb190d0b538f018ef8d9d4f" alt="enter image description here"
Settingsタブでsuccessをチェックする
data:image/s3,"s3://crabby-images/f059e/f059ec2f50c67389fa2b4ebe5433c5de81ca59d9" alt="enter image description here"
- PutSlackのfailureをLogAttributeに接続する
- Slackメッセージ確認
data:image/s3,"s3://crabby-images/3b985/3b9850abdb7f8492efe56e82983d4a934a2eacf4" alt="enter image description here"
最後のメモ: 事前準備
# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1
# HDFS folder作成
sudo su - hdfs
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1
# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1