2019年6月11日火曜日

NiFi ReplaceTextプロセッサのReplacement Valueの書き方

  • このエントリーをはてなブックマークに追加


NiFi ReplaceTextプロセッサのReplacement Valueの書き方

NiFiで流れてくるFlowfileの中身をReplace(頭何かデータを入れたい、後ろにデータをつけたいとか)したい場合は、ReplaceTextプロセッサを使えます。

ReplaceText既定の画面
image.png

一個例を見てみましょう
例えば流れてくるFlowfileは一行のCSV

aaa,bbb,ccc,ddd

後ろにeeeをつけたい場合、以下のように書きます。

$1,eee

FlowfileのAttribute(filenameとか)を後ろにつけたい場合
$1をシングルクォーテーションで囲む必要です。

${'$1'},${filename}

$1のdddをfffにreplaceして、eeeを追加したい場合

${'$1':replace('ddd','fff')},eee

一番重要なのは$1をシングルクォーテーションで囲むところかなぁ。囲まないと怒られます。

2019年6月10日月曜日

FreeIPAとAmbari連携

  • このエントリーをはてなブックマークに追加


FreeIPAとAmbari連携

FreeIPAとAmbariの連携方法をメモしておきます。
HDP3.1.0、Ambari 2.7.3の環境です。

FreeIPAのインストール

以下のスクリプトでサーバー上にインストールし、いくつかUser、Groupを作成する。
必要に応じて変更してください。

#ホスト名
export NAME=ipa-srv1
#Domain名
export DOMAIN=demotest.com
export REALM=$(echo ${DOMAIN} | awk '{print toupper($0)}')
export IP=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')

echo "IP is ${IP}"

echo "${IP} ${NAME}.${DOMAIN} ${NAME} $(hostname -f) $(hostname -s)" >> /etc/hosts
hostnamectl set-hostname ${NAME}.${DOMAIN}
hostnamectl --transient set-hostname ${NAME}
hostname ${NAME}.${DOMAIN}
echo 0 > /proc/sys/kernel/hung_task_timeout_secs
ethtool -K eth0 tso off


hostname -f
cat /etc/hosts

#install packages
sudo yum install ipa-server ipa-server-dns -y

#increase entropy
cat /proc/sys/kernel/random/entropy_avail
sudo yum install -y rng-tools
sudo systemctl start rngd
cat /proc/sys/kernel/random/entropy_avail

#sometimes needed to avoid server install failing
sudo service dbus restart

#install IPA server
ipa-server-install \
--realm ${REALM} --domain ${DOMAIN} \
-a BadPass#1 -p BadPass#1 \
--setup-dns \
--forwarder=8.8.8.8 --allow-zone-overlap --no-host-dns \
--auto-forwarders --auto-reverse --unattended

#kinit as admin
#パスワードをBadPass#1に設定する
echo BadPass#1 | kinit admin

# create a new principal to be used for ambari kerberos administration
ipa user-add hadoopadmin --first=Hadoop --last=Admin --shell=/bin/bash

# create a new principal to be used for read only ldab bind (whose password will expire in 90 days)
ipa user-add ldapbind --first=ldap --last=bind

# create a role and and give it privilege to manage users and services
ipa role-add hadoopadminrole
ipa role-add-privilege hadoopadminrole --privileges="User Administrators"
ipa role-add-privilege hadoopadminrole --privileges="Service Administrators"

#add the hadoopadmin user to the role
ipa role-add-member hadoopadminrole --users=hadoopadmin

#create users/groups
ipa group-add analyst --desc analyst
ipa group-add hr --desc hr
ipa group-add legal --desc legal
ipa group-add sales --desc sales
ipa group-add etl --desc etl
ipa group-add us_employee --desc us_employee
ipa group-add eu_employee --desc eu_employee
ipa group-add intern --desc intern
ipa group-add sudoers --desc sudoers

