Secret ローテーションアクション

Prev Next

VPC環境で利用できます。

Secret Managerの自動ローテーション機能を利用することで、シークレットを定期的に更新できます。シークレットローテーションタスクは、シークレットにマッピングされた Cloud Functionsのトリガーを呼び出し、トリガーに定義されたアクションを実行させることで処理されます。したがって、シークレットローテーションのためには事前に以下の準備が必要です。

  • Cloud Functions Secret Manager連携トリガー
  • Cloud Functionsアクション

Cloud Functions Secret Manager連携トリガーとアクションの設定については Cloud Functions - Secret ManagerタイプトリガーCloud Functions - Actionをご参照ください。

シークレットローテーションの開始が Secret Managerで開始されたとしても、実際のシークレットのローテーションタスクは、ユーザーが構成した Cloud Functionsの実行領域で行われます。Cloud DBのパスワードをシークレットで構成した状態でのシークレットローテーション実行構造の例は、次の通りです。

SECM_02_ko

シークレットローテーションプロセス

シークレットローテーションタスク中に Cloud Functionsアクションが実行されるステップは、実際のシークレットが適用されるターゲット(各種サービスまたはシステム)によってすべて異なる構成が可能ですが、正確なタスク追跡のために、ユーザーローテーションアクションは次の シークレットローテーションプロセス に沿って構成する必要があります。

注意

シークレットローテーションアクションの構成において、 シークレットローテーションプロセス の遵守はとても重要です。Secret Managerは、プロセスに正確に従わないアクションの実行によるエラーや誤動作については責任を負いません。

シークレットローテーションステップの追跡

シークレットローテーションが進行中の場合は、シークレットローテーションステップが正しく確認されなければ正確なシークレット値を利用することができません。以下のシークレットローテーションアクションのサンプルコードでシークレットローテーションステップを追跡するための API活用例を確認できます。

1. シークレットの照会とローテーションタスク開始の通知

ローテーションアクションが実行されると、Secret Managerでユーザーシークレット照会をリクエストして、ローテーションタスクの開始を通知します。この時点からシークレットのローテーションステータスは ローテーション進行中 に変更されます。シークレット照会とローテーションタスク開始の通知のために Start Rotation Job APIを使用します。

2. Pendingステージアップデート

更新されるシークレットを Pendingステージ状態でシークレットチェーンに登録します。この時、円滑な動作のため、必ず ローテーション対象 の値のみを更新します。新しいシークレットを作成して登録する Pythonのサンプルコードは次の通りです。自動作成 APIを使用すると、自動的にローテーション対象の値のみアップデートされます。Pendingステージアップデートのために Add Pending Stageまたは Update Pending Stage APIを使用します。

3. シークレットローテーションとテスト

新しく更新されるシークレットを実際のターゲットに適用します。ターゲットによって適用方法はすべて異なります。例えば、ターゲットが Cloud DB for MySQLの場合、ALTER USERクエリを使用します。ローテーションプロセスを完了する前に、シークレットのローテーションが正常に完了したことを確認することをお勧めします。
Naver Cloud Platform Cloud DBアクセス情報をローテーションするためには、以下のクエリをご参照ください(CDB MongoDBはパスワードアップデートクエリをサポートしません)。

DBMSタイプ パスワードアップデートクエリ
MySQL ALTER USER '{user}'@'%または IP' IDENTIFIED BY '{password}’
MSSQL ALTER LOGIN [{user}] WITH PASSWORD = '{password}';
PostgreSQL ALTER USER {user} WITH PASSWORD '{password}';

4. ローテーション結果の通知

ターゲットにシークレットローテーションが正常に完了したことを確認したら、Secret Managerでローテーションプロセス完了を通知することでローテーションタスクを終了します。これでシークレットのローテーションステータスは ローテーション終了 状態になり、Pendingステージは Activeステージになります。ローテーション成功時には Complete Rotation Jobを利用し、ローテーション失敗時には Fail Rotation Jobを利用します。

シークレットローテーションアクションのサンプルコード

シークレットローテーションプロセスコントロール APIはこちらをご参照ください。
以下は Cloud DB for MySQLでパスワード(cdbPassword)を単一の ローテーション対象 に設定した状態でローテーションを実行する Pythonアクションのサンプルコードです。

参考

