Python用AWS SDK(Boto3)
  • PDF

Python用AWS SDK(Boto3)

  • PDF

可在Classic/VPC环境下使用。

Python用SDK for S3 API

本文档通过示例介绍如何使用AWS S3提供的Python SDK(Boto3)操作NAVER Cloud Platform Object Storage。本文档基于AWS Python SDK(Boto3)1.6.19版本编写。

安装SDK

pip install boto3==1.6.19

示例

参考

示例中的access_key和secret_key变量,必须替换为已注册的API认证密钥信息。

创建存储桶

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Create a bucket with the given name.
    bucket_name = 'sample-bucket'
    s3.create_bucket(Bucket=bucket_name)

查询存储桶列表

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, aws_access_key_id=access_key,
                      aws_secret_access_key=secret_key)

    response = s3.list_buckets()

    # Default to an empty list so the loop is a no-op when 'Buckets' is absent.
    for bucket in response.get('Buckets', []):
        # print() call form: the former `print x` statement is a syntax
        # error on Python 3, which the other examples on this page target.
        print(bucket.get('Name'))

删除存储桶

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Delete the bucket with the given name.
    bucket_name = 'sample-bucket'
    s3.delete_bucket(Bucket=bucket_name)

上传文件

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    bucket_name = 'sample-bucket'

    # Create a "folder": an object with no body whose key ends in '/'.
    folder_key = 'sample-folder/'
    s3.put_object(Bucket=bucket_name, Key=folder_key)

    # Upload a local file under the given object key.
    file_key = 'sample-object'
    local_file_path = '/tmp/test.txt'
    s3.upload_file(local_file_path, bucket_name, file_key)

查询文件列表

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, aws_access_key_id=access_key,
                      aws_secret_access_key=secret_key)

    bucket_name = 'sample-bucket'

    # list all in the bucket
    max_keys = 300
    response = s3.list_objects(Bucket=bucket_name, MaxKeys=max_keys)

    print('list all in the bucket')

    while True:
        print('IsTruncated=%r' % response.get('IsTruncated'))
        print('Marker=%s' % response.get('Marker'))
        print('NextMarker=%s' % response.get('NextMarker'))

        print('Object List')
        # 'Contents' is omitted from the response when nothing matches
        # (e.g. an empty bucket); default to [] instead of iterating None.
        contents = response.get('Contents', [])
        for content in contents:
            owner = content.get('Owner') or {}
            print(' Name=%s, Size=%d, Owner=%s' %
                  (content.get('Key'), content.get('Size'), owner.get('ID')))

        if not response.get('IsTruncated'):
            break

        # S3 documents NextMarker as returned only when a delimiter is given;
        # without one, the last key of the current page is the marker for the
        # next page.
        next_marker = response.get('NextMarker')
        if next_marker is None and contents:
            next_marker = contents[-1].get('Key')
        if next_marker is None:
            # Nothing to resume from; stop rather than send Marker=None.
            break

        response = s3.list_objects(Bucket=bucket_name, MaxKeys=max_keys,
                                   Marker=next_marker)

    # top level folders and files in the bucket
    delimiter = '/'
    max_keys = 300

    response = s3.list_objects(Bucket=bucket_name, Delimiter=delimiter, MaxKeys=max_keys)

    print('top level folders and files in the bucket')

    while True:
        print('IsTruncated=%r' % response.get('IsTruncated'))
        print('Marker=%s' % response.get('Marker'))
        print('NextMarker=%s' % response.get('NextMarker'))

        print('Folder List')
        # 'CommonPrefixes' is absent when no key contains the delimiter.
        for folder in response.get('CommonPrefixes', []):
            print(' Name=%s' % folder.get('Prefix'))

        print('File List')
        # 'Contents' is absent when the top level holds only folders.
        for content in response.get('Contents', []):
            owner = content.get('Owner') or {}
            print(' Name=%s, Size=%d, Owner=%s' %
                  (content.get('Key'), content.get('Size'), owner.get('ID')))

        if not response.get('IsTruncated'):
            break

        next_marker = response.get('NextMarker')
        if next_marker is None:
            # Nothing to resume from; stop rather than send Marker=None.
            break

        response = s3.list_objects(Bucket=bucket_name, Delimiter=delimiter, MaxKeys=max_keys,
                                   Marker=next_marker)

下载文件

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Source object (bucket + key) and the local path it is written to.
    bucket_name = 'sample-bucket'
    object_name = 'sample-object'
    local_file_path = '/tmp/test.txt'

    s3.download_file(bucket_name, object_name, local_file_path)

删除文件

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Delete a single object identified by bucket and key.
    bucket_name = 'sample-bucket'
    object_name = 'sample-object'
    s3.delete_object(Bucket=bucket_name, Key=object_name)