ipa user-add legal1 --first=legal1 --last=legal1 --shell=/bin/bash
ipa user-add legal2 --first=legal2 --last=legal2 --shell=/bin/bash
ipa user-add legal3 --first=legal3 --last=legal3 --shell=/bin/bash
ipa user-add hr1 --first=hr1 --last=hr1 --shell=/bin/bash
ipa user-add hr2 --first=hr2 --last=hr2 --shell=/bin/bash
ipa user-add hr3 --first=hr3 --last=hr3 --shell=/bin/bash
ipa user-add sales1 --first=sales1 --last=sales1 --shell=/bin/bash
ipa user-add sales2 --first=sales2 --last=sales2 --shell=/bin/bash
ipa user-add sales3 --first=sales3 --last=sales3 --shell=/bin/bash
ipa user-add joe_analyst --first=joe --last=analyst --shell=/bin/bash
ipa user-add ivanna_eu_hr --first=ivanna --last=hr --shell=/bin/bash
ipa user-add scott_intern --first=scott --last=intern --shell=/bin/bash
ipa user-add noobie --first=jon --last=snow --shell=/bin/bash

ipa group-add-member legal --users=legal1
ipa group-add-member legal --users=legal2
ipa group-add-member legal --users=legal3

ipa group-add-member hr --users=hr1
ipa group-add-member hr --users=hr2
ipa group-add-member hr --users=hr3
ipa group-add-member hr --users=ivanna_eu_hr

ipa group-add-member sales --users=sales1
ipa group-add-member sales --users=sales2
ipa group-add-member sales --users=sales3

ipa group-add-member analyst --users=joe_analyst
ipa group-add-member intern --users=scott_intern
ipa group-add-member us_employee --users=joe_analyst
ipa group-add-member eu_employee --users=ivanna_eu_hr

# create sudo rule
ipa sudorule-add admin_all_rule
ipa sudorule-mod admin_all_rule --cmdcat=all --hostcat=all
ipa sudorule-add-user admin_all_rule --groups=sudoers

# add noobie to the sudoers user, to enable sudo rules
ipa group-add-member sudoers --users=noobie


echo BadPass#1 > tmp.txt
echo BadPass#1 >> tmp.txt


ipa passwd hadoopadmin < tmp.txt
ipa passwd ldapbind < tmp.txt

ipa passwd legal1 < tmp.txt
ipa passwd legal2 < tmp.txt
ipa passwd legal3 < tmp.txt
ipa passwd hr1 < tmp.txt
ipa passwd hr2 < tmp.txt
ipa passwd hr3 < tmp.txt
ipa passwd sales1 < tmp.txt
ipa passwd sales2 < tmp.txt
ipa passwd sales3 < tmp.txt

ipa passwd joe_analyst < tmp.txt
ipa passwd ivanna_eu_hr < tmp.txt
ipa passwd scott_intern < tmp.txt

rm -f tmp.txt

FreeIPAの管理画面

管理画面でUser,Group,Password Policy等情報を確認設定できます。
image.png

AmbariのLDAP設定

ambari-server setup-ldap

Using python  /usr/bin/python
Currently 'no auth method' is configured, do you wish to use LDAP instead [y/n] (y)? yes
Enter Ambari Admin login: admin
Enter Ambari Admin password:

Fetching LDAP configuration from DB. No configuration.
Please select the type of LDAP you want to use [AD/IPA/Generic](Generic):IPA
Primary LDAP Host (ipa.ambari.apache.org): ipa-srv1.demotest.com
Primary LDAP Port (636): 389
Secondary LDAP Host <Optional>:
Secondary LDAP Port <Optional>:
Use SSL [true/false] (true): false
User object class (posixAccount): person
User ID attribute (uid): uid
Group object class (posixGroup): groupofnames
Group name attribute (cn): cn
Group member attribute (member): member
Distinguished name attribute (dn): dn
Search Base (cn=accounts,dc=ambari,dc=apache,dc=org): dc=demotest,dc=com
Referral method [follow/ignore] (follow):follow
Bind anonymously [true/false] (false): false
Bind DN (uid=ldapbind,cn=users,cn=accounts,dc=ambari,dc=apache,dc=org): uid=ldapbind,cn=users,cn=accounts,dc=demotest,dc=com
Enter Bind DN Password:
Confirm Bind DN Password:
Handling behavior for username collisions [convert/skip] for LDAP sync (skip): convert
Force lower-case user names [true/false]:false
Results from LDAP are paginated when requested [true/false]:false
====================
Review Settings
====================
Primary LDAP Host (ipa.ambari.apache.org):  ipa-srv1.demotest.com
Primary LDAP Port (636):  389
Use SSL [true/false] (true):  false
User object class (posixAccount):  person
User ID attribute (uid):  uid
Group object class (posixGroup):  groupofnames
Group name attribute (cn):  cn
Group member attribute (member):  member
Distinguished name attribute (dn):  dn
Search Base (cn=accounts,dc=ambari,dc=apache,dc=org):  dc=demotest,dc=com
Referral method [follow/ignore] (follow):  follow
Bind anonymously [true/false] (false):  false
Handling behavior for username collisions [convert/skip] for LDAP sync (skip):  convert
Force lower-case user names [true/false]: false
Results from LDAP are paginated when requested [true/false]: false
ambari.ldap.connectivity.bind_dn: uid=ldapbind,cn=users,cn=accounts,dc=demotest,dc=com
ambari.ldap.connectivity.bind_password: *****
Save settings [y/n] (y)? y
Saving LDAP properties...
Saving LDAP properties finished
Ambari Server 'setup-ldap' completed successfully.

