专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

测试用例:多线程图片ETL Oracle和S3

ins518 2024-11-10 10:47:08 技术文章 9 ℃ 0 评论
# 导入cx_Oracle模块,用于连接oracle数据库
import cx_Oracle
# 导入boto3模块,用于操作s3
import boto3
# 导入threading模块,用于创建多线程
import threading
# 定义数据库连接参数,根据你的实际情况修改
user = "username" # 用户名
password = "password" # 密码
dsn = "localhost:1521/xe" # 数据源名称
# 创建数据库连接池对象,设置最小连接数为2,最大连接数为5,增量为1
pool = cx_Oracle.SessionPool(user, password, dsn, min=2, max=5, increment=1)
# 定义s3连接参数,根据你的实际情况修改
s3 = boto3.resource(
's3',
aws_access_key_id="gQ4+YC530sBa8qZI6WcbUbtH8oar0exampleuniqueID", # s3访问密钥ID
aws_secret_access_key="7fa22331ebe62bf4605dc9a42aaeexampleuniqueID", # s3访问密钥
region_name="us-phoenix-1", # s3区域名称
endpoint_url="^4^" # s3端点URL
)
# 定义s3存储桶名称,根据你的实际情况修改
bucket_name = "mybucket"
# 定义一个函数,用于从数据库中读取blob字段,并上传到s3存储桶
def upload_blob(filename, blob):
# 从连接池中获取一个连接对象
conn = pool.acquire()
# 将blob内容转换为字节流对象
blob_data = io.BytesIO(blob.read())
# 上传字节流对象到s3存储桶
s3.Object(bucket_name, filename).put(Body=blob_data)
# 释放连接对象回到连接池
pool.release(conn)
# 定义一个函数,用于分页查询数据库中的blob字段,并返回查询结果
def query_blob(page_number, page_size, result_dict):
# 创建一个游标对象,从连接池中获取一个连接对象
cur = pool.acquire().cursor()
# 定义sql语句,根据你的实际情况修改,使用row_number()函数和where子句实现分页查询
sql = """select filename, filecontent from (
select filename, filecontent, row_number() over (order by filename) rn from table1)
where rn between :start and :end"""
# 计算分页查询的起始和结束行号
start = (page_number - 1) * page_size + 1
end = page_number * page_size
# 执行sql语句,并传入绑定变量的值
cur.execute(sql, start=start, end=end)
# 获取查询结果,并存入字典中,以页数为键
result_dict[page_number] = cur.fetchall()
# 关闭游标对象
cur.close()
# 定义总页数和每页记录数,根据你的实际情况修改或动态获取[^3^][3]
total_pages = 10
page_size = 100
# 创建一个空列表,用于存储查询线程对象
query_threads = []
# 创建一个空字典,用于存储查询结果
result_dict = {}
# 创建一个空列表,用于存储上传线程对象
upload_threads = []
# 遍历所有页数,并创建一个查询线程对象,传入查询函数和参数
for page_number in range(1, total_pages + 1):
# 创建一个查询线程对象
t = threading.Thread(target=query_blob, args=(page_number, page_size, result_dict))
# 启动查询线程
t.start()
# 将查询线程对象添加到列表中
query_threads.append(t)
# 等待所有查询线程结束
for t in query_threads:
t.join()
# 从字典中获取查询结果,并按页数排序
results = []
for page_number in sorted(result_dict.keys()):
results += result_dict[page_number]
# 遍历查询结果,并创建一个上传线程对象,传入上传函数和参数
for row in results:
# 获取文件名和blob内容
filename = row[0]
blob = row[1]
# 创建一个上传线程对象
t = threading.Thread(target=upload_blob, args=(filename, blob))
# 启动上传线程
t.start()
# 将上传线程对象添加到列表中
upload_threads.append(t)
# 等待所有上传线程结束
for t in upload_threads:
t.join()
# 关闭连接池对象
pool.close()

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表