Secret rotation action

Prev Next

Available in VPC

You can renew secrets periodically with Secret Manager’s automatic rotation function. Secret rotation involves calling the Cloud Functions trigger mapped to the secret and executing the action defined in the trigger. Therefore, the following preparations should be in place in advance to rotate secrets:

  • Cloud Functions Secret Manager connected trigger
  • Cloud Functions action

See Cloud Functions - Secret Manager type trigger and Cloud Functions - Action for details about setting the Cloud Functions Secret Manager connected triggers and actions.

Although the start of the secret rotation begins in Secret Manager, the actual secret rotation will occur in the Cloud Functions execution area that the user configured. Here is an example of the execution structure of secret rotation while having configured Cloud DB’s password as a secret.

SECM_02_ko

Secret rotation process

Although the stage where Cloud Functions action is executed during secret rotation may be configured differently depending on the actual target (various services or systems) to which the secret is to be applied, the user rotation action must be configured in accordance with the following secret rotation process for accurate action tracking.

Caution

When it comes to the configuration of secret rotation action, it is very important to comply with the secret rotation process. Secret Manager does not take responsibility for any error or malfunction caused by executing any action that does not comply with the process accurately.

Track secret rotation stages

While secret rotation is in progress, each secret rotation stage must be checked correctly to use the accurate secret value. You can find the example of using API to track each secret rotation stage from the Example codes of secret rotation action below.

1. View secrets and notify the start of rotation

As the rotation is executed, make a request for viewing user secrets in Secret Manager to notify the start of the rotation. From this point, the rotation status of a secret will turn to Rotation in progress. Use the Start Rotation Job API to view secrets and notify the start of the rotation.

2. Update pending stage

Register the secret to be rotated to the secret chain in the state of pending stage. At this point, make sure to update only the target values for rotation for smooth operation. Here is the Python example code that creates and registers a new secret. When you use an automatically generated API, only the target values for rotation will be updated automatically. Use the Add Pending Stage or Update Pending Stage API to update the pending stage.

3. Rotate and test secret

Apply the secret to be renewed to the actual target. The application method may differ depending on the target. For example, if the target is Cloud DB for MySQL, ALTER USER query is used. It is recommended to check if the secret rotation is properly completed before finishing the rotation process.
If you need to rotate NAVER Cloud Platform Cloud DB access information, refer to the following query: (CDB MongoDB does not support the password update query.)

DBMS type Password update query
MySQL ALTER USER '{user}'@'% or IP' IDENTIFIED BY '{password}'
MSSQL ALTER LOGIN [{user}] WITH PASSWORD = '{password}';
PostgreSQL ALTER USER {user} WITH PASSWORD '{password}';

4. Notify rotation result

Once you have checked if the secret rotation to the target has been completed without any problem, notify the Secret Manager of the Secret Manager of the completion of the rotation process to finish the rotation task. Now the rotation status of the secret turns to Rotation completed, and the pending stage turns to the active stage. Use Complete Rotation Job when the rotation succeeded, and use Fail Rotation Job when the rotation failed.

Example codes of secret rotation action

Refer to HERE for the secret rotation process control API.
The following is the example code of Python action that executes the rotation while setting the password (cdbPassword) as the single rotation target in Cloud DB for MySQL:

Note

You need the access key and secret key to call the Secret Manager API. As this value also needs to be protected, you must not expose it to the code directly. You can protect it safely by setting it as the default parameter of the action and setting it to be encrypted. See the default parameter item in Cloud Functions - Basic action for details.

import hmac
import hashlib
import base64
import json
import requests
import mysql.connector
from mysql.connector import Error
import time

# Constants
ACCESS_KEY = None
SECRET_KEY = None
BASE_URL = None
JOB_TOKEN = None

def init_requests(secret_id, job_token, access_key, secret_key):
    global BASE_URL, JOB_TOKEN, API_KEY, ACCESS_KEY, SECRET_KEY
    BASE_URL = f"https://secretmanager.apigw.ntruss.com/action/v1/secrets/{secret_id}/jobs/{job_token}"
    JOB_TOKEN = job_token
    ACCESS_KEY = access_key
    SECRET_KEY = secret_key