Secret Manager APIを呼び出すには ACCESS KEYと SECRET KEYが必要です。これも保護が必要な値なので、コードに直接表示されないようにします。アクションのデフォルトパラメータに指定して暗号化を設定すると、安全に保護できます。詳細は Cloud Functions - 基本アクションのデフォルトパラメータ項目をご参照ください。

import hmac
import hashlib
import base64
import json
import requests
import mysql.connector
from mysql.connector import Error
import time

# Constants
ACCESS_KEY = None
SECRET_KEY = None
BASE_URL = None
JOB_TOKEN = None

def init_requests(secret_id, job_token, access_key, secret_key):
    global BASE_URL, JOB_TOKEN, API_KEY, ACCESS_KEY, SECRET_KEY
    BASE_URL = f"https://secretmanager.apigw.ntruss.com/action/v1/secrets/{secret_id}/jobs/{job_token}"
    JOB_TOKEN = job_token
    ACCESS_KEY = access_key
    SECRET_KEY = secret_key

def make_signature(method, url, timestamp):
    message = f"{method} {url}\n{timestamp}\n{ACCESS_KEY}"
    signing_key = SECRET_KEY.encode('utf-8')
    mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
    return base64.b64encode(mac.digest()).decode('utf-8')

def main(args):
    secret_id = args["secretId"]
    job_token = args["jobToken"]
    access_key = args["accessKey"]
    secret_key = args["secretKey"]

    result = {}
    skip_update_pending = False

    try:
        init_requests(secret_id, job_token, access_key, secret_key)
        print(f"[Secret Rotation Job (secret_id={secret_id})]")

        secret_value_response = start_rotation()
        print("[STEP1 COMPLETE] start rotation")

        secret_value = secret_value_response["data"]
        secret_chain = secret_value["decryptedSecretChain"]
        rotation_targets = secret_value["rotationTargets"]

        # Step 2: Generate secret value
        if "pending" not in secret_chain or secret_chain["pending"] is None:
            pending = {
                "cdbHost": json.loads(secret_chain["active"])["cdbHost"],
                "cdbPort": json.loads(secret_chain["active"])["cdbPort"],
                "cdbUser": json.loads(secret_chain["active"])["cdbUser"],
                "cdbDatabase": json.loads(secret_chain["active"])["cdbDatabase"],
                "cdbPassword": set_secret_value()
            }
            secret_chain["pending"] = json.dumps(pending)
            print("[STEP2 COMPLETE] update pending")
        else:
            skip_update_pending = True
            print("[STEP2 SIKP] already exist pending")

        change_password(secret_chain, rotation_targets)
        print("[STEP3 COMPLETE] rotate secret")

        test_password(secret_chain, rotation_targets)
        print("[STEP4 COMPLETE] test secret")

        # Step 5: Update Pending
        if skip_update_pending:
            print("[STEP5 SKIP] update pending")
        else:
            update_pending(secret_chain, rotation_targets)
            print("[STEP5 COMPLETE] update pending")

        complete_rotation()
        print("[STEP6 COMPLETE] complete rotation")

        result["done"] = True
        result["pending"] = secret_chain["pending"]

    except Exception as ex:
        print(str(ex))
        fail_rotation()
        result["done"] = False
        result["error_message"] = str(ex)

    print(json.dumps(result, indent=4))
    return result

def start_rotation():
    return execute_request("/start", {}, "POST")

def set_secret_value():
    req_body = {
        "length": 16,
        "excludeCharacters": "\"&'+/\\`",
        "requireEachIncludedType": True
    }
    response = execute_request("/generate-random-secret", req_body, "POST")
    print(json.dumps(response, indent=4))
    return response.get("randomString")

def change_password(secret_chain, rotation_targets):
    if not rotation_targets:
        raise ValueError("rotationTargets is empty")

    rotation_target = rotation_targets[0]
    active_value = json.loads(secret_chain["active"])

    cdb_host = active_value["cdbHost"]
    cdb_database = active_value["cdbDatabase"]
    cdb_user = active_value["cdbUser"]
    cdb_port = int(active_value["cdbPort"])
    print(f"cdbHost: {cdb_host}, cdbDatabase: {cdb_database}, cdbUser: {cdb_user}, cdbPort: {cdb_port}")

    if "pending" not in secret_chain:
        raise ValueError("error_message: pending is None")

    update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain)