AmbariからLDAP同期

Ambariの既定のID/password(admin/admin)を使用


ambari-server sync-ldap --ldap-sync-admin-name=admin --ldap-sync-admin-password=admin --all
Using python  /usr/bin/python
Syncing with LDAP...

Fetching LDAP configuration from DB.
Syncing all...

Completed LDAP Sync.
Summary:
  memberships:
    removed = 0
    created = 33
  users:
    skipped = 0
    removed = 0
    updated = 1
    created = 15
  groups:
    updated = 0
    removed = 0
    created = 292

Ambari Server 'sync-ldap' completed successfully.

Ambariにログインして(admin/admin)、特定のユーザーにambariの管理権限を付与する

ここではhadoopadminというユーザーにambari adminに設定する
NoからYesに変更する
image.png

image.png

hadoopadminユーザーでAmbariにログインする

hadoopadminでログインできていることを確認
image.png

2019年6月8日土曜日

AmbariのMetrics monitor起動 due to AttributeError: 'module' object has no attribute 'boot_time'n

  • このエントリーをはてなブックマークに追加


AmbariのMetrics monitor起動 due to AttributeError: 'module' object has no attribute 'boot_time'n
以下のエラーでAmbariのMetrics monitor起動に失敗することがあります。
Traceback (most recent call last):
  File "/var/lib/ambari-agent/cache/stacks/HDP/3.0/services/AMBARI_METRICS/package/scripts/metrics_monitor.py", line 78, in 
    AmsMonitor().execute()
  File "/usr/lib/ambari-agent/lib/resource_management/libraries/script/script.py", line 352, in execute
    method(env)
  File "/var/lib/ambari-agent/cache/stacks/HDP/3.0/services/AMBARI_METRICS/package/scripts/metrics_monitor.py", line 43, in start
    action = 'start'
  File "/usr/lib/ambari-agent/lib/ambari_commons/os_family_impl.py", line 89, in thunk
    return fn(*args, **kwargs)
  File "/var/lib/ambari-agent/cache/stacks/HDP/3.0/services/AMBARI_METRICS/package/scripts/ams_service.py", line 109, in ams_service
    user=params.ams_user
  File "/usr/lib/ambari-agent/lib/resource_management/core/base.py", line 166, in __init__
    self.env.run()
  File "/usr/lib/ambari-agent/lib/resource_management/core/environment.py", line 160, in run
    self.run_action(resource, action)
  File "/usr/lib/ambari-agent/lib/resource_management/core/environment.py", line 124, in run_action
    provider_action()
  File "/usr/lib/ambari-agent/lib/resource_management/core/providers/system.py", line 263, in action_run
    returns=self.resource.returns)
  File "/usr/lib/ambari-agent/lib/resource_management/core/shell.py", line 72, in inner
    result = function(command, **kwargs)
  File "/usr/lib/ambari-agent/lib/resource_management/core/shell.py", line 102, in checked_call
    tries=tries, try_sleep=try_sleep, timeout_kill_strategy=timeout_kill_strategy, returns=returns)
  File "/usr/lib/ambari-agent/lib/resource_management/core/shell.py", line 150, in _call_wrapper
    result = _call(command, **kwargs_copy)
  File "/usr/lib/ambari-agent/lib/resource_management/core/shell.py", line 314, in _call
    raise ExecutionFailed(err_msg, code, out, err)
