VPC環境で利用できます。
LogStashを使用して Search Engine Serviceと Cloud DB for MySQL間の Data Pipelineを構築する方法について説明します。
事前タスク
本ガイドを実行する前にご利用の申し込みを完了する必要があるタスクは、次の通りです。
- VPCとサーバの作成
- Cloud DB for MySQLサーバ作成
- Search Engine Serviceクラスタ作成
ユースケースとして、Serverに Logstashを起動した後、Cloud DB for MySQLのデータを一定周期ごとに Search Engine Serviceに送信する方法を扱っています。
ネットワークを設定する
ネットワーク設定のユースケースを紹介します。
STEP 1.Cloud DB for MySQL User設定
DB管理 > DBユーザー管理で、server帯域からの DBユーザーのアクセスが可能になるように設定する必要があります。

STEP 2.ACG設定
Cloud Data Streaming Serviceブローカーノードの9092番ポートにアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Compute > Server > ACGメニューを順にクリックします。 - ACGリストから「cloud-mysql-xxxx」を選択し、 [ACG設定] ボタンをクリックします。
- ACGルールを入力し、 [追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行されるサーバの IPアドレス
- 許可ポート: DBで設定したポート(Default: 3306)
- [適用] ボタンをクリックします。
Search Engine Serviceマネージャノードの9200番ポートにアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Compute > Server > ACGメニューを順にクリックします。 - ACGリストから「searchengine-m-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
- ACGルールを入力し、 [追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行されるサーバの IPアドレス
- 許可ポート: 9200
Logstashのインストール
Serverに Logstashをインストールするユースケースを紹介します。インストールプロセスには ElasticSearchと OpenSearchのプロセスが共に含まれています。使用するバージョンに合わせてインストールする必要があります。
STEP 1.Javaのインストール
Javaをインストールする方法は次の通りです。
- 以下のコマンドを入力して javaをインストールします。
yum install java-devel -y
STEP 2.Logstashのインストール
Logstashをインストールする方法は次の通りです。
正常に動作するには、必ず OSSライセンスでインストールする必要があります。
- 以下のコマンドを入力して
/rootパスに Logstashをダウンロードします。
# Elasticsearchバージョンの場合(OSSバージョンをインストール)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- 以下のコマンドを入力してダウンロードしたファイルをインストールします。
# Elasticsearchバージョンの場合
rpm -ivh logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- JDBC Driverをインストールします。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.zip
unzip mysql-connector-java-8.0.30.zip
mkdir /etc/logstash/plugin
mv /root/mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /etc/logstash/plugin/
- 以下のコマンドを入力して、Logstash起動前に
logstash.confファイルを変更します。- Elasticsearchバージョンの場合
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf vi /etc/logstash/conf.d/logstash.conf - ElasticSearchバージョン logstash.conf
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar" jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false" jdbc_user => "${cdb user name}" jdbc_password => "${cdb mysql password}" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_page_size => 50 statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC" record_last_run => true clean_run => true tracking_column_type => "numeric" tracking_column => "unix_ts_in_secs" use_column_value => true last_run_metadata_path => "/etc/logstash/data/student" schedule => "*/5 * * * * *" } } output { elasticsearch { hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"] index => "cdss-%{+YYYY.MM.dd}" } }- OpenSearchバージョンの場合
# /root/にインストールする場合、{インストールパス}は/root/logstash-7.16.3です。 mv {インストールパス}/config/logstash-sample.conf {インストールパス}/config/logstash.conf vi {インストールパス}/config/logstash.conf - OpenSearchバージョン logstash.conf
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar" jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false" jdbc_user => "${cdb user name}" jdbc_password => "${cdb mysql password}" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_page_size => 50 statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC" record_last_run => true clean_run => true tracking_column_type => "numeric" tracking_column => "unix_ts_in_secs" use_column_value => true last_run_metadata_path => "/etc/logstash/data/student" schedule => "*/5 * * * * *" } } output { opensearch { hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"] index => "cdss-%{+YYYY.MM.dd}" user => ${userID} password => ${password} ssl_certificate_verification => false } } - Logstash Conf Comment
上記の Sample Logstash Confファイルは5秒ごとに「statement」構文が実行されます。 「statement」構文は、最後に照会した行の update_timeよりも最近アップデートされた rowを選択するクエリです。 ${cdb mysql endpoint} - CDB MySQLの Privateドメインに変更します。 ${cdb mysql database name} - CDB MySQL内で自分が作成した Database名を入力します。 ${cdb user name} - CDB MySQL内で自分が作成したアカウント名を入力します。 ${cdb mysql password} - CDB MySQL内で自分が作成したパスワードを入力します。 ${ses manager node1 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します。 ${ses manager node2 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します(マネージャノードが冗長化されていない場合は入力しません) ${userID} - OpenSearchの場合、クラスタ作成時に入力した IDです。 ${password} - OpenSearchの場合、クラスタ作成時に入力した passwordです。 - logstash metadataパスを作成する
mkdir /etc/logstash/data chown -R logstash:logstash /etc/logstash/data
- Elasticsearchバージョンの場合
Cloud DB for MySQL設定
Cloud DB for MySQL設定のユースケースは、次の通りです。
- Table作成
create table logstash_test( id BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id), contents nvarchar(255), create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); - Data入力
insert into logstash_test(id, contents) values(1, "this is first contents"); insert into logstash_test(id, contents) values(2, "this is second contents");
Logstash実行
Logstash実行のユースケースは、次の通りです。
# Elasticsearchバージョンの場合
systemctl start logstash
# OpenSearchバージョンの場合
# バックグラウンド実行のため nohupを使用します。
# -fオプションを使用して logstash.confのパスを指定する必要があります。
nohup {インストールパス}/bin/logstash -f ~{インストールパス}/config/logstash.conf &
Cloud DB for MySQLデータ照会
Search Engine Serviceで Cloud DB for MySQLのデータを照会するユースケースは、次の通りです。
GET mysql-2022.08.08/_search
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "mysql-2022.08.08",
"_type" : "_doc",
"_id" : "VNkwe4IBicE7MyrTCKIL",
"_score" : 1.0,
"_source" : {
"contents" : "this is first contents",
"update_time" : "2022-08-08T01:55:47.000Z",
"unix_ts_in_secs" : 1659923747,
"id" : 1,
"create_time" : "2022-08-08T01:55:47.000Z",
"@version" : "1",
"@timestamp" : "2022-08-08T02:02:01.082Z"
}
},
{
"_index" : "mysql-2022.08.08",
"_type" : "_doc",
"_id" : "3yEwe4IBBeKbW6yXB2l_",
"_score" : 1.0,
"_source" : {
"contents" : "this is second contents",
"update_time" : "2022-08-08T01:59:05.000Z",
"unix_ts_in_secs" : 1659923945,
"id" : 2,
"create_time" : "2022-08-08T01:59:05.000Z",
"@version" : "1",
"@timestamp" : "2022-08-08T02:02:01.093Z"
}
}
]
}
}