简单提要
你还在傻傻的一次次访问接口来下载图片?事实上只要稍稍利用一下python即可自动下载图片并进行去重处理。如果你想创建一个自己的图片API或者想批量下载当作桌面壁纸,这篇文章值得你阅读。这里拿光点壁纸API【https://api.lolimi.cn/API/guang/api.php?n=1&type=json】来举例。
理论上适合所有该类型的json(访问一次接口,只能返回一次图片链接),自行根据返回的json数据结构进行修改,这里不作过多的阐述,不会的可以把程序和接口数据喂AI处理。
程序特点
1.自动去重:根据图片的哈希值不同以实现去重。
2.运行间隔合理:2秒4次请求,防止被当作机器人封禁。
3.安全中断:Ctrl+C终止程序,并保留已经下载的图片(否则必须等待到程序彻底跑完才会保留图片)
4.进程保留:下次运行程序时,自动进行上次遗留的进程。
使用说明
1.Win+R打开运行窗口,输入cmd进入终端,安装requests依赖(如有则忽略),输入以下代码进行安装
pip3 install requests2.随便创建一个py程序,输入以下代码:
程序代码
import requests
import time
import os
import hashlib
import signal
import sys
from concurrent.futures import ThreadPoolExecutor
# 创建保存图片的目录
os.makedirs('katong_images', exist_ok=True)
# 存储已下载图片的哈希值
downloaded_hashes = set()
# 全局变量跟踪进度
success_count = 0
attempt_count = 0
interrupted = False
def handle_interrupt(signal, frame):
"""处理中断信号"""
global interrupted
print("\n\n检测到中断请求,正在安全终止程序...")
interrupted = True
# 保存当前进度
save_progress()
sys.exit(0)
def save_progress():
"""保存进度信息"""
with open('katong_images/progress.txt', 'w') as f:
f.write(f"成功下载: {success_count}张\n")
f.write(f"总尝试次数: {attempt_count}次\n")
f.write(f"重复图片数量: {attempt_count - success_count}张\n")
f.write(f"最后下载的文件索引: {success_count + 1}")
def download_image(index):
"""下载单个图片"""
try:
# 1. 获取图片URL
api_url = "https://api.lolimi.cn/API/guang/api.php?n=4&type=json"
response = requests.get(api_url, timeout=10)
data = response.json()
# 检查返回状态码
if data.get('code') == 1:
img_url = data['text']
# 2. 下载图片
img_response = requests.get(img_url, timeout=15)
img_data = img_response.content
# 3. 计算图片哈希值
img_hash = hashlib.md5(img_data).hexdigest()
# 4. 检查是否重复
if img_hash in downloaded_hashes:
return False, index, "重复,跳过保存"
# 5. 保存图片并记录哈希值
file_name = f"katong_images/image_{index}.jpg"
with open(file_name, 'wb') as f:
f.write(img_data)
downloaded_hashes.add(img_hash)
return True, index, f"下载成功 (哈希值: {img_hash[:8]}...)"
else:
return False, index, f"获取失败: 返回code={data.get('code')}"
except Exception as e:
return False, index, f"下载失败: {str(e)}"
def download_task():
"""执行下载任务"""
global success_count, attempt_count, interrupted
# 注册中断信号处理
signal.signal(signal.SIGINT, handle_interrupt)
# 使用线程池控制并发
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
print("开始下载图片,按 Ctrl+C 可随时终止程序")
while success_count < 200 and not interrupted:
attempt_count += 1
future = executor.submit(download_image, success_count + 1)
futures.append(future)
# 控制频率 (每2秒4次 = 每次间隔0.5秒)
time.sleep(0.5)
# 检查完成的任务
if futures:
done, _ = futures[0], futures
if done.done():
futures.pop(0)
success, idx, msg = done.result()
if success:
success_count += 1
print(f"图片 {idx}: {msg}")
# 显示进度
print(f"进度: {success_count}/200 (尝试次数: {attempt_count}){' - 中断请求已收到' if interrupted else ''}")
# 保存最终进度
save_progress()
print("\n" + "="*50)
print("所有图片下载完成!") if success_count >= 200 else print("下载已中断")
print(f"成功下载: {success_count}张, 总尝试次数: {attempt_count}次")
print(f"重复图片数量: {attempt_count - success_count}张")
print(f"所有图片保存在: {os.path.abspath('katong_images')}")
print("进度信息保存在: katong_images/progress.txt")
if __name__ == "__main__":
# 从文件中恢复进度(如果有)
progress_file = 'katong_images/progress.txt'
if os.path.exists(progress_file):
try:
with open(progress_file, 'r') as f:
lines = f.readlines()
success_count = int(lines[0].split(':')[1].strip().replace('张', ''))
print(f"检测到先前进度: 已下载 {success_count} 张图片,将从中断处继续...")
except:
print("检测到先前进度文件,但解析失败,将从头开始...")
download_task()