Python3: asynchronously and concurrently download zip archives, delete specified files inside them, and upload the archives to Alibaba Cloud OSS


For a production project I had to migrate about 40,000 zip archives between projects. The requirement: download each archive, delete the files inside it ending in html, txt, and url, and upload it to OSS. Before downloading, each address in the file list has to be split, and the remainder joined to the download domain to form the actual download URL; files inside the archive are removed with the shell command zip -d. I'm recording this here. When uploading, the address from the file list is reused as the object key, but OSS does not accept a leading /, so after stripping it with lstrip('/') the upload no longer errors.
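
As a quick illustration (the path below is made up and example.com stands in for the real download domain), one line from the file list maps to a download URL and an OSS object key like this:

# Minimal sketch with a hypothetical file-list line; the real domain and paths differ.
line = "/project/archive/2023/files/2023/01/sample.zip"

parts = line.split('/', 4)                # keep the first three path segments separate from the rest
first_part = '/' + '/'.join(parts[1:4])   # "/project/archive/2023"
content = parts[4]                        # "files/2023/01/sample.zip"

full_url = "https://example.com/" + content          # download URL = domain + remainder
oss_key = first_part.lstrip('/') + '/' + content     # OSS object keys must not start with '/'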

I taught myself Python and my skills are limited; I always feel the code isn't ideal, but it does what I need, and I'll keep improving it later.

Python3 async concurrent download script

import aiohttp
import asyncio
import os
import subprocess
import oss2
import logging

# File list containing the archive addresses to download
file_list = "zip.txt"

# Local directory to save downloaded data into
DATA_DIR = "/data/"
BASE_URL = ""  # download domain

# Maximum number of concurrent downloads
MAX_CONCURRENT_DOWNLOADS = 8

# Whether to delete the local file after a successful upload
DELETE_LOCAL_FILE_AFTER_UPLOAD = True  # set to False to keep local copies

# Seconds to wait before retrying, and maximum number of retries
RETRY_DELAY = 5
MAX_RETRIES = 3

# Download timeout in seconds
TIMEOUT = 300



# Files recording successful and failed downloads
SUCCESS_FILE = os.path.join(DATA_DIR, "success_log.txt")
FAILURE_FILE = os.path.join(DATA_DIR, "failure_log.txt")

# OSS configuration
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = ""
OSS_BUCKET_NAME = ""

# Log file configuration
logging.basicConfig(filename='download_upload.log', level=logging.INFO)


async def extract_content_from_url(url):
    # Split the URL: keep the first three path segments as the prefix
    # and the remainder as the part used for downloading/uploading
    parts = url.split('/', 4)
    if len(parts) < 5:
        return None, None  # malformed line, caller skips it
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content


async def download_from_ku(url, session, semaphore, success_file, failure_file, retries=MAX_RETRIES):
    first_part, content = await extract_content_from_url(url)
    if not content:
        return
    
    # Build the full download URL and the local save path
    full_url = BASE_URL + content
    data_dir = DATA_DIR + os.path.dirname(content)
    os.makedirs(data_dir, exist_ok=True)
    local_filename = os.path.join(data_dir, os.path.basename(content))

    if os.path.exists(local_filename):
        logging.info(f"文件已存在: {local_filename}")
        return local_filename

    while retries > 0:
        try:
            async with semaphore, session.get(full_url, timeout=TIMEOUT) as response:
                response.raise_for_status()  # treat HTTP error statuses (4xx/5xx) as download failures
                with open(local_filename, 'wb') as file:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        file.write(chunk)

            print(f"下载成功: {first_part}/{content}")
            logging.info(f"下载成功: {first_part}/{content}")
            with open(SUCCESS_FILE, 'a') as success_log:
                success_log.write(f"下载成功: {first_part}/{content}\n")
            return local_filename

        except asyncio.TimeoutError:
            if os.path.exists(local_filename):
                os.remove(local_filename)  # drop the partial file so a later run can re-download it
            logging.warning(f"Download timed out, skipping {first_part}/{content}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download timed out: {first_part}/{content}\n")
            return None

        except Exception as e:
            if os.path.exists(local_filename):
                os.remove(local_filename)  # drop the partial file before retrying
            print(f"Download error {content}: {e}")
            logging.error(f"Download error {content}: {e}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download error {content}: {e}\n")
            retries -= 1
            if retries == 0:
                logging.error(f"Reached maximum retries, giving up on {content}")
            else:
                logging.info(f"Retrying download of {content}, retries left: {retries}")
                await asyncio.sleep(RETRY_DELAY)


async def remove_files_from_zip(zip_file, *files_to_remove):
    # Remove the specified file patterns from the archive by shelling out to `zip -d`.
    # zip_file is already the full local path returned by download_from_ku.
    command = ["zip", "-d", zip_file] + list(files_to_remove)
    try:
        subprocess.run(command, check=True)
        logging.info(f"Removed files from: {zip_file}")
    except subprocess.CalledProcessError as e:
        # zip exits non-zero (e.g. 12) when none of the patterns match anything in the archive
        logging.error(f"Error removing files from {zip_file}: {e}")


async def upload_to_oss(local_path, oss_key, retries=MAX_RETRIES):
    # Upload the file to OSS
    auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)

    while retries > 0:
        try:
            with open(local_path, 'rb') as file:
                bucket.put_object(oss_key, file)

            oss_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}"
            print(f"上传成功: {oss_url}")
            logging.info(f"上传成功: {oss_url}")
            if DELETE_LOCAL_FILE_AFTER_UPLOAD:
                os.remove(local_path)
            break

        except oss2.exceptions.OssError as e:
            print(f"Upload failed: {e}")
            logging.error(f"Upload failed: {e}")
            retries -= 1
            if retries == 0:
                logging.error(f"Reached maximum retries, giving up on uploading {local_path}")
            else:
                logging.info(f"Retrying upload of {local_path}, retries left: {retries}")
                await asyncio.sleep(RETRY_DELAY)


async def download_remove_upload_task(url, session, semaphore):
    first_part, content = await extract_content_from_url(url)
    if not content:
        return

    try:
        local_filename = await download_from_ku(url, session, semaphore, SUCCESS_FILE, FAILURE_FILE)
        if local_filename:
            await remove_files_from_zip(local_filename, "*.html", "*.txt", "*.url")
            # OSS object keys must not start with '/', so strip the leading slash before building the key
            oss_key = first_part.lstrip('/') + '/' + os.path.dirname(content) + '/' + os.path.basename(local_filename)
            await upload_to_oss(local_filename, oss_key)
    except Exception as e:
        print(f"处理 {url} 时出错: {e}")
        logging.error(f"处理 {url} 时出错: {e}")


async def main():
    with open(file_list, "r") as file:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
        connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_DOWNLOADS)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Build the list of download tasks
            tasks = [
                download_remove_upload_task(url.strip().strip('"'), session, semaphore)
                for url in file
            ]
            # Run the download-remove-upload tasks concurrently
            await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

pip3: install the third-party dependencies. asyncio and subprocess are part of the Python standard library and do not need to be installed; the zip command-line utility must also be available on the system, since the script calls zip -d.

pip3 install aiohttp
pip3 install oss2

Brief notes on the python3 script

Everything before the fourth / has to be removed to get the correct download address, so the URL is cut with split, and the two parts of the path are each saved into their own variable.

async def extract_content_from_url(url):
    # Split the URL: keep the first three path segments as the prefix
    # and the remainder as the part used for downloading/uploading
    parts = url.split('/', 4)
    if len(parts) < 5:
        return None, None  # malformed line, caller skips it
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content
File list
Download log
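
As an illustration with made-up paths, a line from the file list and the log entries the script writes for it look roughly like this:

# zip.txt — one address per line (the script strips surrounding quotes); the path is hypothetical
"/project/archive/2023/files/2023/01/sample.zip"

# entries written to success_log.txt / failure_log.txt by the script
Download succeeded: /project/archive/2023/files/2023/01/sample.zip
Download timed out: /project/archive/2023/files/2023/02/big.zip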

Building the download-remove-upload tasks

Finally, the main function starts the tasks with asyncio.gather(*tasks). I threw it into the background and let it run overnight; a few large files timed out during download, so I re-ran just the failed ones and that was it.
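
A small sketch of how such a re-run could be prepared, assuming the "Download timed out: <path>" entries shown above; failed.txt is a hypothetical name for the new file list:

# Sketch: rebuild a file list from failure_log.txt so only the timed-out archives are retried.
seen = set()
with open("/data/failure_log.txt") as log, open("failed.txt", "w") as out:
    for line in log:
        if line.startswith("Download timed out: "):
            path = line[len("Download timed out: "):].strip()
            if path and path not in seen:   # skip duplicates from repeated runs
                seen.add(path)
                out.write(path + "\n")

Then point file_list at the new list (file_list = "failed.txt") and run the script again; archives that already downloaded completely are skipped by the existence check.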

Official Python documentation (Chinese)
