Logstash を活用した Cloud Data Streaming Service 連携

Prev Next

VPC環境で利用できます。

LogStashを使用して Cloud Data Streaming Serviceのデータを Search Engine Serviceに送信し、確認する方法について説明します。

事前タスク

本ガイドを実行する前にご利用の申し込みを完了する必要があるタスクは、次の通りです。

  • VPCとサーバの作成
  • Cloud Data Streaming Serviceクラスタ作成
  • Search Engine Serviceクラスタ作成

ユースケースとして、Serverに Logstashを起動した後、Kafkaのデータを Search Engine Serviceに送信する方法を扱っています。

ネットワーク設定

ネットワーク設定のユースケースを紹介します。

STEP 1.ACG設定

Cloud Data Streaming Serviceブローカーノードの9092番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「cdss-b-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-5-4_ko
    • プロトコル: TCP
    • アクセスソース: Logstashが実行されるサーバの IPアドレス
    • 許可ポート: 9092
  4. [適用] ボタンをクリックします。

Search Engine Serviceマネージャノードの9200番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「searchengine-m-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-5-6_ko
    • プロトコル: TCP
    • アクセスソース: Logstashが実行されるサーバの IPアドレス
    • 許可ポート: 9200

Logstashのインストール

Serverに Logstashをインストールするユースケースを紹介します。インストールプロセスには ElasticSearchと OpenSearchのプロセスが共に含まれています。使用するバージョンに合わせてインストールする必要があります。

STEP 1.Javaのインストール

  1. 以下のコマンドを入力して javaをインストールします。
yum install java-devel -y

STEP 2.Logstashのインストール

Logstashをインストールする方法は次の通りです。

  1. 以下のコマンドを入力して /root パスに Logstashをダウンロードします。
# Elasticsearchバージョンの場合(OSSバージョンをインストール)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm

# OpenSearchバージョンの場合
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
  1. 以下のコマンドを入力して、ダウンロードしたファイルをインストールします。
# Elasticsearchバージョンの場合
rpm -ivh logstash-oss-7.7.0.rpm

# OpenSearchバージョンの場合
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
  1. 以下のコマンドを入力して、Logstash起動前に logstash.conf ファイルを変更します。
  • Elasticsearchバージョンの場合
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
vi /etc/logstash/conf.d/logstash.conf
  • ElasticSearchバージョン logstash.conf
input {
 kafka {
  bootstrap_servers => "${bootstrap_servers}"
  topics => "cdss_topic"
 }
}

output {
  elasticsearch {
    hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
  }
}
  • OpenSearchバージョンの場合
# /root/にインストールする場合、{インストールパス}は/root/logstash-7.16.3です。 
mv {インストールパス}/config/logstash-sample.conf {インストールパス}/config/logstash.conf
vi {インストールパス}/config/logstash.conf
  • OpenSearchバージョン logstash.conf
input {
 kafka {
  bootstrap_servers => "${bootstrap_servers}"
  topics => "cdss_topic"
 }
}

output {
  opensearch {
    hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
    user => ${userID}
    password => ${password}
    ssl_certificate_verification => false
  }
}
  • Logstash Conf Comment
${bootstrap_servers} - Cloud Data Streaming Serviceブローカーノードの IP:Kafkaポートを入力します。  ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092
${ses manager node1 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します。
${ses manager node2 ip} - Search Engine Serviceマネージャノードの IPアドレスを入力します(マネージャノードが冗長化されていない場合は入力しません)
${userID} - OpenSearchの場合、クラスタ作成時に入力した IDです。
${password} - OpenSearchの場合、クラスタ作成時に入力した passwordです。

Logstash実行

Logstashを実行するユースケースは、次の通りです。

# Elasticsearchバージョンの場合
systemctl start logstash

# OpenSearchバージョンの場合
# バックグラウンド実行のため nohupを使用します。
# -fオプションを使用して logstash.confのパスを指定する必要があります。
nohup {インストールパス}/bin/logstash -f ~{インストールパス}/config/logstash.conf &

CDSS連携環境構築

Logstashを実行するユースケースは、次の通りです。

  • Javaをインストールする
yum install java-devel -y
  • Kafka Binaryコードをインストール
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz 
  • Produce
./kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list ${broker list} --topic cdss_topic
>this is my first message
>this is my second message

# ${broker list}にはブローカーノードの IP:Kafkaポートを入力します。ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092

Cloud Data Streaming Serviceデータの照会

Search Engine Serviceで Cloud Data Streaming Serviceのデータを照会するユースケースは、次の通りです。

GET cdss-2022.08.08/_search
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "cdss-2022.08.08",
        "_type" : "_doc",
        "_id" : "VtmKe4IBicE7MyrTaKJ5",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2022-08-08T03:40:44.335Z",
          "message" : "this is my first message"
        }
      },
      {
        "_index" : "cdss-2022.08.08",
        "_type" : "_doc",
        "_id" : "V9mKe4IBicE7MyrTg6IW",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2022-08-08T03:40:51.248Z",
          "message" : "this is my second message"
        }
      }
    ]
  }
}

Kafka SSLを使用する

Cloud Data Streaming Serviceで SSLを使用する場合、証明書を追加することで設定が可能です。

  • INPUT
    input {
      kafka {
        bootstrap_servers => “{BrokerNode-HostName}:9093” 
        topics => "test“
        ssl_truststore_location => "/etc/logstash/conf.d/kafka.client.truststore.jks"
        ssl_truststore_password => “${password}"
        security_protocol => "SSL"
      }  
    }
    
    • ${BrokerNode-HostName} - クラスタリスト -> Brokerノード情報 -> 詳細を見る -> TLSで確認できます。例) "networktest12-d-251:9093,networktest12-d-252:9093,networktest12-d-253:9093"
    • ${password} - 別途の証明書パスワード