resource_management.core.exceptions.ExecutionFailed: Execution of '/usr/sbin/ambari-metrics-monitor --config /etc/ambari-metrics-monitor/conf start' returned 255. psutil build directory is not empty, continuing...
Verifying Python version compatibility...
Using python  /usr/bin/python2.7
Checking for previously running Metric Monitor...
/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid found with no process. Removing 5237...
Starting ambari-metrics-monitor
Verifying ambari-metrics-monitor process status with PID : 11790
Output of PID check : 
ERROR: ambari-metrics-monitor start failed. For more details, see /var/log/ambari-metrics-monitor/ambari-metrics-monitor.out:
====================
    server_process_main(stop_handler)
  File "/usr/lib/python2.6/site-packages/resource_monitoring/main.py", line 64, in server_process_main
    controller = Controller(main_config, stop_handler)
  File "/usr/lib/python2.6/site-packages/resource_monitoring/core/controller.py", line 46, in __init__
    hostinfo = HostInfo(config)
  File "/usr/lib/python2.6/site-packages/resource_monitoring/core/host_info.py", line 49, in __init__
    self.__host_static_info = self.get_host_static_info()
  File "/usr/lib/python2.6/site-packages/resource_monitoring/core/host_info.py", line 235, in get_host_static_info
    boot_time = psutil.boot_time()
AttributeError: 'module' object has no attribute 'boot_time'
====================
Monitor out at: /var/log/ambari-metrics-monitor/ambari-metrics-monitor.out

解決策

以下を実行する
cd /usr/lib/python2.6/site-packages/resource_monitoring/psutil
make install ambari-metrics-monitor restart

2019年6月6日木曜日

MiNiFiでセンサーデータを取得し、NiFiに転送してHDFS、Hiveに書き込む

  • このエントリーをはてなブックマークに追加


MiNiFiでセンサーデータを取得し、NiFiに転送してHDFS、Hiveに書き込む
「センサーをラズパイに接続し、MiNiFiでセンサーデータをTailする。そしてNiFiに転送して、NiFiでHDFS, Hiveに書き込む」の手順をご紹介します。

実現したいこと

  • MiNiFiでセンサーデータを取得し、NiFiに転送する
  • NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
  • NiFiでセンサーデータをHiveテーブルに保存する
  • NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

環境情報

  • センサーをラズパイに接続
  • ラズパイでセンサーデータを取るPythonスクリプト
  • HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理

構成

構成は以下の様な感じになります。パース処理のところは一つしか書いてないですが、複数のパース処理の集合だと思ってください。
enter image description here

全体データフロー

  • RemoteMiNiFiというInput portとProcessDataというPrecess Groupで構成されています。
    enter image description here
  • ProcessDataグループ内の詳細フロー
    enter image description here

やってみよう

1、MiNiFiのセットアップ

ラズパイ自体のOSインストールや、センサーとの接続ができた状態(ここでは割愛します)でMiNiFiのインストールをやります。
MiNiFiのTarファイルはHortonworksサイトにあります。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html
sudo su -
cd /home/pi

#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz

#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi

#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する

2、センサーデータをPythonスクリプトで取得

Pythonスクリプトは/home/pi/bme280-dataフォルダに2019-06-04.csvのようなファイルが生成されます。
中身は:「日付, 時間, 気圧, 温度, 湿度」のカンマ区切りのデータになります。
2019-06-04,23:59:51,1011.54,24.86,50.67
#coding: utf-8
import bme280_custom
import datetime
import os

dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
  os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write(filename +","+label+","+csv+"\n")
f.close()
このスクリプトをCronとかで定期的に実行するように設定する(例えば10秒ごとに実行)

3、NiFiでデータフローを作成し、MiNiFiに配布