ACL设置

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    bucket_name = 'sample-bucket'

    # Bucket ACL: apply the canned 'public-read' ACL
    # (adds read permission for anonymous users), then read it back.
    s3.put_bucket_acl(Bucket=bucket_name, ACL='public-read')
    response = s3.get_bucket_acl(Bucket=bucket_name)

    # Object ACL: grant READ to another user by ID while the owner
    # keeps FULL_CONTROL.
    object_name = 'sample-object'
    owner_id = 'test-owner-id'
    target_id = 'test-user-id'

    owner_grant = {
        'Grantee': {'ID': owner_id, 'Type': 'CanonicalUser'},
        'Permission': 'FULL_CONTROL',
    }
    reader_grant = {
        'Grantee': {'ID': target_id, 'Type': 'CanonicalUser'},
        'Permission': 'READ',
    }
    acl_policy = {
        'Grants': [owner_grant, reader_grant],
        'Owner': {'ID': owner_id},
    }

    s3.put_object_acl(Bucket=bucket_name, Key=object_name,
                      AccessControlPolicy=acl_policy)

    # Read the object ACL back.
    response = s3.get_object_acl(Bucket=bucket_name, Key=object_name)

分段上传

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY_ID'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    s3 = boto3.client(service_name, endpoint_url=endpoint_url, aws_access_key_id=access_key,
                      aws_secret_access_key=secret_key)
    bucket_name = 'sample-bucket'
    object_name = 'sample-large-object'

    local_file = '/tmp/sample.file'

    # initialize and get upload ID
    create_multipart_upload_response = s3.create_multipart_upload(Bucket=bucket_name, Key=object_name)
    upload_id = create_multipart_upload_response['UploadId']

    part_size = 10 * 1024 * 1024  # bytes read from the file per part (10 MiB)
    parts = []

    try:
        # upload parts
        with open(local_file, 'rb') as f:
            # iter(callable, sentinel) yields successive chunks until read()
            # returns b'' at end of file; part numbers start at 1.
            chunks = iter(lambda: f.read(part_size), b'')
            for part_number, data in enumerate(chunks, start=1):
                upload_part_response = s3.upload_part(Bucket=bucket_name, Key=object_name,
                                                      PartNumber=part_number, UploadId=upload_id,
                                                      Body=data)
                # The part number and ETag of every part are passed to the
                # complete call below.
                parts.append({
                    'PartNumber': part_number,
                    'ETag': upload_part_response['ETag']
                })

        multipart_upload = {'Parts': parts}

        # complete multipart upload
        s3.complete_multipart_upload(Bucket=bucket_name, Key=object_name, UploadId=upload_id,
                                     MultipartUpload=multipart_upload)
    except Exception:
        # abort multipart upload: if reading the file, uploading a part, or
        # completing fails, cancel the upload so it is not left open on the
        # server, then re-raise the original error.
        s3.abort_multipart_upload(Bucket=bucket_name, Key=object_name, UploadId=upload_id)
        raise

请求用客户提供的密钥进行加密(SSE-C)

参考

使用SSE-C加密对象后,控制台中的部分请求将无法使用。

import boto3
import secrets

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY_ID'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Generate a random 32-byte customer key and pair it with the AES256
    # algorithm name; the same settings are sent on upload and download.
    # NOTE(review): the key exists only in memory here; presumably it must be
    # stored by the caller to read the object in a later run -- confirm.
    sse_key = secrets.token_bytes(32)
    sse_conf = {
        "SSECustomerKey": sse_key,
        "SSECustomerAlgorithm": "AES256",
    }

    bucket_name = 'sample-bucket'
    object_name = 'sample-object'

    # Upload the local file with the SSE-C settings as extra arguments.
    local_file_path = '/tmp/sample.txt'
    s3.upload_file(local_file_path, bucket_name, object_name, ExtraArgs=sse_conf)

    # Download the object, supplying the same SSE-C settings.
    download_file_path = '/tmp/sample-download.txt'
    s3.download_file(bucket_name, object_name, download_file_path, ExtraArgs=sse_conf)

CORS(Cross-Origin Resource Sharing)设置

import boto3

# Connection settings for the S3-compatible Object Storage endpoint.
service_name = 's3'
endpoint_url = 'https://kr.object.ncloudstorage.com'
region_name = 'kr-standard'  # declared for reference; not passed to the client below
# Placeholders: replace with your registered API authentication key pair.
access_key = 'ACCESS_KEY_ID'
secret_key = 'SECRET_KEY'

if __name__ == "__main__":
    # Build an S3 client bound to the Object Storage endpoint.
    s3 = boto3.client(
        service_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    bucket_name = 'sample-bucket'

    # Define the configuration rules: a single rule allowing GET and PUT
    # from any origin with any header, with MaxAgeSeconds set to 3000.
    allow_get_put_rule = {
        'AllowedHeaders': ['*'],
        'AllowedMethods': ['GET', 'PUT'],
        'AllowedOrigins': ['*'],
        'MaxAgeSeconds': 3000,
    }
    cors_configuration = {'CORSRules': [allow_get_put_rule]}

    # Set CORS configuration
    s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration=cors_configuration)

    # Get CORS configuration and print the rules from the response
    response = s3.get_bucket_cors(Bucket=bucket_name)
    print(response['CORSRules'])

本文是否有帮助