Logstashを活用した Cloud Data Streaming Service連携
- 印刷する
- PDF
Logstashを活用した Cloud Data Streaming Service連携
- 印刷する
- PDF
Article Summary
Share feedback
Thanks for sharing your feedback!
VPC環境で利用できます。
LogStashを使用して Cloud Data Streaming Serviceのデータを Search Engine Serviceに転送して確認する方法を説明します。
事前作業
本ガイドを実行する前にご利用の申し込みを完了する作業は、次の通りです。
- VPCおよびサーバ作成
- Cloud Data Streaming Serviceクラスタを作成する
- Search Engine Serviceクラスタ作成
ユースケースは Serverに Logstashを起動した後、Kafkaのデータを Search Engine Serviceに転送する方法を扱っています。
ネットワーク設定
ネットワーク設定のユースケースを紹介します。
STEP 1. ACG設定
Cloud Data Streaming Service ブローカーノードの9092番ポートでアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACG メニューを順にクリックします。
- ACGリストで 'cdss-b-xxxxx'を選択した後、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行したサーバの IPアドレス
- 許可ポート: 9092
- [適用] ボタンをクリックします。
Search Engine Serviceマネージャノードの9200番ポートでアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールで、 Services > Compute > Server > ACG メニューを順にクリックします。
- ACGリストで「searchengine-m-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Logstashが実行するサーバの IPアドレス
- 許可ポート: 9200
Logstashインストール
Serverに Logstashをインストールするユースケースを紹介します。インストール過程には、ElasticSearchと OpenSearchの過程が含まれています。使用するバージョンに合わせてインストールすることで正常にテストできます。
STEP 1. Javaインストール
- 以下のコマンドを入力してを javaをインストールします。
yum install java-devel -y
STEP 2. Logstashインストール
Logstashをインストールする方法は次の通りです。
- 以下のコマンドを入力して
/root
経路に Logstashをダウンロードします。
# Elasticsearchバージョンの場合(OSSバージョンインストール)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- 次のコマンドを入力してダウンロードしたファイルをインストールします。
# Elasticsearchバージョンの場合
rpm -ivh logstash-oss-7.7.0.rpm
# OpenSearchバージョンの場合
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
- 以下のコマンドを入力して Logstash起動前に
logstash.conf
ファイルを修正します。
- Elasticsearchバージョンの場合
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
vi /etc/logstash/conf.d/logstash.conf
- ElasticSearchバージョン logstash.conf
input {
kafka {
bootstrap_servers => "${bootstrap_servers}"
topics => "cdss_topic"
}
}
output {
elasticsearch {
hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"]
index => "cdss-%{+YYYY.MM.dd}"
}
}
- OpenSearchバージョンの場合
# /root/にインストールする場合{インストールパス}は/root/logstash-7.16.3です
mv {インストールパス}/config/logstash-sample.conf {インストールパス}/config/logstash.conf
vi {インストールパス}/config/logstash.conf
- OpenSearch バージョン logstash.conf
input {
kafka {
bootstrap_servers => "${bootstrap_servers}"
topics => "cdss_topic"
}
}
output {
opensearch {
hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"]
index => "cdss-%{+YYYY.MM.dd}"
user => ${userID}
password => ${password}
ssl_certificate_verification => false
}
}
- Logstash Conf Comment
${bootstrap_servers} - Cloud Data Streaming Serviceブローカーノードの IP:Kafka portを入力します。 ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092
${ses manager node1 ip} - Search Engine Serviceマネージャーノードの IPアドレスを入力します。
${ses manager node2 ip} - Search Engine Serviceマネージャーノードの IPアドレスを入力します(マネージャーノードが二重化されていない場合は入力しません。)。
${userID} - OpenSearchの場合、クラスタ作成時に入力した IDです。
${password} - OpenSearchの場合、クラスタ作成時に入力した passwordです。
Logstash実行
Logstashを実行するユースケースは次の通りです。
# Elasticsearchバージョンの場合
systemctl start logstash
# OpenSearchバージョンの場合
# バックグラウンドを行うため、 nohupを使用します。
# -fオプションで logstash.confのパスを指定してください。
nohup {インストールパス}/bin/logstash -f ~{インストールパス}/config/logstash.conf &
CDSS連携環境構築
Logstashを実行するユースケースは次の通りです。
- Javaをインストールする
yum install java-devel -y
- Kafka Binaryコードのインストール
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
- Produce
./kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list ${broker list} --topic cdss_topic
>this is my first message
>this is my second message
# ${broker list} はブローカーノードの IP:Kafkaポートを入力します。 ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092
Cloud Data Streaming Serviceデータ照会
Search Engine Serviceで Cloud Data Streaming Serviceのデータを照会するユースケースは次のとおりです。
GET cdss-2022.08.08/_search
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "cdss-2022.08.08",
"_type" : "_doc",
"_id" : "VtmKe4IBicE7MyrTaKJ5",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2022-08-08T03:40:44.335Z",
"message" : "this is my first message"
}
},
{
"_index" : "cdss-2022.08.08",
"_type" : "_doc",
"_id" : "V9mKe4IBicE7MyrTg6IW",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2022-08-08T03:40:51.248Z",
"message" : "this is my second message"
}
}
]
}
}
Kafka SSL使用
Cloud Data Streaming Serviceで SSLを使用する場合、認証キーを追加して設定をします。
INPUT
input { kafka { bootstrap_servers => “{BrokerNode-HostName}:9093” topics => "test“ ssl_truststore_location => "/etc/logstash/conf.d/kafka.client.truststore.jks" ssl_truststore_password => “${password}" security_protocol => "SSL" } }
${BrokerNode-HostName}
- クラスタリスト -> Brokerノード情報 -> 詳細を表示 -> TLSで確認します。例)"networktest12-d-251:9093,networktest12-d-252:9093,networktest12-d-253:9093"
${password}
- 別途の認証キーパスワード
この記事は役に立ちましたか?