def make_signature(method, url, timestamp):
    message = f"{method} {url}\n{timestamp}\n{ACCESS_KEY}"
    signing_key = SECRET_KEY.encode('utf-8')
    mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
    return base64.b64encode(mac.digest()).decode('utf-8')

def main(args):
    secret_id = args["secretId"]
    job_token = args["jobToken"]
    access_key = args["accessKey"]
    secret_key = args["secretKey"]

    result = {}
    skip_update_pending = False

    try:
        init_requests(secret_id, job_token, access_key, secret_key)
        print(f"[Secret Rotation Job (secret_id={secret_id})]")

        secret_value_response = start_rotation()
        print("[STEP1 COMPLETE] start rotation")

        secret_value = secret_value_response["data"]
        secret_chain = secret_value["decryptedSecretChain"]
        rotation_targets = secret_value["rotationTargets"]

        # Step 2: Generate secret value
        if "pending" not in secret_chain or secret_chain["pending"] is None:
            pending = {
                "cdbHost": json.loads(secret_chain["active"])["cdbHost"],
                "cdbPort": json.loads(secret_chain["active"])["cdbPort"],
                "cdbUser": json.loads(secret_chain["active"])["cdbUser"],
                "cdbDatabase": json.loads(secret_chain["active"])["cdbDatabase"],
                "cdbPassword": set_secret_value()
            }
            secret_chain["pending"] = json.dumps(pending)
            print("[STEP2 COMPLETE] update pending")
        else:
            skip_update_pending = True
            print("[STEP2 SIKP] already exist pending")

        change_password(secret_chain, rotation_targets)
        print("[STEP3 COMPLETE] rotate secret")

        test_password(secret_chain, rotation_targets)
        print("[STEP4 COMPLETE] test secret")

        # Step 5: Update Pending
        if skip_update_pending:
            print("[STEP5 SKIP] update pending")
        else:
            update_pending(secret_chain, rotation_targets)
            print("[STEP5 COMPLETE] update pending")

        complete_rotation()
        print("[STEP6 COMPLETE] complete rotation")

        result["done"] = True
        result["pending"] = secret_chain["pending"]

    except Exception as ex:
        print(str(ex))
        fail_rotation()
        result["done"] = False
        result["error_message"] = str(ex)

    print(json.dumps(result, indent=4))
    return result

def start_rotation():
    return execute_request("/start", {}, "POST")

def set_secret_value():
    req_body = {
        "length": 16,
        "excludeCharacters": "\"&'+/\\`",
        "requireEachIncludedType": True
    }
    response = execute_request("/generate-random-secret", req_body, "POST")
    print(json.dumps(response, indent=4))
    return response.get("randomString")

def change_password(secret_chain, rotation_targets):
    if not rotation_targets:
        raise ValueError("rotationTargets is empty")

    rotation_target = rotation_targets[0]
    active_value = json.loads(secret_chain["active"])

    cdb_host = active_value["cdbHost"]
    cdb_database = active_value["cdbDatabase"]
    cdb_user = active_value["cdbUser"]
    cdb_port = int(active_value["cdbPort"])
    print(f"cdbHost: {cdb_host}, cdbDatabase: {cdb_database}, cdbUser: {cdb_user}, cdbPort: {cdb_port}")

    if "pending" not in secret_chain:
        raise ValueError("error_message: pending is None")

    update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain)

