近期文章:最好的 fomepay 虚拟信用卡 推荐,解决 ChatGPT Plus Telegram会员等无法支付问题
生产项目需要跨项目迁移约 4 万个压缩包:下载后要删除压缩包内以 html、txt、url 结尾的文件,再上传至 OSS。下载前需要先对文件列表中的地址做切割,再与下载域名拼接得到真实的下载地址;删除压缩包内的文件使用的是 shell 命令 zip -d,在此记录一下。上传的时候又要使用文件列表中的地址作为上传路径,但是 OSS 上传时路径开头的第一个 / 要去掉,使用 lstrip('/') 去掉后上传就不报错了。
自学的 python,水平有限,总感觉写的不太理想,但我想要的功能实现了就行,后面慢慢改进吧
Python3 异步并发下载脚本
import aiohttp
import asyncio
import os
import zipfile
import subprocess
import oss2
import logging
# Text file listing the archive addresses to process, one entry per line
file_list="zip.txt"
# Local directory under which downloaded archives are stored
DATA_DIR = "/data/"
# Download domain (placeholder value); the path cut from each list entry is
# appended directly to it to form the real download URL
BASE_URL = "下载域名"
# Maximum number of concurrent downloads
MAX_CONCURRENT_DOWNLOADS = 8
# Whether to delete the local file after a successful upload
DELETE_LOCAL_FILE_AFTER_UPLOAD = True # set to False to keep local copies
# Seconds to wait between retries, and the maximum number of attempts
RETRY_DELAY = 5
MAX_RETRIES = 3
# Download timeout, in seconds
TIMEOUT = 300
# Files that record successful and failed downloads
SUCCESS_FILE = os.path.join(DATA_DIR, "success_log.txt")
FAILURE_FILE = os.path.join(DATA_DIR, "failure_log.txt")
# OSS credentials and target bucket (fill in before running)
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = ""
OSS_BUCKET_NAME = ""
# Send log records at INFO level and above to download_upload.log
logging.basicConfig(filename='download_upload.log', level=logging.INFO)
async def extract_content_from_url(url):
    """Split a file-list entry into its prefix and its download path.

    The entry is split on its first four ``/`` characters.  Segments 1-3 are
    re-joined behind a leading ``/`` to form the prefix; everything after the
    fourth ``/`` is the relative path used for the download.

    Args:
        url: One entry from the file list, e.g. ``"/a/b/c/dir/file.zip"``.

    Returns:
        A ``(first_part, content)`` tuple.  ``content`` is ``""`` when the
        entry has fewer than four ``/`` separators (blank or malformed
        line), so callers can skip it with ``if not content``.
    """
    parts = url.split('/', 4)
    first_part = '/' + '/'.join(parts[1:4])
    # A short entry produces fewer than five pieces; indexing parts[4]
    # unconditionally would raise IndexError, so fall back to an empty path.
    content = parts[4] if len(parts) > 4 else ''
    return first_part, content
