Cloud Data Streaming Serviceで Presto連携

Prev Next

VPC環境で利用できます。

NAVERクラウドプラットフォームの Cloud Hadoopと Cloud Data Streaming Service(CDSS)を連携する方法をご紹介します。
このガイドは、Presto公式ガイドで提供する Kafka Connector Tutorialガイドをベースに作成されました。

事前タスク

  1. Cloud Hadoopクラスタを作成します。
  2. Cloud Data Streaming Serviceを作成します。
  3. Cloud Data Streaming Serviceを使用するには、VM作成とセッティングを行う必要があります。
  4. ACGを設定します。
    • Cloud Hadoopで Cloud Data Streaming Service Brokerノードに接続するためには、9092ポートを許可する必要があります。
    • Cloud Data Streaming Serviceの Brokerノード ACG アクセスソースに Cloud Hadoopの Subnet帯域を追加します。
      cloudhadoop-use-pre-vpc_ko
参考

Cloud Hadoopと Cloud Data Streaming Serviceは、同じ VPC内で通信が可能な同じ Subnetで作成することをお勧めします。

CDSS(Kafka)へのデータアップロード

Cloud Data Streaming Service VMで Kafkaを実行します。

[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties

Kafkaにデータをダウンロードします。

[root@s17e27e0cf6c kafka_2.12-2.4.0]# curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 21.6M  100 21.6M    0     0  7948k      0  0:00:02  0:00:02 --:--:-- 7947k

Kafkaにデータをアップロードします。

[root@s17e27e0cf6c kafka_2.12-2.4.0]# chmod 755 kafka-tpch
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./kafka-tpch load --brokers 172.16.2.6:9092 --prefix tpch. --tpch-type tiny
2022-02-07T10:30:09.426+0900     INFO   main    io.airlift.log.Logging  Logging to stderr
2022-02-07T10:30:09.448+0900     INFO   main    de.softwareforge.kafka.LoadCommand      Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-1 de.softwareforge.kafka.LoadCommand      Loading table 'customer' into topic 'tpch.customer'...
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-2 de.softwareforge.kafka.LoadCommand      Loading table 'orders' into topic 'tpch.orders'...
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-3 de.softwareforge.kafka.LoadCommand      Loading table 'lineitem' into topic 'tpch.lineitem'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-4 de.softwareforge.kafka.LoadCommand      Loading table 'part' into topic 'tpch.part'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-5 de.softwareforge.kafka.LoadCommand      Loading table 'partsupp' into topic 'tpch.partsupp'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-6 de.softwareforge.kafka.LoadCommand      Loading table 'supplier' into topic 'tpch.supplier'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-7 de.softwareforge.kafka.LoadCommand      Loading table 'nation' into topic 'tpch.nation'...
2022-02-07T10:30:09.865+0900     INFO   pool-1-thread-8 de.softwareforge.kafka.LoadCommand      Loading table 'region' into topic 'tpch.region'...
2022-02-07T10:30:13.079+0900     INFO   pool-1-thread-7 de.softwareforge.kafka.LoadCommand      Generated 25 rows for table 'nation'.
2022-02-07T10:30:13.175+0900     INFO   pool-1-thread-6 de.softwareforge.kafka.LoadCommand      Generated 100 rows for table 'supplier'.
2022-02-07T10:30:13.514+0900     INFO   pool-1-thread-8 de.softwareforge.kafka.LoadCommand      Generated 5 rows for table 'region'.
2022-02-07T10:30:13.711+0900     INFO   pool-1-thread-1 de.softwareforge.kafka.LoadCommand      Generated 1500 rows for table 'customer'.
2022-02-07T10:30:14.168+0900     INFO   pool-1-thread-4 de.softwareforge.kafka.LoadCommand      Generated 2000 rows for table 'part'.
2022-02-07T10:30:14.895+0900     INFO   pool-1-thread-5 de.softwareforge.kafka.LoadCommand      Generated 8000 rows for table 'partsupp'.
2022-02-07T10:30:15.078+0900     INFO   pool-1-thread-2 de.softwareforge.kafka.LoadCommand      Generated 15000 rows for table 'orders'.
2022-02-07T10:30:16.335+0900     INFO   pool-1-thread-3 de.softwareforge.kafka.LoadCommand      Generated 60175 rows for table 'lineitem'.

Prestoへの connector追加

Ambari UIで Presto > [CONFIGS] > Advanced connectors.properties に connectors.to.addの値を以下のように追加した後、 [SAVE] ボタンをクリックします。

{"kafka":["connector.name=kafka",         
"kafka.nodes=172.16.2.6:9092",         
"kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region",         
"kafka.hide-internal-columns=false"]         }

hadoop-vpc-use-ex15_connect1_vpc_ko

変更された構成を適用するために再起動が必要です。右上の [ACTIONS] > Restart All をクリックし、ポップアップの [CONFIRM RESTART ALL] ボタンをクリックします。

Prestoでのテーブル照会

Cloud Hadoopエッジノードに接続して Prestoを実行します。

  • catalogは kafka、schemaは tpchに設定します。
[sshuser@e-001-example-pzt-hd ~]$ /home1/cdp/usr/nch/3.1.0.0-78/trino/bin/trino-cli --server http://pub-210ab.hadoop.ntruss.com:8285 --catalog kafka --schema tpch
presto:tpch> SHOW TABLES;
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
(8 rows)

Query 20220128_064417_00003_96n53, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:00 [8 rows, 166B] [57 rows/s, 1.16KB/s]

簡単なクエリで内容を確認します。

presto:tpch> DESCRIBE customer;
      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+------------------------------------------
 _partition_id     | bigint  |       | Partition Id
 _partition_offset | bigint  |       | Offset for the message within the partiti
 _message_corrupt  | boolean |       | Message data is corrupt
 _message          | varchar |       | Message text
 _message_length   | bigint  |       | Total number of message bytes
 _key_corrupt      | boolean |       | Key data is corrupt
 _key              | varchar |       | Key text
 _key_length       | bigint  |       | Total number of key bytes
 _timestamp        | bigint  |       | Offset Timestamp
(9 rows)

presto:tpch> SELECT _message FROM customer LIMIT 5;

--------------------------------------------------------------------------------
 {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeR
 {"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAG
 {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZN
 {"rowNumber":10,"customerKey":10,"name":"Customer#000000010","address":"6LrEaV6
 {"rowNumber":13,"customerKey":13,"name":"Customer#000000013","address":"nsXQu0o
(5 rows)

参考

Prestoと Kafka活用の詳細は、Kafka Connector Tutorialをご参照ください。