VPC環境で利用できます。
NAVERクラウドプラットフォームの Cloud Hadoopと Cloud Data Streaming Service(CDSS)を連携する方法をご紹介します。
このガイドは、Presto公式ガイドで提供する Kafka Connector Tutorialガイドをベースに作成されました。
事前タスク
- Cloud Hadoopクラスタを作成します。
- Cloud Hadoopクラスタ作成に関する詳細は、Cloud Hadoop を開始するガイドをご参照ください。
- Cloud Data Streaming Serviceを作成します。
- Cloud Data Streaming Service作成に関する詳細は、Cloud Data Streaming Service ご利用ガイドをご参照ください。
- Cloud Data Streaming Serviceを使用するには、VM作成とセッティングを行う必要があります。
- VM作成とセッティングに関する詳細は、Cloud Data Streaming Service ご利用ガイドをご参照ください。
- ACGを設定します。
- Cloud Hadoopで Cloud Data Streaming Service Brokerノードに接続するためには、9092ポートを許可する必要があります。
- Cloud Data Streaming Serviceの Brokerノード ACG アクセスソースに Cloud Hadoopの Subnet帯域を追加します。

参考
Cloud Hadoopと Cloud Data Streaming Serviceは、同じ VPC内で通信が可能な同じ Subnetで作成することをお勧めします。
CDSS(Kafka)へのデータアップロード
Cloud Data Streaming Service VMで Kafkaを実行します。
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
Kafkaにデータをダウンロードします。
[root@s17e27e0cf6c kafka_2.12-2.4.0]# curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 21.6M 100 21.6M 0 0 7948k 0 0:00:02 0:00:02 --:--:-- 7947k
Kafkaにデータをアップロードします。
[root@s17e27e0cf6c kafka_2.12-2.4.0]# chmod 755 kafka-tpch
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./kafka-tpch load --brokers 172.16.2.6:9092 --prefix tpch. --tpch-type tiny
2022-02-07T10:30:09.426+0900 INFO main io.airlift.log.Logging Logging to stderr
2022-02-07T10:30:09.448+0900 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2022-02-07T10:30:09.859+0900 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2022-02-07T10:30:09.859+0900 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2022-02-07T10:30:09.859+0900 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2022-02-07T10:30:09.860+0900 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
2022-02-07T10:30:09.860+0900 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
2022-02-07T10:30:09.860+0900 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
2022-02-07T10:30:09.860+0900 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
2022-02-07T10:30:09.865+0900 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
2022-02-07T10:30:13.079+0900 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
2022-02-07T10:30:13.175+0900 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
2022-02-07T10:30:13.514+0900 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
2022-02-07T10:30:13.711+0900 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
2022-02-07T10:30:14.168+0900 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
2022-02-07T10:30:14.895+0900 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
2022-02-07T10:30:15.078+0900 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
2022-02-07T10:30:16.335+0900 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.
Prestoへの connector追加
Ambari UIで Presto > [CONFIGS] > Advanced connectors.properties に connectors.to.addの値を以下のように追加した後、 [SAVE] ボタンをクリックします。
{"kafka":["connector.name=kafka",
"kafka.nodes=172.16.2.6:9092",
"kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region",
"kafka.hide-internal-columns=false"] }

変更された構成を適用するために再起動が必要です。右上の [ACTIONS] > Restart All をクリックし、ポップアップの [CONFIRM RESTART ALL] ボタンをクリックします。
Prestoでのテーブル照会
Cloud Hadoopエッジノードに接続して Prestoを実行します。
- catalogは kafka、schemaは tpchに設定します。
[sshuser@e-001-example-pzt-hd ~]$ /home1/cdp/usr/nch/3.1.0.0-78/trino/bin/trino-cli --server http://pub-210ab.hadoop.ntruss.com:8285 --catalog kafka --schema tpch
presto:tpch> SHOW TABLES;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)
Query 20220128_064417_00003_96n53, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:00 [8 rows, 166B] [57 rows/s, 1.16KB/s]
簡単なクエリで内容を確認します。
presto:tpch> DESCRIBE customer;
Column | Type | Extra | Comment
-------------------+---------+-------+------------------------------------------
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partiti
_message_corrupt | boolean | | Message data is corrupt
_message | varchar | | Message text
_message_length | bigint | | Total number of message bytes
_key_corrupt | boolean | | Key data is corrupt
_key | varchar | | Key text
_key_length | bigint | | Total number of key bytes
_timestamp | bigint | | Offset Timestamp
(9 rows)
presto:tpch> SELECT _message FROM customer LIMIT 5;
--------------------------------------------------------------------------------
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeR
{"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAG
{"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZN
{"rowNumber":10,"customerKey":10,"name":"Customer#000000010","address":"6LrEaV6
{"rowNumber":13,"customerKey":13,"name":"Customer#000000013","address":"nsXQu0o
(5 rows)
参考
Prestoと Kafka活用の詳細は、Kafka Connector Tutorialをご参照ください。