Integrate Presto with Cloud Data Streaming Service


Available in VPC

This guide introduces how to integrate NAVER Cloud Platform's Cloud Hadoop with Cloud Data Streaming Service (CDSS).
This guide is based on the Kafka Connector Tutorial in the official Presto documentation.

Preparations

  1. Create a Cloud Hadoop cluster.
  2. Create Cloud Data Streaming Service.
  3. Create and set up a VM to use Cloud Data Streaming Service.
  4. Set up an ACG.
    • Allow access from Cloud Hadoop to port 9092 of the Cloud Data Streaming Service Broker nodes.
    • Add the Cloud Hadoop subnet's address range (CIDR block) to the access source of the Broker node ACG of Cloud Data Streaming Service.
Note

It is recommended that Cloud Hadoop and Cloud Data Streaming Service be created in the same subnet so that they can communicate within the same VPC.
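You can sanity-check whether a broker node address falls inside the subnet range registered as the ACG access source with Python's standard `ipaddress` module. The IP and CIDR below are example values, not taken from a real environment:

```python
import ipaddress

# Example values: replace with your Broker node IP and the
# Cloud Hadoop subnet CIDR registered as the ACG access source.
broker_ip = ipaddress.ip_address("172.16.2.6")
hadoop_subnet = ipaddress.ip_network("172.16.0.0/16")

# True when the broker sits inside the same subnet range,
# as recommended in the note above.
print(broker_ip in hadoop_subnet)  # → True
```

If this prints False, traffic from Cloud Hadoop will not match an ACG rule whose access source is that CIDR, and port 9092 will be unreachable.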

Upload the data to CDSS (Kafka)

Run Kafka on the Cloud Data Streaming Service VM.

[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties

Download the kafka-tpch loader, which generates TPC-H sample data and uploads it to Kafka.

[root@s17e27e0cf6c kafka_2.12-2.4.0]# curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 21.6M  100 21.6M    0     0  7948k      0  0:00:02  0:00:02 --:--:-- 7947k

Upload the data to Kafka.

[root@s17e27e0cf6c kafka_2.12-2.4.0]# chmod 755 kafka-tpch
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./kafka-tpch load --brokers 172.16.2.6:9092 --prefix tpch. --tpch-type tiny
2022-02-07T10:30:09.426+0900     INFO   main    io.airlift.log.Logging  Logging to stderr
2022-02-07T10:30:09.448+0900     INFO   main    de.softwareforge.kafka.LoadCommand      Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-1 de.softwareforge.kafka.LoadCommand      Loading table 'customer' into topic 'tpch.customer'...
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-2 de.softwareforge.kafka.LoadCommand      Loading table 'orders' into topic 'tpch.orders'...
2022-02-07T10:30:09.859+0900     INFO   pool-1-thread-3 de.softwareforge.kafka.LoadCommand      Loading table 'lineitem' into topic 'tpch.lineitem'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-4 de.softwareforge.kafka.LoadCommand      Loading table 'part' into topic 'tpch.part'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-5 de.softwareforge.kafka.LoadCommand      Loading table 'partsupp' into topic 'tpch.partsupp'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-6 de.softwareforge.kafka.LoadCommand      Loading table 'supplier' into topic 'tpch.supplier'...
2022-02-07T10:30:09.860+0900     INFO   pool-1-thread-7 de.softwareforge.kafka.LoadCommand      Loading table 'nation' into topic 'tpch.nation'...
2022-02-07T10:30:09.865+0900     INFO   pool-1-thread-8 de.softwareforge.kafka.LoadCommand      Loading table 'region' into topic 'tpch.region'...
2022-02-07T10:30:13.079+0900     INFO   pool-1-thread-7 de.softwareforge.kafka.LoadCommand      Generated 25 rows for table 'nation'.
2022-02-07T10:30:13.175+0900     INFO   pool-1-thread-6 de.softwareforge.kafka.LoadCommand      Generated 100 rows for table 'supplier'.
2022-02-07T10:30:13.514+0900     INFO   pool-1-thread-8 de.softwareforge.kafka.LoadCommand      Generated 5 rows for table 'region'.
2022-02-07T10:30:13.711+0900     INFO   pool-1-thread-1 de.softwareforge.kafka.LoadCommand      Generated 1500 rows for table 'customer'.
2022-02-07T10:30:14.168+0900     INFO   pool-1-thread-4 de.softwareforge.kafka.LoadCommand      Generated 2000 rows for table 'part'.
2022-02-07T10:30:14.895+0900     INFO   pool-1-thread-5 de.softwareforge.kafka.LoadCommand      Generated 8000 rows for table 'partsupp'.
2022-02-07T10:30:15.078+0900     INFO   pool-1-thread-2 de.softwareforge.kafka.LoadCommand      Generated 15000 rows for table 'orders'.
2022-02-07T10:30:16.335+0900     INFO   pool-1-thread-3 de.softwareforge.kafka.LoadCommand      Generated 60175 rows for table 'lineitem'.
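As the log shows, the loader creates one topic per TPC-H table, naming each topic by prepending the `--prefix` value to the table name. A minimal sketch of this naming scheme, using the table list from the log above:

```python
# TPC-H tables processed by kafka-tpch, as listed in the log output.
tables = ["customer", "orders", "lineitem", "part",
          "partsupp", "supplier", "nation", "region"]
prefix = "tpch."  # value passed via --prefix

# Topic names the loader creates; these must match the
# kafka.table-names list in the Presto connector configuration.
topics = [prefix + t for t in tables]
print(topics[0])  # → tpch.customer
print(len(topics))  # → 8
```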

Add the connector to Presto

In the Ambari UI, go to Presto > [CONFIGS] > Advanced connectors.properties, set the connectors.to.add value as follows, and click the [SAVE] button.

{"kafka": ["connector.name=kafka",
"kafka.nodes=172.16.2.6:9092",
"kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region",
"kafka.hide-internal-columns=false"]}
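The connectors.to.add value is a JSON object that maps a catalog name ("kafka") to a list of property lines. Conceptually, each top-level key becomes a Presto catalog whose properties file contains those lines; the sketch below illustrates that mapping (the exact file layout Ambari produces is an assumption for illustration):

```python
import json

connectors_to_add = """{"kafka": ["connector.name=kafka",
"kafka.nodes=172.16.2.6:9092",
"kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region",
"kafka.hide-internal-columns=false"]}"""

# Each key names a catalog; each list entry is one line of the
# resulting <catalog>.properties file (assumed layout).
for catalog, props in json.loads(connectors_to_add).items():
    print(f"# {catalog}.properties")
    print("\n".join(props))
```

Note that kafka.hide-internal-columns=false is what makes the internal columns (such as _message) visible in the DESCRIBE output later in this guide.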


A restart is required for the changed configuration to take effect. In the upper right corner, click [ACTIONS] > Restart All, then click [CONFIRM RESTART ALL] in the pop-up window.

View tables in Presto

Access the Cloud Hadoop edge node and run Presto.

  • Set the catalog to kafka and the schema to tpch.
[sshuser@e-001-example-pzt-hd ~]$ /home1/cdp/usr/nch/3.1.0.0-78/trino/bin/trino-cli --server http://pub-210ab.hadoop.ntruss.com:8285 --catalog kafka --schema tpch
presto:tpch> SHOW TABLES;
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
(8 rows)

Query 20220128_064417_00003_96n53, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
0:00 [8 rows, 166B] [57 rows/s, 1.16 KB/s]

Check the content using a simple query.

presto:tpch> DESCRIBE customer;
      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+------------------------------------------
 _partition_id     | bigint  |       | Partition Id
 _partition_offset | bigint  |       | Offset for the message within the partiti
 _message_corrupt  | boolean |       | Message data is corrupt
 _message          | varchar |       | Message text
 _message_length   | bigint  |       | Total number of message bytes
 _key_corrupt      | boolean |       | Key data is corrupt
 _key              | varchar |       | Key text
 _key_length       | bigint  |       | Total number of key bytes
 _timestamp        | bigint  |       | Offset Timestamp
(9 rows)

presto:tpch> SELECT _message FROM customer LIMIT 5;

--------------------------------------------------------------------------------
 {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeR
 {"rowNumber":4,"customerKey":4,"name":"Customer#000000004","address":"XxVSJsLAG
 {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZN
 {"rowNumber":10,"customerKey":10,"name":"Customer#000000010","address":"6LrEaV6
 {"rowNumber":13,"customerKey":13,"name":"Customer#000000013","address":"nsXQu0o
(5 rows)
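Each _message value is a JSON document, so individual fields can be extracted from it; in Presto SQL this is typically done with json_extract_scalar (e.g. `json_extract_scalar(_message, '$.name')`). The equivalent parsing in Python, using a sample row shaped like the truncated output above (the field values are illustrative):

```python
import json

# A row shaped like the _message output above; the address value
# is illustrative and truncated, not real query output.
message = ('{"rowNumber": 1, "customerKey": 1, '
           '"name": "Customer#000000001", "address": "IVhzIApeR"}')

row = json.loads(message)
print(row["name"])  # → Customer#000000001
print(row["customerKey"])  # → 1
```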

Note

For more information on using Presto with Kafka, see the Kafka Connector Tutorial in the Presto documentation.