AI API 降本教程:哪些任务应该改成 Batch,如何把离线任务真正省下来
很多团队明明做的是离线任务,却还在用实时接口一个个同步调用,结果不仅贵,还容易被限流。只要任务不要求秒回,Batch 通常就是最应该优先考虑的降本手段之一。本文会讲清楚哪些任务适合改 Batch、怎么拆批、怎么轮询结果,并给出可复制的 Python 示例。
正文
这篇教程适合谁
适合:
- 你有大批量文本分类、摘要、改写任务
- 你要做历史数据补处理
- 你经常遇到实时接口限流
- 你已经知道有些任务可以接受几分钟到几小时延迟
不太适合:
- 每次请求都必须立刻返回给用户
- 你的业务一定要流式输出
- 你的任务量很小,实时调用也完全够用
先讲人话:什么是 Batch
Batch 的本质不是“更快”,而是:
- 异步
- 成批
- 更便宜
也就是把很多请求先打包,再交给平台慢慢处理。
这很像寄快递:
- 实时接口像同城闪送,快,但贵
- Batch 像标准物流,慢一点,但便宜很多
哪些任务特别适合改成 Batch
最典型的是这几类:
- 大量工单分类
- 历史评论情感分析
- 老数据摘要补写
- 海量 embedding
- 离线质检
- 夜间批量内容改写
一句话判断标准:
只要用户不需要立刻看到结果,这件事就值得先问一句:能不能改成 Batch?
三家主流平台现在都在鼓励你用 Batch
按当前官方文档:
- OpenAI Batch API:
50%更低成本,24 小时窗口 - Anthropic Message Batches:
50%更低成本 - Gemini Batch API:
50%更低成本,24 小时内完成
这意味着: Batch 不是“冷门技巧”,而是平台官方都明确鼓励的省钱路径。
改 Batch 前先做一个任务判断
别一上来就写代码,先判断这件事是不是真的适合批处理。
你可以直接问 4 个问题:
1. 这个任务必须秒回吗?
2. 这个任务可以晚几分钟甚至几小时给吗?
3. 任务量是不是大到实时接口明显贵或容易限流?
4. 单条任务之间是不是基本独立?
如果 4 个问题里有 3 个回答“是”,通常就很适合改 Batch。
一个非常实用的 Batch 流程
最稳的第一版流程通常是:
1. 先把任务写成 JSONL
2. 上传文件
3. 创建 batch job
4. 轮询状态
5. 下载结果文件
6. 把失败项单独重试
用 OpenAI Batch API 做一个最小可用示例
依赖安装:
pip install httpx第一步:先把请求写成 JSONL
import json
from pathlib import Path
def build_jsonl_file(rows: list[dict], output_path: str) -> Path:
path = Path(output_path)
with path.open("w", encoding="utf-8") as f:
for row in rows:
line = {
"custom_id": row["id"],
"method": "POST",
"url": "/v1/responses",
"body": {
"model": "gpt-5.4-mini",
"input": row["text"],
},
}
f.write(json.dumps(line, ensure_ascii=False) + "\n")
return path
rows = [
{"id": "ticket-1", "text": "请判断这条工单属于退款、发货还是售后咨询。内容:订单一直没发货。"},
{"id": "ticket-2", "text": "请判断这条工单属于退款、发货还是售后咨询。内容:想问什么时候退款到账。"},
]
jsonl_path = build_jsonl_file(rows, "batch_input.jsonl")
print("已生成:", jsonl_path)第二步:上传文件并创建 Batch
import os
import httpx
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
def upload_batch_file(file_path: str) -> str:
with open(file_path, "rb") as f:
files = {"file": (file_path, f, "application/jsonl")}
data = {"purpose": "batch"}
with httpx.Client(timeout=60) as client:
response = client.post(
"https://api.openai.com/v1/files",
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
data=data,
files=files,
)
response.raise_for_status()
return response.json()["id"]
def create_batch(input_file_id: str) -> str:
payload = {
"input_file_id": input_file_id,
"endpoint": "/v1/responses",
"completion_window": "24h",
}
with httpx.Client(timeout=60) as client:
response = client.post(
"https://api.openai.com/v1/batches",
headers={
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
},
json=payload,
)
response.raise_for_status()
return response.json()["id"]
file_id = upload_batch_file("batch_input.jsonl")
batch_id = create_batch(file_id)
print("batch_id =", batch_id)第三步:轮询状态
import time
def wait_batch_done(batch_id: str, poll_interval: int = 30) -> dict:
with httpx.Client(timeout=60) as client:
while True:
response = client.get(
f"https://api.openai.com/v1/batches/{batch_id}",
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
)
response.raise_for_status()
data = response.json()
status = data["status"]
print("当前状态:", status)
if status in {"completed", "failed", "expired", "cancelled"}:
return data
time.sleep(poll_interval)第四步:下载结果文件
def download_file(file_id: str, save_path: str) -> None:
with httpx.Client(timeout=120) as client:
response = client.get(
f"https://api.openai.com/v1/files/{file_id}/content",
headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
)
response.raise_for_status()
with open(save_path, "wb") as f:
f.write(response.content)
batch_result = wait_batch_done(batch_id)
output_file_id = batch_result.get("output_file_id")
if output_file_id:
download_file(output_file_id, "batch_output.jsonl")
print("结果已下载到 batch_output.jsonl")为什么这套方式值得学
因为它正好对应离线任务最常见的真实需求:
- 数据先准备好
- 批量跑
- 不盯实时响应
- 失败任务再补偿
这比你写一个 for 循环慢慢同步调用,通常更便宜,也更稳。
真正上线时一定要补的两步
1. 失败项单独重试
Batch 不代表永远 100% 成功。 你应该把失败项单独收集出来,再做二次处理。
2. 控制批大小
第一版不要一口气塞太多任务。 先从小批量开始,确认:
- 结果格式没问题
- 成本符合预期
- 错误处理逻辑完整
Anthropic 和 Gemini 怎么看
虽然上面代码用的是 OpenAI 做示例,但思路对 Anthropic 和 Gemini 一样成立:
- Anthropic Message Batches:适合大量独立消息请求
- Gemini Batch API:支持批量异步任务,官方文档也明确说明有
50%折扣
所以你真正要学会的不是某家 SDK,而是这套批处理思路。
最容易踩的 5 个坑
1. 明明是离线任务,还硬走实时接口
这是最常见的浪费。
2. 不保留 custom_id
后面你就很难把结果映射回原始数据。
3. 不拆失败项
失败项和成功项混着处理,后续很容易乱。
4. 一次塞太大批
排查问题会很痛苦。
5. 忘记比较“实时 vs Batch”的真实成本
你最好真的算一下,不要只凭感觉。
个人开发者怎么最容易见效
先挑一个最重的离线任务改 Batch:
- 老文章摘要
- 历史评论分类
- 产品描述批量改写
只要改成功一个,你就能立刻感受到差别。
小团队怎么落地
小团队最好做成两个队列:
- 实时队列:面向用户即时请求
- 夜间 Batch 队列:面向离线任务
这样职责最清楚,也最容易控制成本。
最后一句话
只要一个任务可以接受“晚一点返回”,Batch 就几乎总值得先评估一下。
很多系统真正的成本浪费,不是模型太贵,而是把本该离线的任务一直按在线方式在跑。
官方资料
- OpenAI Batch API Guide
https://developers.openai.com/api/docs/guides/batch
- OpenAI Batch API Reference
https://platform.openai.com/docs/api-reference/batch
- Anthropic Features Overview
https://platform.claude.com/docs/en/build-with-claude/overview
- Anthropic Message Batches API
https://platform.claude.com/docs/en/api/messages/batches/create
- Gemini Batch API
https://ai.google.dev/gemini-api/docs/batch-api