VPC環境で利用できます。
Secret Managerの自動ローテーション機能を利用することで、シークレットを定期的に更新できます。シークレットローテーションタスクは、シークレットにマッピングされた Cloud Functionsのトリガーを呼び出し、トリガーに定義されたアクションを実行させることで処理されます。したがって、シークレットローテーションのためには事前に以下の準備が必要です。
- Cloud Functions Secret Manager連携トリガー
- Cloud Functionsアクション
Cloud Functions Secret Manager連携トリガーとアクションの設定については Cloud Functions - Secret Managerタイプトリガーと Cloud Functions - Actionをご参照ください。
シークレットローテーションの開始が Secret Managerで開始されたとしても、実際のシークレットのローテーションタスクは、ユーザーが構成した Cloud Functionsの実行領域で行われます。Cloud DBのパスワードをシークレットで構成した状態でのシークレットローテーション実行構造の例は、次の通りです。

シークレットローテーションプロセス
シークレットローテーションタスク中に Cloud Functionsアクションが実行されるステップは、実際のシークレットが適用されるターゲット(各種サービスまたはシステム)によってすべて異なる構成が可能ですが、正確なタスク追跡のために、ユーザーローテーションアクションは次の シークレットローテーションプロセス に沿って構成する必要があります。
シークレットローテーションアクションの構成において、 シークレットローテーションプロセス の遵守はとても重要です。Secret Managerは、プロセスに正確に従わないアクションの実行によるエラーや誤動作については責任を負いません。
シークレットローテーションステップの追跡
シークレットローテーションが進行中の場合は、シークレットローテーションステップが正しく確認されなければ正確なシークレット値を利用することができません。以下のシークレットローテーションアクションのサンプルコードでシークレットローテーションステップを追跡するための API活用例を確認できます。
1. シークレットの照会とローテーションタスク開始の通知
ローテーションアクションが実行されると、Secret Managerでユーザーシークレット照会をリクエストして、ローテーションタスクの開始を通知します。この時点からシークレットのローテーションステータスは ローテーション進行中 に変更されます。シークレット照会とローテーションタスク開始の通知のために Start Rotation Job APIを使用します。
2. Pendingステージアップデート
更新されるシークレットを Pendingステージ状態でシークレットチェーンに登録します。この時、円滑な動作のため、必ず ローテーション対象 の値のみを更新します。新しいシークレットを作成して登録する Pythonのサンプルコードは次の通りです。自動作成 APIを使用すると、自動的にローテーション対象の値のみアップデートされます。Pendingステージアップデートのために Add Pending Stageまたは Update Pending Stage APIを使用します。
3. シークレットローテーションとテスト
新しく更新されるシークレットを実際のターゲットに適用します。ターゲットによって適用方法はすべて異なります。例えば、ターゲットが Cloud DB for MySQLの場合、ALTER USERクエリを使用します。ローテーションプロセスを完了する前に、シークレットのローテーションが正常に完了したことを確認することをお勧めします。
Naver Cloud Platform Cloud DBアクセス情報をローテーションするためには、以下のクエリをご参照ください(CDB MongoDBはパスワードアップデートクエリをサポートしません)。
| DBMSタイプ | パスワードアップデートクエリ |
|---|---|
| MySQL | ALTER USER '{user}'@'%または IP' IDENTIFIED BY '{password}’ |
| MSSQL | ALTER LOGIN [{user}] WITH PASSWORD = '{password}'; |
| PostgreSQL | ALTER USER {user} WITH PASSWORD '{password}'; |
4. ローテーション結果の通知
ターゲットにシークレットローテーションが正常に完了したことを確認したら、Secret Managerでローテーションプロセス完了を通知することでローテーションタスクを終了します。これでシークレットのローテーションステータスは ローテーション終了 状態になり、Pendingステージは Activeステージになります。ローテーション成功時には Complete Rotation Jobを利用し、ローテーション失敗時には Fail Rotation Jobを利用します。
シークレットローテーションアクションのサンプルコード
シークレットローテーションプロセスコントロール APIはこちらをご参照ください。
以下は Cloud DB for MySQLでパスワード(cdbPassword)を単一の ローテーション対象 に設定した状態でローテーションを実行する Pythonアクションのサンプルコードです。
Secret Manager APIを呼び出すには ACCESS KEYと SECRET KEYが必要です。これも保護が必要な値なので、コードに直接表示されないようにします。アクションのデフォルトパラメータに指定して暗号化を設定すると、安全に保護できます。詳細は Cloud Functions - 基本アクションのデフォルトパラメータ項目をご参照ください。
import hmac
import hashlib
import base64
import json
import requests
import mysql.connector
from mysql.connector import Error
import time
# Constants
ACCESS_KEY = None
SECRET_KEY = None
BASE_URL = None
JOB_TOKEN = None
def init_requests(secret_id, job_token, access_key, secret_key):
global BASE_URL, JOB_TOKEN, API_KEY, ACCESS_KEY, SECRET_KEY
BASE_URL = f"https://secretmanager.apigw.ntruss.com/action/v1/secrets/{secret_id}/jobs/{job_token}"
JOB_TOKEN = job_token
ACCESS_KEY = access_key
SECRET_KEY = secret_key
def make_signature(method, url, timestamp):
message = f"{method} {url}\n{timestamp}\n{ACCESS_KEY}"
signing_key = SECRET_KEY.encode('utf-8')
mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
return base64.b64encode(mac.digest()).decode('utf-8')
def main(args):
secret_id = args["secretId"]
job_token = args["jobToken"]
access_key = args["accessKey"]
secret_key = args["secretKey"]
result = {}
skip_update_pending = False
try:
init_requests(secret_id, job_token, access_key, secret_key)
print(f"[Secret Rotation Job (secret_id={secret_id})]")
secret_value_response = start_rotation()
print("[STEP1 COMPLETE] start rotation")
secret_value = secret_value_response["data"]
secret_chain = secret_value["decryptedSecretChain"]
rotation_targets = secret_value["rotationTargets"]
# Step 2: Generate secret value
if "pending" not in secret_chain or secret_chain["pending"] is None:
pending = {
"cdbHost": json.loads(secret_chain["active"])["cdbHost"],
"cdbPort": json.loads(secret_chain["active"])["cdbPort"],
"cdbUser": json.loads(secret_chain["active"])["cdbUser"],
"cdbDatabase": json.loads(secret_chain["active"])["cdbDatabase"],
"cdbPassword": set_secret_value()
}
secret_chain["pending"] = json.dumps(pending)
print("[STEP2 COMPLETE] update pending")
else:
skip_update_pending = True
print("[STEP2 SIKP] already exist pending")
change_password(secret_chain, rotation_targets)
print("[STEP3 COMPLETE] rotate secret")
test_password(secret_chain, rotation_targets)
print("[STEP4 COMPLETE] test secret")
# Step 5: Update Pending
if skip_update_pending:
print("[STEP5 SKIP] update pending")
else:
update_pending(secret_chain, rotation_targets)
print("[STEP5 COMPLETE] update pending")
complete_rotation()
print("[STEP6 COMPLETE] complete rotation")
result["done"] = True
result["pending"] = secret_chain["pending"]
except Exception as ex:
print(str(ex))
fail_rotation()
result["done"] = False
result["error_message"] = str(ex)
print(json.dumps(result, indent=4))
return result
def start_rotation():
return execute_request("/start", {}, "POST")
def set_secret_value():
req_body = {
"length": 16,
"excludeCharacters": "\"&'+/\\`",
"requireEachIncludedType": True
}
response = execute_request("/generate-random-secret", req_body, "POST")
print(json.dumps(response, indent=4))
return response.get("randomString")
def change_password(secret_chain, rotation_targets):
if not rotation_targets:
raise ValueError("rotationTargets is empty")
rotation_target = rotation_targets[0]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
print(f"cdbHost: {cdb_host}, cdbDatabase: {cdb_database}, cdbUser: {cdb_user}, cdbPort: {cdb_port}")
if "pending" not in secret_chain:
raise ValueError("error_message: pending is None")
update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain)
def update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain):
active = json.loads(secret_chain["active"])
previous = json.loads(secret_chain.get("previous", "{}"))
pending = json.loads(secret_chain["pending"])
pending_password = pending[rotation_target]
try:
conn = try_connection(cdb_host, cdb_port, cdb_user, pending_password, cdb_database, "pending")
if conn:
conn.close()
return
active_password = active[rotation_target]
conn = try_connection(cdb_host, cdb_port, cdb_user, active_password, cdb_database, "active")
if conn:
update_db_password(conn, cdb_user, pending_password)
return
if "previous" in secret_chain:
previous_password = previous.get(rotation_target)
conn = try_connection(cdb_host, cdb_port, cdb_user, previous_password, cdb_database, "previous")
if conn:
update_db_password(conn, cdb_user, pending_password)
else:
raise ValueError("All Secret Values are not valid")
except Error as ex:
print("[STEP3 ERROR]", ex)
raise
def try_connection(cdb_host, cdb_port, cdb_user, password, cdb_database, password_type):
print(f"Connecting using the {password_type} password")
return db_connect(cdb_host, cdb_port, cdb_user, password, cdb_database)
def update_db_password(conn, cdb_user, new_password):
query = f"ALTER USER '{cdb_user}'@'%' IDENTIFIED BY '{new_password}'"
cursor = conn.cursor()
cursor.execute(query)
conn.commit()
cursor.close()
def test_password(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
pending_value = json.loads(secret_chain["pending"])
pending_password = pending_value[rotation_target]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
conn = db_connect(cdb_host, cdb_port, cdb_user, pending_password, cdb_database)
if conn is None:
raise ValueError("error_message: connection failed")
conn.close()
def update_pending(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
non_change_values = json.loads(secret_chain["active"])
cdb_host = non_change_values["cdbHost"]
cdb_database = non_change_values["cdbDatabase"]
cdb_user = non_change_values["cdbUser"]
cdb_port = non_change_values["cdbPort"]
pending = {
"cdbHost": cdb_host,
"cdbPort": cdb_port,
"cdbUser": cdb_user,
"cdbDatabase": cdb_database,
"cdbPassword": json.loads(secret_chain["pending"])[rotation_target]
}
req_body = {
"value": json.dumps(pending)
}
execute_request("/pending", req_body, "PUT")
def complete_rotation():
execute_request("/complete", {}, "POST")
def fail_rotation():
try:
execute_request("/fail", {}, "POST")
except Exception as ex:
print("[FAIL ROTATION NOTIFICATION ERROR]", ex)
def execute_request(endpoint, request_body, method):
url = BASE_URL + endpoint
print("Request url:", url)
timestamp = str(int(time.time() * 1000))
headers = {
"Content-Type": "application/json",
"x-ncp-apigw-timestamp": timestamp,
"x-ncp-iam-access-key": ACCESS_KEY,
"x-ncp-apigw-signature-v2": make_signature(method, url.replace("https://secretmanager.apigw.ntruss.com", ""), timestamp),
"SECRET-JOB-TOKEN": JOB_TOKEN
}
if method == "POST":
response = requests.post(url, headers=headers, json=request_body)
elif method == "PUT":
response = requests.put(url, headers=headers, json=request_body)
else:
raise ValueError("Invalid HTTP method")
response_body = response.json()
print("Request Body:", request_body)
print("Response Body:", response_body)
if response.status_code != 200:
raise Exception(f"Unexpected response {response.status_code}")
return response_body
def db_connect(cdb_host, cdb_port, cdb_user, cdb_pass, cdb_database):
try:
connection = mysql.connector.connect(
host=cdb_host,
port=cdb_port,
user=cdb_user,
password=cdb_pass,
database=cdb_database
)
print("Database connection established successfully")
return connection
except Error as e:
print("Database connection failed:", e)
return None