def update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain):
    active = json.loads(secret_chain["active"])
    previous = json.loads(secret_chain.get("previous", "{}"))
    pending = json.loads(secret_chain["pending"])
    pending_password = pending[rotation_target]

    try:
        conn = try_connection(cdb_host, cdb_port, cdb_user, pending_password, cdb_database, "pending")
        if conn:
            conn.close()
            return

        active_password = active[rotation_target]
        conn = try_connection(cdb_host, cdb_port, cdb_user, active_password, cdb_database, "active")
        if conn:
            update_db_password(conn, cdb_user, pending_password)
            return

        if "previous" in secret_chain:
            previous_password = previous.get(rotation_target)
            conn = try_connection(cdb_host, cdb_port, cdb_user, previous_password, cdb_database, "previous")
            if conn:
                update_db_password(conn, cdb_user, pending_password)
            else:
                raise ValueError("All Secret Values are not valid")

    except Error as ex:
        print("[STEP3 ERROR]", ex)
        raise

def try_connection(cdb_host, cdb_port, cdb_user, password, cdb_database, password_type):
    print(f"Connecting using the {password_type} password")
    return db_connect(cdb_host, cdb_port, cdb_user, password, cdb_database)

def update_db_password(conn, cdb_user, new_password):
    query = f"ALTER USER '{cdb_user}'@'%' IDENTIFIED BY '{new_password}'"
    cursor = conn.cursor()
    cursor.execute(query)
    conn.commit()
    cursor.close()

def test_password(secret_chain, rotation_targets):
    rotation_target = rotation_targets[0]

    pending_value = json.loads(secret_chain["pending"])
    pending_password = pending_value[rotation_target]
    active_value = json.loads(secret_chain["active"])
    cdb_host = active_value["cdbHost"]
    cdb_database = active_value["cdbDatabase"]
    cdb_user = active_value["cdbUser"]
    cdb_port = int(active_value["cdbPort"])

    conn = db_connect(cdb_host, cdb_port, cdb_user, pending_password, cdb_database)
    if conn is None:
        raise ValueError("error_message: connection failed")
    conn.close()

def update_pending(secret_chain, rotation_targets):
    rotation_target = rotation_targets[0]
    
    non_change_values = json.loads(secret_chain["active"])
    cdb_host = non_change_values["cdbHost"]
    cdb_database = non_change_values["cdbDatabase"]
    cdb_user = non_change_values["cdbUser"]
    cdb_port = non_change_values["cdbPort"]

    pending = {
        "cdbHost": cdb_host,
        "cdbPort": cdb_port,
        "cdbUser": cdb_user,
        "cdbDatabase": cdb_database,
        "cdbPassword": json.loads(secret_chain["pending"])[rotation_target]
    }

    req_body = {
        "value": json.dumps(pending)
    }
    execute_request("/pending", req_body, "PUT")

def complete_rotation():
    execute_request("/complete", {}, "POST")

def fail_rotation():
    try:
        execute_request("/fail", {}, "POST")
    except Exception as ex:
        print("[FAIL ROTATION NOTIFICATION ERROR]", ex)

def execute_request(endpoint, request_body, method):
    url = BASE_URL + endpoint
    print("Request url:", url)
    timestamp = str(int(time.time() * 1000))
    headers = {
        "Content-Type": "application/json",
        "x-ncp-apigw-timestamp": timestamp,
        "x-ncp-iam-access-key": ACCESS_KEY,
        "x-ncp-apigw-signature-v2": make_signature(method, url.replace("https://secretmanager.apigw.ntruss.com", ""), timestamp),
        "SECRET-JOB-TOKEN": JOB_TOKEN
    }
    if method == "POST":
        response = requests.post(url, headers=headers, json=request_body)
    elif method == "PUT":
        response = requests.put(url, headers=headers, json=request_body)
    else:
        raise ValueError("Invalid HTTP method")

    response_body = response.json()
    print("Request Body:", request_body)
    print("Response Body:", response_body)

    if response.status_code != 200:
        raise Exception(f"Unexpected response {response.status_code}")

    return response_body

def db_connect(cdb_host, cdb_port, cdb_user, cdb_pass, cdb_database):
    try:
        connection = mysql.connector.connect(
            host=cdb_host,
            port=cdb_port,
            user=cdb_user,
            password=cdb_pass,
            database=cdb_database
        )
        print("Database connection established successfully")
        return connection
    except Error as e:
        print("Database connection failed:", e)
        return None