def update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain):
    active = json.loads(secret_chain["active"])
    previous = json.loads(secret_chain.get("previous", "{}"))
    pending = json.loads(secret_chain["pending"])
    pending_password = pending[rotation_target]

    try:
        conn = try_connection(cdb_host, cdb_port, cdb_user, pending_password, cdb_database, "pending")
        if conn:
            conn.close()
            return

        active_password = active[rotation_target]
        conn = try_connection(cdb_host, cdb_port, cdb_user, active_password, cdb_database, "active")
        if conn:
            update_db_password(conn, cdb_user, pending_password)
            return

        if "previous" in secret_chain:
            previous_password = previous.get(rotation_target)
            conn = try_connection(cdb_host, cdb_port, cdb_user, previous_password, cdb_database, "previous")
            if conn:
                update_db_password(conn, cdb_user, pending_password)
            else:
                raise ValueError("All Secret Values are not valid")

    except Error as ex:
        print("[STEP3 ERROR]", ex)
        raise

def try_connection(cdb_host, cdb_port, cdb_user, password, cdb_database, password_type):
    print(f"Connecting using the {password_type} password")
    return db_connect(cdb_host, cdb_port, cdb_user, password, cdb_database)

def update_db_password(conn, cdb_user, new_password):
    query = f"ALTER USER '{cdb_user}'@'%' IDENTIFIED BY '{new_password}'"
    cursor = conn.cursor()
    cursor.execute(query)
    conn.commit()
    cursor.close()

def test_password(secret_chain, rotation_targets):
    rotation_target = rotation_targets[0]

    pending_value = json.loads(secret_chain["pending"])
    pending_password = pending_value[rotation_target]
    active_value = json.loads(secret_chain["active"])
    cdb_host = active_value["cdbHost"]
    cdb_database = active_value["cdbDatabase"]
    cdb_user = active_value["cdbUser"]
    cdb_port = int(active_value["cdbPort"])

    conn = db_connect(cdb_host, cdb_port, cdb_user, pending_password, cdb_database)
    if conn is None:
        raise ValueError("error_message: connection failed")
    conn.close()

def update_pending(secret_chain, rotation_targets):
    rotation_target = rotation_targets[0]
    
    non_change_values = json.loads(secret_chain["active"])
    cdb_host = non_change_values["cdbHost"]
    cdb_database = non_change_values["cdbDatabase"]
    cdb_user = non_change_values["cdbUser"]
    cdb_port = non_change_values["cdbPort"]

    pending = {
        "cdbHost": cdb_host,
        "cdbPort": cdb_port,
        "cdbUser": cdb_user,
        "cdbDatabase": cdb_database,
        "cdbPassword": json.loads(secret_chain["pending"])[rotation_target]
    }

    req_body = {
        "value": json.dumps(pending)
    }
    execute_request("/pending", req_body, "PUT")

def complete_rotation():
    execute_request("/complete", {}, "POST")

def fail_rotation():
    try:
        execute_request("/fail", {}, "POST")
    except Exception as ex:
        print("[FAIL ROTATION NOTIFICATION ERROR]", ex)

def execute_request(endpoint, request_body, method):
    url = BASE_URL + endpoint
    print("Request url:", url)
    timestamp = str(int(time.time() * 1000))
    headers = {
        "Content-Type": "application/json",
        "x-ncp-apigw-timestamp": timestamp,
        "x-ncp-iam-access-key": ACCESS_KEY,
        "x-ncp-apigw-signature-v2": make_signature(method, url.replace("https://secretmanager.apigw.ntruss.com", ""), timestamp),
        "SECRET-JOB-TOKEN": JOB_TOKEN
    }
    if method == "POST":
        response = requests.post(url, headers=headers, json=request_body)
    elif method == "PUT":
        response = requests.put(url, headers=headers, json=request_body)
    else:
        raise ValueError("Invalid HTTP method")

    response_body = response.json()
    print("Request Body:", request_body)
    print("Response Body:", response_body)

    if response.status_code != 200:
        raise Exception(f"Unexpected response {response.status_code}")

    return response_body

def db_connect(cdb_host, cdb_port, cdb_user, cdb_pass, cdb_database):
    try:
        connection = mysql.connector.connect(
            host=cdb_host,
            port=cdb_port,
            user=cdb_user,
            password=cdb_pass,
            database=cdb_database
        )
        print("Database connection established successfully")
        return connection
    except Error as e:
        print("Database connection failed:", e)
        return None