import requests
from concurrent.futures import ThreadPoolExecutor
import os
import logging
from tqdm import tqdm
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def download_file(url, file_path, thread_count=4):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
file_size = int(response.headers.get('content-length', 0))
if file_size == 0:
raise ValueError("无法获取文件大小信息")
chunk_size = file_size // thread_count
with open(file_path, 'wb') as file:
with ThreadPoolExecutor(max_workers=thread_count) as executor:
tasks = []
progress_bar = tqdm(total=file_size, unit='iB', unit_scale=True, desc="下载进度")
for index in range(thread_count):
start_pos = index * chunk_size
end_pos = (index + 1) * chunk_size if index < thread_count - 1 else file_size
headers = {'Range': f'bytes={start_pos}-{end_pos - 1}'}
tasks.append(executor.submit(
download_chunk, url, file, start_pos, end_pos, headers, progress_bar, file_size
))
for task in tasks:
task.result()
progress_bar.close()
except requests.exceptions.RequestException as e:
logging.error(f"文件下载失败,原因:{e}")
raise
except Exception as e:
logging.error(f"发生未知错误:{e}")
raise
def download_chunk(url, file_obj, start, end, headers, progress_bar, file_size):
try:
chunk_response = requests.get(url, headers=headers, stream=True)
chunk_response.raise_for_status()
with open(file_obj.name, 'r+b') as file:
file.seek(start)
downloaded = 0
for chunk in chunk_response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
downloaded += len(chunk)
progress_bar.update(len(chunk))
except requests.exceptions.RequestException as e:
logging.error(f"分块下载失败({start}-{end}):{e}")
raise
except Exception as e:
logging.error(f"分块写入失败({start}-{end}):{e}")
raise
def main():
download_url = input("请输入下载URL:")
save_path = input("请输入文件保存路径(例如:/路径/到/文件名,记得加后缀):")
directory = os.path.dirname(save_path)
if directory:
os.makedirs(directory, exist_ok=True)
logging.info(f"已创建目录:{directory}")
if not os.path.exists(save_path):
try:
download_file(download_url, save_path)
logging.info("文件下载完成!")
except Exception as e:
logging.error(f"下载过程出错:{e}")
else:
logging.info("文件已存在,跳过下载")
if __name__ == "__main__":
main()