- 印刷する
- PDF
Logstashを活用した Cloud Database(MySQL)連携
- 印刷する
- PDF
VPC環境で利用できます。
LogStashを使用して Search Engine Serviceと Cloud DB for MySQL間 Data Pipeline構築をする方法を説明します。
事前作業
本ガイドを実行する前にご利用の申し込みを完了する作業は、次の通りです。
- VPCおよびサーバ作成
- Cloud DB for MySQLサーバを作成する。
- Search Engine Serviceクラスタを作成する。
ユースケースは Serverに Logstashを起動した後、Cloud DB for MySQLのデータを一定周期ごとに Search Engine Serviceに転送する方法を扱っています。
ネットワークを設定する
ネットワーク設定のユースケースを紹介します。
STEP 1. Cloud DB for MySQL User設定
DB管理 > DB User管理で server帯域から DB Userの接続ができるよう設定します。
STEP 2. ACG設定
Cloud Data Streaming Serviceブローカーノードの9092番ポートでアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACG メニューを順にクリックします。
- ACGリストで 'cdss-mysql-xxxxx'を選択した後、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行したサーバの IPアドレス
- 許可ポート: DBで設定したポート(Default: 3306)
- [適用] ボタンをクリックします。
Search Engine Serviceマネージャノードの9200番ポートでアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACG メニューを順にクリックします。
- ACGリストで「searchengine-m-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行したサーバの IPアドレス
- 許可ポート: 9200
Logstashインストール
Serverに Logstashをインストールするユースケースを紹介します。インストール過程には、ElasticSearchと OpenSearchの過程が含まれています。使用するバージョンに合わせてインストールすることで正常にテストできます。
STEP 1. Javaインストール
Javaをインストールする方法は次の通りです。
- 以下のコマンドを入力して javaをインストールします。
yum install java-devel -y
STEP 2. Logstashインストール
Logstashをインストールする方法は次の通りです。
OSS ライセンスでインストールすることで正常に作動します。
- 以下のコマンドを入力して
/root
経路に Logstashをダウンロードします。
# Elasticsearchバージョンの場合(OSSバージョンインストール)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- 以下のコマンドを入力してダウンロードしたファイルをインストールします。
# Elasticsearchバージョンの場合
rpm -ivh logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- JDBC Driverをインストールします。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.zip
unzip mysql-connector-java-8.0.30.zip
mkdir /etc/logstash/plugin
mv /root/mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /etc/logstash/plugin/
以下のコマンドを入力して Logstash起動前に
logstash.conf
ファイルを修正します。- Elasticsearchバージョンの場合
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf vi /etc/logstash/conf.d/logstash.conf
- ElasticSearchバージョン logstash.conf
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar" jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false" jdbc_user => "${cdb user name}" jdbc_password => "${cdb mysql password}" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_page_size => 50 statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC" record_last_run => true clean_run => true tracking_column_type => "numeric" tracking_column => "unix_ts_in_secs" use_column_value => true last_run_metadata_path => "/etc/logstash/data/student" schedule => "*/5 * * * * *" } } output { elasticsearch { hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"] index => "cdss-%{+YYYY.MM.dd}" } }
- OpenSearchバージョンの場合
# /root/にインストールする場合{インストールパス}は/root/logstash-7.16.3です。 mv {インストールパス}/config/logstash-sample.conf {インストールパス}/config/logstash.conf vi {インストールパス}/config/logstash.conf
- OpenSearch バージョン logstash.conf
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar" jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false" jdbc_user => "${cdb user name}" jdbc_password => "${cdb mysql password}" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_page_size => 50 statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC" record_last_run => true clean_run => true tracking_column_type => "numeric" tracking_column => "unix_ts_in_secs" use_column_value => true last_run_metadata_path => "/etc/logstash/data/student" schedule => "*/5 * * * * *" } } output { opensearch { hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"] index => "cdss-%{+YYYY.MM.dd}" user => ${userID} password => ${password} ssl_certificate_verification => false } }
- Logstash Conf Comment
上記の Sample Logstash Confファイルは5秒毎に[statement]構文が実行され "statement" セグメントは最後に照会した rowの update_timeより最近アップデートされた rowを選択するクエリです。 ${cdb mysql endpoint} - CDB MySQLの Privateドメインに変更します。 ${cdb mysql database name} - CDB MySQL内に自分が作成した Database名を入力します。 ${cdb user name} - CDB MySQL内に自分が作成したアカウント名を入力します。 ${cdb mysql password} - CDB MySQL内に自分が作成したアカウントのパスワードを入力します。 ${ses manager node1 ip} - Search Engine Serviceマネージャーノードの IPアドレスを入力します。 ${ses manager node2 ip} - Search Engine Serviceマネージャーノードの IPアドレスを入力します(マネージャーノードが二重化されていない場合は入力しません。)。 ${userID} - OpenSearchの場合、クラスタ作成時に入力した IDです。 ${password} - OpenSearchの場合、クラスタ作成時に入力した passwordです。
- logstash metadata 経路の作成をする。
mkdir /etc/logstash/data chown -R logstash:logstash /etc/logstash/data
- Elasticsearchバージョンの場合
Cloud DB for MySQL設定
Cloud DB for MySQL設定ユースケースは次の通りです。
- Tableを作成します。
create table logstash_test( id BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id), contents nvarchar(255), create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
- Dataを入力をします。
insert into logstash_test(id, contents) values(1, "this is first contents"); insert into logstash_test(id, contents) values(2, "this is second contents");
Logstash実行
Logstash実行ユースケースは次の通りです。
# Elasticsearchバージョンの場合
systemctl start logstash
# OpenSearchバージョンの場合
# バックグラウンドを行うため、 nohupを使用します。
# -fオプションで logstash.confのパスを指定してください。
nohup {インストールパス}/bin/logstash -f ~{インストールパス}/config/logstash.conf &
Cloud DB for MySQLデータ照会
Search Engine Serviceで Cloud DB for MySQLのデータを照会するユースケースは次の通りです。
GET mysql-2022.08.08/_search
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "mysql-2022.08.08",
"_type" : "_doc",
"_id" : "VNkwe4IBicE7MyrTCKIL",
"_score" : 1.0,
"_source" : {
"contents" : "this is first contents",
"update_time" : "2022-08-08T01:55:47.000Z",
"unix_ts_in_secs" : 1659923747,
"id" : 1,
"create_time" : "2022-08-08T01:55:47.000Z",
"@version" : "1",
"@timestamp" : "2022-08-08T02:02:01.082Z"
}
},
{
"_index" : "mysql-2022.08.08",
"_type" : "_doc",
"_id" : "3yEwe4IBBeKbW6yXB2l_",
"_score" : 1.0,
"_source" : {
"contents" : "this is second contents",
"update_time" : "2022-08-08T01:59:05.000Z",
"unix_ts_in_secs" : 1659923945,
"id" : 2,
"create_time" : "2022-08-08T01:59:05.000Z",
"@version" : "1",
"@timestamp" : "2022-08-08T02:02:01.093Z"
}
}
]
}
}