def _discard_partial_download(path):
    """Remove a leftover partial download, ignoring the case where none exists."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


async def download_from_ku(url, session, semaphore, success_file, failure_file, retries=MAX_RETRIES):
    """Download one archive below DATA_DIR, retrying on errors.

    The file-list entry is cut with ``extract_content_from_url``; the
    resulting relative path is appended to ``BASE_URL`` to build the download
    URL and mirrored below ``DATA_DIR`` for the local file.

    Args:
        url: One entry from the file list.
        session: ``aiohttp.ClientSession`` used for the request.
        semaphore: ``asyncio.Semaphore`` limiting concurrent downloads.
        success_file: Path of the log file appended to on success.
        failure_file: Path of the log file appended to on failure.
        retries: Maximum number of attempts for non-timeout errors.

    Returns:
        The local file path on success (or if the file already exists),
        ``None`` if the entry has no download path, the download timed out,
        or every attempt failed.
    """
    first_part, content = await extract_content_from_url(url)
    if not content:
        return None

    # Build the full download address and the mirrored local location.
    full_url = BASE_URL + content
    data_dir = DATA_DIR + os.path.dirname(content)
    os.makedirs(data_dir, exist_ok=True)
    local_filename = os.path.join(data_dir, os.path.basename(content))
    if os.path.exists(local_filename):
        logging.info(f"文件已存在: {local_filename}")
        return local_filename

    # Stream into a side file and rename only when complete, so an
    # interrupted download is never mistaken for a finished archive by the
    # "already exists" check above on a later run.
    partial_filename = local_filename + ".part"
    timeout = aiohttp.ClientTimeout(total=TIMEOUT)

    while retries > 0:
        try:
            async with semaphore, session.get(full_url, timeout=timeout) as response:
                # Without this an HTTP error page would be saved as the zip.
                response.raise_for_status()
                with open(partial_filename, 'wb') as file:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        file.write(chunk)
            os.replace(partial_filename, local_filename)
            print(f"下载成功: {first_part}/{content}")
            logging.info(f"下载成功: {first_part}/{content}")
            with open(success_file, 'a') as success_log:
                success_log.write(f"下载成功: {first_part}/{content}\n")
            return local_filename
        except asyncio.TimeoutError:
            # Timeouts are not retried: record the failure and skip the file.
            _discard_partial_download(partial_filename)
            logging.warning(f"下载超时,跳过下载 {first_part}/{content}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"下载超时: {first_part}/{content}\n")
            return None
        except Exception as e:
            _discard_partial_download(partial_filename)
            print(f"下载出错 {content}: {e}")
            logging.error(f"下载出错 {content}: {e}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"下载出错 {content}: {e}\n")
            retries -= 1
            if retries == 0:
                logging.error(f"达到最大重试次数,放弃下载 {content}")
            else:
                logging.info(f"重试下载 {content}, 剩余次数: {retries}")
                await asyncio.sleep(RETRY_DELAY)
    return None
async def remove_files_from_zip(zip_file, *files_to_remove):
    """Delete entries matching the given patterns from a zip archive.

    Runs ``zip -d <archive> <patterns...>``.  The command is executed in a
    worker thread so the event loop (and the other downloads) keep running
    while a large archive is rewritten.  Failures are logged, not raised.

    Args:
        zip_file: Path of the archive.  An absolute path is used as is; a
            relative path is resolved below ``DATA_DIR``.
        *files_to_remove: Patterns passed to ``zip -d``, e.g. ``"*.html"``.
    """
    if not files_to_remove:
        # Nothing was requested, so there is no command worth running.
        return

    if os.path.isabs(zip_file):
        zip_file_path = zip_file
    else:
        zip_file_path = os.path.join(DATA_DIR, zip_file)
    command = ["zip", "-d", zip_file_path] + list(files_to_remove)

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, lambda: subprocess.run(command))

    if result.returncode == 0:
        logging.info(f"Removed files from: {zip_file_path}")
    elif result.returncode == 12:
        # Exit status 12 is zip's "nothing to do": no entry matched.
        logging.info(f"No matching files to remove in: {zip_file_path}")
    else:
        logging.error(
            f"Error removing files from {zip_file_path}: "
            f"zip exited with status {result.returncode}"
        )
async def upload_to_oss(local_path, oss_key, retries=MAX_RETRIES):
    """Upload a local file to the configured OSS bucket, retrying on OSS errors.

    The blocking ``put_object`` call runs in a worker thread so that
    concurrent downloads are not frozen (and pushed past their timeout)
    while a large archive is being uploaded.

    Args:
        local_path: Path of the file to upload.
        oss_key: Object key to store the file under in the bucket.
        retries: Maximum number of upload attempts.

    Returns:
        ``True`` if the upload succeeded, ``False`` if every attempt failed.
    """
    auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
    loop = asyncio.get_running_loop()

    def _put_object():
        # Reopen the file on every attempt so a retry starts from byte 0.
        with open(local_path, 'rb') as file:
            bucket.put_object(oss_key, file)

    while retries > 0:
        try:
            await loop.run_in_executor(None, _put_object)
        except oss2.exceptions.OssError as e:
            print(f"上传失败: {e}")
            logging.error(f"上传失败: {e}")
            retries -= 1
            if retries == 0:
                logging.error(f"达到最大重试次数,放弃上传 {local_path}")
            else:
                logging.info(f"重试上传 {local_path}, 剩余次数: {retries}")
                await asyncio.sleep(RETRY_DELAY)
        else:
            oss_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}"
            print(f"上传成功: {oss_url}")
            logging.info(f"上传成功: {oss_url}")
            if DELETE_LOCAL_FILE_AFTER_UPLOAD:
                os.remove(local_path)
            return True
    return False
async def download_remove_upload_task(url, session, semaphore):
    """Run the full pipeline for one file-list entry.

    Downloads the archive, deletes the ``*.html`` / ``*.txt`` / ``*.url``
    entries from it, then uploads it to OSS under the entry's original path
    with the leading ``/`` removed.  Any error is logged and swallowed so one
    bad entry cannot abort the whole batch.

    Args:
        url: One entry from the file list.
        session: ``aiohttp.ClientSession`` shared by all tasks.
        semaphore: ``asyncio.Semaphore`` limiting concurrent downloads.
    """
    try:
        # Parsing happens inside the try block so a malformed entry is
        # logged like any other failure instead of escaping the task.
        first_part, content = await extract_content_from_url(url)
        if not content:
            return

        local_filename = await download_from_ku(url, session, semaphore, SUCCESS_FILE, FAILURE_FILE)
        if not local_filename:
            return

        await remove_files_from_zip(local_filename, "*.html", "*.txt", "*.url")

        # Drop empty pieces so an archive without a sub-directory does not
        # produce "//" inside the object key.
        key_parts = [
            first_part.lstrip('/'),
            os.path.dirname(content),
            os.path.basename(local_filename),
        ]
        oss_key = '/'.join(part for part in key_parts if part)
        await upload_to_oss(local_filename, oss_key)
    except Exception as e:
        print(f"处理 {url} 时出错: {e}")
        logging.error(f"处理 {url} 时出错: {e}")
async def main():
    """Read the file list and run every download/clean/upload task concurrently."""
    # Read the whole list first so the file is not held open for the
    # duration of the transfer; blank lines are dropped.
    with open(file_list, "r") as file:
        urls = [line.strip().strip('"') for line in file]
    urls = [url for url in urls if url]

    semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_DOWNLOADS)
    async with aiohttp.ClientSession(connector=connector) as session:
        # One task per entry.
        tasks = [
            download_remove_upload_task(url, session, semaphore)
            for url in urls
        ]
        # return_exceptions=True keeps one failing task from propagating out
        # of gather and closing the shared session under the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            logging.error(f"处理 {url} 时出错: {result}")
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop; calling get_event_loop() with no running loop is
    # deprecated in recent Python versions.
    asyncio.run(main())
pip3 安装依赖 aiohttp oss2(asyncio、subprocess 属于 Python 标准库,无需安装)
pip3 install aiohttp oss2
python3 脚本内容简单备注
文件列表中的地址要去掉第四个 / 及其之前的部分,剩下的才是正确的下载路径;这里使用 split 切割,将前、后两部分路径分别保存到两个变量里
async def extract_content_from_url(url):
    """Split a file-list entry into its prefix and its download path.

    The entry is split on its first four ``/`` characters.  Segments 1-3 are
    re-joined behind a leading ``/`` to form the prefix; everything after the
    fourth ``/`` is the relative path used for the download.

    Args:
        url: One entry from the file list, e.g. ``"/a/b/c/dir/file.zip"``.

    Returns:
        A ``(first_part, content)`` tuple.  ``content`` is ``""`` when the
        entry has fewer than four ``/`` separators (blank or malformed
        line), so callers can skip it with ``if not content``.
    """
    parts = url.split('/', 4)
    first_part = '/' + '/'.join(parts[1:4])
    # A short entry produces fewer than five pieces; indexing parts[4]
    # unconditionally would raise IndexError, so fall back to an empty path.
    content = parts[4] if len(parts) > 4 else ''
    return first_part, content


构建下载删除上传任务

最后在主函数中使用asyncio.gather(*tasks)启动任务,丢到后台跑了一晚上,有个别大文件下载超时的,重新跑了一下失败的就结束了