Logstash を活用した Cloud Database(MySQL) 連携

Prev Next

VPC環境で利用できます。

LogStashを使用して Search Engine Serviceと Cloud DB for MySQL間の Data Pipelineを構築する方法について説明します。

事前タスク

本ガイドを実行する前にご利用の申し込みを完了する必要があるタスクは、次の通りです。

  • VPCとサーバの作成
  • Cloud DB for MySQLサーバ作成
  • Search Engine Serviceクラスタ作成

ユースケースとして、Serverに Logstashを起動した後、Cloud DB for MySQLのデータを一定周期ごとに Search Engine Serviceに送信する方法を扱っています。

ネットワークを設定する

ネットワーク設定のユースケースを紹介します。

STEP 1.Cloud DB for MySQL User設定

DB管理 > DBユーザー管理で、server帯域からの DBユーザーのアクセスが可能になるように設定する必要があります。
ses-mysql-user-screen_ko

STEP 2.ACG設定

Cloud Data Streaming Serviceブローカーノードの9092番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「cloud-mysql-xxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    ses-5-4-1-1_ko
    • プロトコル: TCP
    • アクセスソース: Logstashが実行されるサーバの IPアドレス
    • 許可ポート: DBで設定したポート(Default: 3306)
  4. [適用] ボタンをクリックします。

Search Engine Serviceマネージャノードの9200番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「searchengine-m-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-5-6_ko
    • プロトコル: TCP
    • アクセスソース: Logstashが実行されるサーバの IPアドレス
    • 許可ポート: 9200

Logstashのインストール

Serverに Logstashをインストールするユースケースを紹介します。インストールプロセスには ElasticSearchと OpenSearchのプロセスが共に含まれています。使用するバージョンに合わせてインストールする必要があります。

STEP 1.Javaのインストール

Javaをインストールする方法は次の通りです。

  1. 以下のコマンドを入力して javaをインストールします。
yum install java-devel -y

STEP 2.Logstashのインストール

Logstashをインストールする方法は次の通りです。

参考

正常に動作するには、必ず OSSライセンスでインストールする必要があります。

  1. 以下のコマンドを入力して /root パスに Logstashをダウンロードします。
# Elasticsearchバージョンの場合(OSSバージョンをインストール)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm

# OpenSearchバージョンの場合
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
  1. 以下のコマンドを入力してダウンロードしたファイルをインストールします。
# Elasticsearchバージョンの場合
rpm -ivh logstash-oss-7.7.0.rpm

# OpenSearchバージョンの場合
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
  1. JDBC Driverをインストールします。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.zip
unzip mysql-connector-java-8.0.30.zip
mkdir /etc/logstash/plugin
mv /root/mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /etc/logstash/plugin/ 
  1. 以下のコマンドを入力して、Logstash起動前に logstash.conf ファイルを変更します。
    • Elasticsearchバージョンの場合
      mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
      vi /etc/logstash/conf.d/logstash.conf
      
    • ElasticSearchバージョン logstash.conf
    input {
       jdbc {
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar"
        jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false"
        jdbc_user => "${cdb user name}"
        jdbc_password => "${cdb mysql password}"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => true
        jdbc_page_size => 50
        statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
        record_last_run => true
        clean_run => true
        tracking_column_type => "numeric"
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        last_run_metadata_path => "/etc/logstash/data/student"
        schedule => "*/5 * * * * *"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"]
        index => "cdss-%{+YYYY.MM.dd}"
      }
    }
    
    • OpenSearchバージョンの場合
      # /root/にインストールする場合、{インストールパス}は/root/logstash-7.16.3です。 
      mv {インストールパス}/config/logstash-sample.conf {インストールパス}/config/logstash.conf
      vi {インストールパス}/config/logstash.conf
      
    • OpenSearchバージョン logstash.conf
      input {
         jdbc {
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar"
          jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false"
          jdbc_user => "${cdb user name}"
          jdbc_password => "${cdb mysql password}"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => true
          jdbc_page_size => 50
          statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
          record_last_run => true
          clean_run => true
          tracking_column_type => "numeric"
          tracking_column => "unix_ts_in_secs"
          use_column_value => true
          last_run_metadata_path => "/etc/logstash/data/student"
          schedule => "*/5 * * * * *"
        }
      }
      
      output {
        opensearch {
          hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"]
          index => "cdss-%{+YYYY.MM.dd}"
          user => ${userID}
          password => ${password}
          ssl_certificate_verification => false
        }
      }
      
    • Logstash Conf Comment
      上記の Sample Logstash Confファイルは5秒ごとに「statement」構文が実行されます。
      「statement」構文は、最後に照会した行の update_timeよりも最近アップデートされた rowを選択するクエリです。
      ${cdb mysql endpoint} - CDB MySQLの Privateドメインに変更します。
      ${cdb mysql database name} - CDB MySQL内で自分が作成した Database名を入力します。
      ${cdb user name} - CDB MySQL内で自分が作成したアカウント名を入力します。
      ${cdb mysql password} - CDB MySQL内で自分が作成したパスワードを入力します。
      ${ses manager node1 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します。
      ${ses manager node2 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します(マネージャノードが冗長化されていない場合は入力しません)
      ${userID} - OpenSearchの場合、クラスタ作成時に入力した IDです。
      ${password} - OpenSearchの場合、クラスタ作成時に入力した passwordです。
      
    • logstash metadataパスを作成する
      mkdir /etc/logstash/data
      chown -R logstash:logstash /etc/logstash/data
      

Cloud DB for MySQL設定

Cloud DB for MySQL設定のユースケースは、次の通りです。

  • Table作成
    create table logstash_test(
    id BIGINT(20) UNSIGNED NOT NULL,
    PRIMARY KEY (id),
    contents nvarchar(255),
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    );
    
  • Data入力
    insert into logstash_test(id, contents) values(1, "this is first contents");
    insert into logstash_test(id, contents) values(2, "this is second contents");
    

Logstash実行

Logstash実行のユースケースは、次の通りです。

# Elasticsearchバージョンの場合
systemctl start logstash

# OpenSearchバージョンの場合
# バックグラウンド実行のため nohupを使用します。
# -fオプションを使用して logstash.confのパスを指定する必要があります。
nohup {インストールパス}/bin/logstash -f ~{インストールパス}/config/logstash.conf &

Cloud DB for MySQLデータ照会

Search Engine Serviceで Cloud DB for MySQLのデータを照会するユースケースは、次の通りです。

GET mysql-2022.08.08/_search
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql-2022.08.08",
        "_type" : "_doc",
        "_id" : "VNkwe4IBicE7MyrTCKIL",
        "_score" : 1.0,
        "_source" : {
          "contents" : "this is first contents",
          "update_time" : "2022-08-08T01:55:47.000Z",
          "unix_ts_in_secs" : 1659923747,
          "id" : 1,
          "create_time" : "2022-08-08T01:55:47.000Z",
          "@version" : "1",
          "@timestamp" : "2022-08-08T02:02:01.082Z"
        }
      },
      {
        "_index" : "mysql-2022.08.08",
        "_type" : "_doc",
        "_id" : "3yEwe4IBBeKbW6yXB2l_",
        "_score" : 1.0,
        "_source" : {
          "contents" : "this is second contents",
          "update_time" : "2022-08-08T01:59:05.000Z",
          "unix_ts_in_secs" : 1659923945,
          "id" : 2,
          "create_time" : "2022-08-08T01:59:05.000Z",
          "@version" : "1",
          "@timestamp" : "2022-08-08T02:02:01.093Z"
        }
      }
    ]
  }
}