MiNiFiで直接データフローを作成するのが難しいので、NiFiで作成して、テンプレート(.xml)としてエクスポート、MiNiFi toolkitで.xmlを.ymlに変換するのが一般的です。
今回はTailFileプロセッサとRemoteProcessGroupを使います。
  • NiFiのTop画面でInput portを追加する。名前をRemoteMiNiFiに設定する
    enter image description here
  • TailFileプロセッサでフォルダの.csvファイルを取り込みます。以下のプロパティを設定します。
    Tailing mode:  Multiple files
    File(s) to Tail:  .*.csv
    Base Directory: /home/pi/bme280-data
    enter image description here
  • Remote Process Groupを以下のように設定して、RemoteMiNiFiをInput Portとして選択する
    URLsにNiFiのURL(http://hdp-srv4.demotest.com:9090/nifi)を指定する。複数ある場合は、カンマ区切りで入力する。
    Transport ProtocolでHTTPを選択する。
    enter image description here
  • 作ったデータフローをテンプレートにエクスポートする
    TailFileプロセッサとRemoteProcessGroupを繋いで、全部選択して、右クリック、「Create template」でテンプレートを作成する。
    作成したら、ダウンロードする
    enter image description here
  • MiNiFiでテンプレートファイル(.xml)から.ymlファイルに変換する
    ラズパイにログインして、以下のコマンドでファイルを変換して、MiNiFiを起動する
    sudo su -
    cd /home/pi
    
    #MiNiFi toolkitでxmlからymlに変換する
    ./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml
    
    #既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
    cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
    cp -p sensor_minifi4.yml minifi/conf/config.yml
    
    #MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
    ./minifi/bin/minifi.sh start
    
ここまできたら、MiNiFiからセンサーデータをNiFiの方に転送できるようになります。

4、NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する

ここからは、NiFiでセンサーデータをKafka経由でHDFSに保存するデータフローを作成していきます。
全体のデータフローはこんな感じです。
enter image description here
  • NiFi画面でUser1というProcess Groupをドラッグ&ドロップする。すでに作成済みのRemoteMiNiFi input portと繋ぐ
    enter image description here
    User1グループをダブルクリックで入って後続のフロー作成に入ります。
  • FromMinifiというInput portとPublishKafka_2_0プロセッサを作成して、繋ぐ
    enter image description here
    PublishKafka_2_0のPROPERTIESタブで必要なプロパティを設定する。
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Delivery Guarantee: Guarantee Replicated Delivery を選択
    enter image description here
    PublishKafka_2_0のSETTINGSタブでfailureとsuccess両方をチェックする。
    enter image description here
  • ConsumeKafka_2_0プロセッサを追加する
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Group ID: group1_user1
    enter image description here
  • MergeContentプロセッサを追加する
    Minimum Number of Entries: 10 に変更
    enter image description here
  • PutHDFSプロセッサを追加する
    Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Directory: /tmp/sensor_data/user1
    enter image description here
    SETTINGSタブでsuccessをチェックする
    enter image description here
  • 最後にLogAttributeプロセッサを追加する
    PROPERTIESタブは既定のままで、SETTINGSタブでsuccessをチェックする
    enter image description here
    MergeContentのoriginal, failure, PutHDFSのfailureと繋ぐ
  • HDFS上のファイルの中身を見てみる
    enter image description here

5、NiFiでセンサーデータをHiveテーブルに保存する

ここでHive Streamingを使ってセンサーデータをリアルタイムにHiveテーブルに追加します。
これを実現するには、Hiveテーブルがいくつか要件を満たす必要があります。
詳細はこちらをご参照くださいStreamingDataIngest-StreamingRequirements
1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 
hive.compactor.initiator.on = true 
hive.compactor.worker.threads > 0

2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO  BUCKETS
下記の通りでテーブル(sensor_data_user1)を作成します。
CREATE TABLE sensor_data_user1(time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string) 
CLUSTERED BY (time_str) INTO 5 BUCKETS 
STORED AS ORC 
tblproperties("transactional" = "true");
この部分のデータフローは図の通りになります。
enter image description here
これからプロセッサを説明していきます。
  • UpdateAttributeプロセッサを追加し、FromMinifi input portと接続する
    enter image description here
  • UpdateAttributeのPROPERTIESタブでschema.name=sensor_data_schema1を追加する
    enter image description here
  • ConvertRecordプロセッサを追加する。CSVをAvroに変換
    Record Reader: CSVReaderを選択
    Record Writer: AvroRecordSetWriterを選択
    enter image description here
    CSVReaderの→をクリックする。
    Controller ServicesタブでAvroRecordSetWriterとCSVReaderが追加される。
    enter image description here
    ⚙をクリックする。
    Schema Access Strategy: Use ‘Schema Name’ Propertyを選択
    Schema Registryで**Create new service…**を選択する
    enter image description here
    **AvroSchemaRegistry…**を選択してCreateをクリックする
    enter image description here
    Schema Registryで AvroSchemaRegistryが表示される。→をクリックする。Save changes before going to Controller Service?が表示され、Yesクリックする
    enter image description here
    Controller ServicesタブでAvoSchemaRegistryが追加される
    ⚙をクリックする
    enter image description here
    Propertiesタブでプロパティ sensor_data_schema1を追加する
    enter image description here
    Avroスキーマは下記の通り
    {
         "type": "record",
         "namespace": "sensor_data_schema1",
         "name": "sensor_data_schema1",
         "fields": [
           { "name": "date_str", "type": "string" },
           { "name": "time_str", "type": "string" },
        { "name": "pressure", "type": "double" },
        { "name": "temperature", "type": "double" },
           { "name": "humidity", "type": "double" }
         ]
    } 
    
    終わったら、CSVReaderがInvalid、AvroSchemaRegistryがDisabledの状態。
    赤枠のアイコンをクリックしてAvroSchemaRegistryを有効にする(Enable→Closeをクリックする)。
    CSVReaderも有効にする
    enter image description here
    最後にAvroRecordSetWriterも既定のままで有効にする
    enter image description here
    ConvertRecordのfailureをLogAttributeに繋ぐ
  • PutHive3Streamingプロセッサを追加する
    enter image description here
    PutHive3StreamingのPropertiesタブで以下の値を設定する
    Record Reader: AvroReaderを選択
    Hive Metastore URL: thrift://hdp-srv3.demotest.com:9083を入力
    Hive Configuration Resources: /etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xmlを入力
    Database Name: defaultを入力
    Table Name: sensor_data_user1を入力
    enter image description here
    Record Readerの→をクリックし、AvroReaderを有効にする(Enable→Close)
    enter image description here
    PutHive3StreamingのSettingsタブでsuccessをチェックする
    enter image description here
  • PutHive3StreamingのfaiureとretryをLogAttributeに接続する
  • Zeppelinでクエリを実行して確認
    時間軸で温度の変化を表しています。
    enter image description here

6、NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

CSV形式のデータ「2019-06-04,23:59:51,1011.54,24.86,50.67」から温度を取得して、閾値(ここでは30度)を超えたらSlackにアラート通知します。
全体のデータフロー
enter image description here
  • ExtractTextプロセッサを追加する
    Propertiesタブでfieldプロパティを追加、値を(.*),(.*),(.*),(.*),(.*)に設定
    enter image description here
    Settingsタブでunmatchedをチェックする
    enter image description here
  • RouteOnAttributeプロセッサを追加する
    Propertiesタブでhighプロパティを追加、値を${field.4:gt(30)} に設定する
    enter image description here
    Settingsタブでunmatchedをチェックする
    enter image description here
  • PutSlackプロセッサを追加する
    Propertiesタブで以下のプロパティを設定する
    Webhook URL: https://hooks.slack.com/services/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
    Webhook Text: Temperature is ${field.4}! Too hort!!

    Settingsタブでsuccessをチェックする
    enter image description here
  • PutSlackのfailureをLogAttributeに接続する
  • Slackメッセージ確認(テストのため一時的に閾値を20度にしました)
    enter image description here

最後のメモ。事前準備

# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1

# HDFS folder作成
sudo su - hdfs 
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1

# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1

2019年5月9日木曜日

Spark MLで作ったモデルをどうやってStormやFlinkで使うか

  • このエントリーをはてなブックマークに追加


Spark MLでトレーニングしたモデルをStormやFlinkで使う方法をメモしておきます。

Storm
1,PMMLを使う
ただ、この場合はFeature scalingや特徴量変換処理を事前にやる必要があるそうです。
そのかわりに、JPMMLを使う

2, それでもう一つ方法はStormのコードにSparkのMLlib Jarをインポートして、そのままSpark MLlibのクラスでSpark Modelを呼び出すがあります。

Flink

JPMML(on top of PMML)がよく動く!

MLeapは非現実的?

PFAが有望的!

Flink model serving site






Evernote はあなたがすべてを記憶し、手間をかけずに整理できるようにお手伝いします。Evernote をダウンロードする

Ambari local login

  • このエントリーをはてなブックマークに追加


Knox経由でAmbariにログインするときに、常にKnoxの画面が表示されます。
Knoxに何か問題があったら、Ambariにログインできなくなります。
でもAmbariにログインしたいですよね。その場合は、以下のようなURLでKnox経由しなくて、従来のAmbariにログインできます。

URLの後ろにlocalをつけます。

Evernote はあなたがすべてを記憶し、手間をかけずに整理できるようにお手伝いします。Evernote をダウンロードする

2019年4月15日月曜日

Apache Arrowメモ

  • このエントリーをはてなブックマークに追加


Arrowが目指していること
メモリ上のデータを効率化することです。

向いていること
大量データの交換
メモリ上のでの大量データの分析処理


速度
シリアライズ、デシリアライズコストが小さいデータフォーマット。各種システムで共通で使う必要があり、各種言語のライブラリも開発中

並列処理
できるだけデータを局所化する配置になっている
OLTPよりOLAPに向いています。
OLTPはECサイトのように行単位の追加、更新、削除処理が多い、RDB
OLAPは列に対する操作(Count, group by)が多い。
Arrowは列志向




Evernote はあなたがすべてを記憶し、手間をかけずに整理できるようにお手伝いします。Evernote をダウンロードする

2019年4月11日木曜日

Druid起動時エラー

  • このエントリーをはてなブックマークに追加


AmbariでDruidをインストールして起動すると以下のエラーが出力されます。

resource_management.core.exceptions.ExecutionFailed: Execution of '/usr/java/default/bin/java -cp /usr/lib/ambari-agent/DBConnectionVerification.jar:/usr/hdp/current/druid-historical/extensions/mysql-metadata-storage/mysql-connector-java-8.0.15-1.el7.noarch.rpm org.apache.ambari.server.DBConnectionVerification 'jdbc:mysql://localhost:3306/druid?createDatabaseIfNotExist=true' druid [PROTECTED] com.mysql.jdbc.Driver' returned 1. ERROR: Unable to connect to the DB. Please check DB connection properties.
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

mysql-connect jarが見つからないためのエラーです。

以下のようにambari-serverコマンド実行する

ambari-server setup --jdbc-db=mysql --jdbc-driver=<location where you downloaded mysql connector>



Evernote はあなたがすべてを記憶し、手間をかけずに整理できるようにお手伝いします。Evernote をダウンロードする

2019年4月10日水曜日

BeelineでHiveに接続する際のエラー

  • このエントリーをはてなブックマークに追加


KerberosされたHadoop環境でbeelineでHiveに接続するときのエラーと対処法をご紹介します。
beeline -u "jdbc:hive2://hostname:10000/default" で接続する
Error 1
ERROR org.apache.thrift.server.TThreadPoolServer: [HiveServer2-Handler-Pool: Thread-40]: Error occurred during processing of message.java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Unsupported mechanism type PLAIN

対処法
Kerberosされた環境で接続URLにprincipalを指定する必要がある。


beeline -u "jdbc:hive2://hostname:10000/default;principal=user1@HADOOP" で接続する

Error 2
Error: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000/default;principal=user1@HADOOP: Kerberos principal should have 3 parts: user1@HADOOP (state=08S01,code=0)

対処法
principalにはuser/hostname@REALM User、Hostname、Realm3つのPartが必要

Error 3
Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
Caused by: sun.security.krb5.KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
Caused by: sun.security.krb5.Asn1Exception: Identifier doesn't match expected value (906)

対処法
principalには hive/hiveserver2_hostname@REALM のように、user HiveとHiveserver2のホスト名を指定することが必要
beeline -u "jdbc:hive2://192.168.10.154:10000/default;principal=hive/ip-192-168-10-154.ap-northeast-1.compute.internal@HADOOP"

参考
Evernote はあなたがすべてを記憶し、手間をかけずに整理できるようにお手伝いします。Evernote をダウンロードする