VPC 환경에서 이용 가능합니다.
Secret Manager의 자동 교체 기능을 이용하면 시크릿을 주기적으로 갱신할 수 있습니다. 시크릿 교체 작업은 시크릿에 매핑된 Cloud Functions 트리거를 호출하여 트리거에 정의된 액션을 실행시킴으로써 처리됩니다. 따라서 시크릿 교체를 위해서는 사전에 아래 준비 사항이 필요합니다.
- Cloud Functions Secret Manager 연동 트리거
- Cloud Functions 액션
Cloud Functions Secret Manager 연동 트리거와 액션의 설정에 대한 자세한 내용은 Cloud Functions - Secret Manager 타입 트리거 와 Cloud Functions - Action을 참고해 주시기 바랍니다.
시크릿 교체의 시작이 Secret Manager에서 시작된다고 하더라도, 실제 시크릿의 교체 작업은 사용자가 구성한 Cloud Functions 실행 영역에서 수행됩니다. Cloud DB의 패스워드를 시크릿으로 구성한 상태에서 시크릿 교체 실행 구조 예시는 다음과 같습니다.

시크릿 교체 프로세스
시크릿 교체 작업 중 Cloud Functions 액션이 실행되는 단계는 실제 시크릿이 적용될 타겟(각종 서비스 또는 시스템)에 따라 모두 다르게 구성될 수 있지만, 정확한 작업 추적을 위해서 사용자 교체 액션은 다음의 시크릿 교체 프로세스 를 따라 구성되어야 합니다.
시크릿 교체 액션의 구성에 있어 시크릿 교체 프로세스 준수는 매우 중요합니다. Secret Manager는 프로세스를 정확히 따르지 않은 액션의 실행으로 인한 오류 또는 오동작에 대해서는 책임을 지지 않습니다.
시크릿 교체 단계 추적
시크릿 교체가 진행 중인 경우에는 시크릿 교체 단계가 올바르게 확인되어야 정확한 시크릿 값을 이용할 수 있습니다. 아래 시크릿 교체 액션 예제 코드에서 시크릿 교체 단계 추적을 위한 API 활용 예시를 확인할 수 있습니다.
1. 시크릿 조회 및 교체 작업 시작 통보
교체 액션이 실행되면, Secret Manager로 사용자 시크릿 조회를 요청하여 교체 작업의 시작을 알리게 됩니다. 이 시점부터 시크릿의 교체 상태는 교체 진행중으로 변경됩니다. 시크릿 조회 및 교체 작업 시작 통보를 위해 Start Rotation Job API를 사용합니다.
2. Pending 스테이지 업데이트
갱신될 시크릿을 Pending 스테이지 상태로 시크릿 체인에 등록합니다. 이때, 원활한 동작을 위해 반드시 교체 대상 값만 업데이트 합니다. 새로운 시크릿을 생성하여 등록하는 파이썬 예시 코드는 다음과 같습니다. 자동 생성 API를 사용하면 자동으로 교체 대상 값만 업데이트 됩니다. Pending 스테이지 업데이트를 위해 Add Pending Stage 또는 Update Pending Stage API를 사용합니다.
3. 시크릿 교체 및 테스트
새로 갱신될 시크릿을 실제 타겟에 적용합니다. 타겟에 따라 적용 방식은 모두 다릅니다. 예를 들어, 타겟이 Cloud DB for MySQL 이라면, ALTER USER 쿼리가 사용됩니다. 교체 프로세스를 완료하기 전에 정상적으로 시크릿 교체가 완료됐는지 확인하는 것이 좋습니다.
만약 Naver Cloud Platform Cloud DB 접속 정보를 교체하기 위해서는 아래 쿼리를 참고해주시기 바랍니다. (CDB MongoDB는 패스워드 업데이트 쿼리가 지원되지 않습니다.)
| DBMS 타입 | 패스워드 업데이트 쿼리 |
|---|---|
| MySQL | ALTER USER '{user}'@'% 또는 IP' IDENTIFIED BY '{password}' |
| MSSQL | ALTER LOGIN [{user}] WITH PASSWORD = '{password}'; |
| PostgreSQL | ALTER USER {user} WITH PASSWORD '{password}'; |
4. 교체 결과 통보
타겟에 시크릿 교체가 정상적으로 완료되었는지 확인했다면, Secret Manager로 교체 프로세스 완료 통보를 함으로써 교체 작업을 종료합니다. 이제 시크릿의 교체 상태는 교체 종료 상태가 되며, Pending 스테이지는 Active 스테이지가 됩니다. 교체 성공시에는 Complete Rotation Job 을 이용하고, 교체 실패시에는 Fail Rotation Job을 이용합니다.
시크릿 교체 액션 예제 코드
시크릿 교체 프로세스 컨트롤 API는 여기를 참고해 주시기 바랍니다.
다음은 Cloud DB for MySQL에서 패스워드(cdbPassword)를 단일 교체 대상으로 설정한 상태에서 교체를 실행하는 파이썬 액션 예시 코드입니다.
Secret Manager API를 호출하기 위해서는 ACCESS KEY와 SECRET KEY가 필요합니다. 이 역시 보호가 필요한 값이기 때문에 코드에 직접 노출 되지 않아야 합니다. 액션의 디폴트 파라미터로 지정하고 암호화를 설정하면 안전하게 보호할 수 있습니다. 자세한 내용은 Cloud Functions - 기본 액션의 디폴트 파라미터 항목을 참고해 주시기 바랍니다.
import hmac
import hashlib
import base64
import json
import requests
import mysql.connector
from mysql.connector import Error
import time
# Constants
ACCESS_KEY = None
SECRET_KEY = None
BASE_URL = None
JOB_TOKEN = None
def init_requests(secret_id, job_token, access_key, secret_key):
global BASE_URL, JOB_TOKEN, API_KEY, ACCESS_KEY, SECRET_KEY
BASE_URL = f"https://secretmanager.apigw.ntruss.com/action/v1/secrets/{secret_id}/jobs/{job_token}"
JOB_TOKEN = job_token
ACCESS_KEY = access_key
SECRET_KEY = secret_key
def make_signature(method, url, timestamp):
message = f"{method} {url}\n{timestamp}\n{ACCESS_KEY}"
signing_key = SECRET_KEY.encode('utf-8')
mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
return base64.b64encode(mac.digest()).decode('utf-8')
def main(args):
secret_id = args["secretId"]
job_token = args["jobToken"]
access_key = args["accessKey"]
secret_key = args["secretKey"]
result = {}
skip_update_pending = False
try:
init_requests(secret_id, job_token, access_key, secret_key)
print(f"[Secret Rotation Job (secret_id={secret_id})]")
secret_value_response = start_rotation()
print("[STEP1 COMPLETE] start rotation")
secret_value = secret_value_response["data"]
secret_chain = secret_value["decryptedSecretChain"]
rotation_targets = secret_value["rotationTargets"]
# Step 2: Generate secret value
if "pending" not in secret_chain or secret_chain["pending"] is None:
pending = {
"cdbHost": json.loads(secret_chain["active"])["cdbHost"],
"cdbPort": json.loads(secret_chain["active"])["cdbPort"],
"cdbUser": json.loads(secret_chain["active"])["cdbUser"],
"cdbDatabase": json.loads(secret_chain["active"])["cdbDatabase"],
"cdbPassword": set_secret_value()
}
secret_chain["pending"] = json.dumps(pending)
print("[STEP2 COMPLETE] update pending")
else:
skip_update_pending = True
print("[STEP2 SIKP] already exist pending")
change_password(secret_chain, rotation_targets)
print("[STEP3 COMPLETE] rotate secret")
test_password(secret_chain, rotation_targets)
print("[STEP4 COMPLETE] test secret")
# Step 5: Update Pending
if skip_update_pending:
print("[STEP5 SKIP] update pending")
else:
update_pending(secret_chain, rotation_targets)
print("[STEP5 COMPLETE] update pending")
complete_rotation()
print("[STEP6 COMPLETE] complete rotation")
result["done"] = True
result["pending"] = secret_chain["pending"]
except Exception as ex:
print(str(ex))
fail_rotation()
result["done"] = False
result["error_message"] = str(ex)
print(json.dumps(result, indent=4))
return result
def start_rotation():
return execute_request("/start", {}, "POST")
def set_secret_value():
req_body = {
"length": 16,
"excludeCharacters": "\"&'+/\\`",
"requireEachIncludedType": True
}
response = execute_request("/generate-random-secret", req_body, "POST")
print(json.dumps(response, indent=4))
return response.get("randomString")
def change_password(secret_chain, rotation_targets):
if not rotation_targets:
raise ValueError("rotationTargets is empty")
rotation_target = rotation_targets[0]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
print(f"cdbHost: {cdb_host}, cdbDatabase: {cdb_database}, cdbUser: {cdb_user}, cdbPort: {cdb_port}")
if "pending" not in secret_chain:
raise ValueError("error_message: pending is None")
update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain)
def update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain):
active = json.loads(secret_chain["active"])
previous = json.loads(secret_chain.get("previous", "{}"))
pending = json.loads(secret_chain["pending"])
pending_password = pending[rotation_target]
try:
conn = try_connection(cdb_host, cdb_port, cdb_user, pending_password, cdb_database, "pending")
if conn:
conn.close()
return
active_password = active[rotation_target]
conn = try_connection(cdb_host, cdb_port, cdb_user, active_password, cdb_database, "active")
if conn:
update_db_password(conn, cdb_user, pending_password)
return
if "previous" in secret_chain:
previous_password = previous.get(rotation_target)
conn = try_connection(cdb_host, cdb_port, cdb_user, previous_password, cdb_database, "previous")
if conn:
update_db_password(conn, cdb_user, pending_password)
else:
raise ValueError("All Secret Values are not valid")
except Error as ex:
print("[STEP3 ERROR]", ex)
raise
def try_connection(cdb_host, cdb_port, cdb_user, password, cdb_database, password_type):
print(f"Connecting using the {password_type} password")
return db_connect(cdb_host, cdb_port, cdb_user, password, cdb_database)
def update_db_password(conn, cdb_user, new_password):
query = f"ALTER USER '{cdb_user}'@'%' IDENTIFIED BY '{new_password}'"
cursor = conn.cursor()
cursor.execute(query)
conn.commit()
cursor.close()
def test_password(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
pending_value = json.loads(secret_chain["pending"])
pending_password = pending_value[rotation_target]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
conn = db_connect(cdb_host, cdb_port, cdb_user, pending_password, cdb_database)
if conn is None:
raise ValueError("error_message: connection failed")
conn.close()
def update_pending(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
non_change_values = json.loads(secret_chain["active"])
cdb_host = non_change_values["cdbHost"]
cdb_database = non_change_values["cdbDatabase"]
cdb_user = non_change_values["cdbUser"]
cdb_port = non_change_values["cdbPort"]
pending = {
"cdbHost": cdb_host,
"cdbPort": cdb_port,
"cdbUser": cdb_user,
"cdbDatabase": cdb_database,
"cdbPassword": json.loads(secret_chain["pending"])[rotation_target]
}
req_body = {
"value": json.dumps(pending)
}
execute_request("/pending", req_body, "PUT")
def complete_rotation():
execute_request("/complete", {}, "POST")
def fail_rotation():
try:
execute_request("/fail", {}, "POST")
except Exception as ex:
print("[FAIL ROTATION NOTIFICATION ERROR]", ex)
def execute_request(endpoint, request_body, method):
url = BASE_URL + endpoint
print("Request url:", url)
timestamp = str(int(time.time() * 1000))
headers = {
"Content-Type": "application/json",
"x-ncp-apigw-timestamp": timestamp,
"x-ncp-iam-access-key": ACCESS_KEY,
"x-ncp-apigw-signature-v2": make_signature(method, url.replace("https://secretmanager.apigw.ntruss.com", ""), timestamp),
"SECRET-JOB-TOKEN": JOB_TOKEN
}
if method == "POST":
response = requests.post(url, headers=headers, json=request_body)
elif method == "PUT":
response = requests.put(url, headers=headers, json=request_body)
else:
raise ValueError("Invalid HTTP method")
response_body = response.json()
print("Request Body:", request_body)
print("Response Body:", response_body)
if response.status_code != 200:
raise Exception(f"Unexpected response {response.status_code}")
return response_body
def db_connect(cdb_host, cdb_port, cdb_user, cdb_pass, cdb_database):
try:
connection = mysql.connector.connect(
host=cdb_host,
port=cdb_port,
user=cdb_user,
password=cdb_pass,
database=cdb_database
)
print("Database connection established successfully")
return connection
except Error as e:
print("Database connection failed:", e)
return None