RH
RouterHub AI API中转站哪家好?2026最新免费稳定大模型接口平台推荐
博客

AI API 降本教程:哪些任务应该改成 Batch,如何把离线任务真正省下来

很多团队明明做的是离线任务,却还在用实时接口一个个同步调用,结果不仅贵,还容易被限流。只要任务不要求秒回,Batch 通常就是最应该优先考虑的降本手段之一。本文会讲清楚哪些任务适合改 Batch、怎么拆批、怎么轮询结果,并给出可复制的 Python 示例。

muchacha1@163.com 2026-04-05 10:57

正文

这篇教程适合谁

适合:

  • 你有大批量文本分类、摘要、改写任务
  • 你要做历史数据补处理
  • 你经常遇到实时接口限流
  • 你已经知道有些任务可以接受几分钟到几小时延迟

不太适合:

  • 每次请求都必须立刻返回给用户
  • 你的业务一定要流式输出
  • 你的任务量很小,实时调用也完全够用

先讲人话:什么是 Batch

Batch 的本质不是“更快”,而是:

  • 异步
  • 成批
  • 更便宜

也就是把很多请求先打包,再交给平台慢慢处理。

这很像寄快递:

  • 实时接口像同城闪送,快,但贵
  • Batch 像标准物流,慢一点,但便宜很多

哪些任务特别适合改成 Batch

最典型的是这几类:

  • 大量工单分类
  • 历史评论情感分析
  • 老数据摘要补写
  • 海量 embedding
  • 离线质检
  • 夜间批量内容改写

一句话判断标准:

只要用户不需要立刻看到结果,这件事就值得先问一句:能不能改成 Batch?

三家主流平台现在都在鼓励你用 Batch

按当前官方文档:

  • OpenAI Batch API:50% 更低成本,24 小时窗口
  • Anthropic Message Batches:50% 更低成本
  • Gemini Batch API:50% 更低成本,24 小时内完成

这意味着: Batch 不是“冷门技巧”,而是平台官方都明确鼓励的省钱路径。

改 Batch 前先做一个任务判断

别一上来就写代码,先判断这件事是不是真的适合批处理。

你可以直接问 4 个问题:

1. 这个任务必须秒回吗?

2. 这个任务可以晚几分钟甚至几小时给吗?

3. 任务量是不是大到实时接口明显贵或容易限流?

4. 单条任务之间是不是基本独立?

如果 4 个问题里有 3 个回答“是”,通常就很适合改 Batch。

一个非常实用的 Batch 流程

最稳的第一版流程通常是:

1. 先把任务写成 JSONL

2. 上传文件

3. 创建 batch job

4. 轮询状态

5. 下载结果文件

6. 把失败项单独重试

用 OpenAI Batch API 做一个最小可用示例

依赖安装:

pip install httpx

第一步:先把请求写成 JSONL

import json
from pathlib import Path


def build_jsonl_file(rows: list[dict], output_path: str) -> Path:
    path = Path(output_path)
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            line = {
                "custom_id": row["id"],
                "method": "POST",
                "url": "/v1/responses",
                "body": {
                    "model": "gpt-5.4-mini",
                    "input": row["text"],
                },
            }
            f.write(json.dumps(line, ensure_ascii=False) + "\n")
    return path


rows = [
    {"id": "ticket-1", "text": "请判断这条工单属于退款、发货还是售后咨询。内容:订单一直没发货。"},
    {"id": "ticket-2", "text": "请判断这条工单属于退款、发货还是售后咨询。内容:想问什么时候退款到账。"},
]

jsonl_path = build_jsonl_file(rows, "batch_input.jsonl")
print("已生成:", jsonl_path)

第二步:上传文件并创建 Batch

import os
import httpx


OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")


def upload_batch_file(file_path: str) -> str:
    with open(file_path, "rb") as f:
        files = {"file": (file_path, f, "application/jsonl")}
        data = {"purpose": "batch"}
        with httpx.Client(timeout=60) as client:
            response = client.post(
                "https://api.openai.com/v1/files",
                headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
                data=data,
                files=files,
            )
            response.raise_for_status()
            return response.json()["id"]


def create_batch(input_file_id: str) -> str:
    payload = {
        "input_file_id": input_file_id,
        "endpoint": "/v1/responses",
        "completion_window": "24h",
    }
    with httpx.Client(timeout=60) as client:
        response = client.post(
            "https://api.openai.com/v1/batches",
            headers={
                "Authorization": f"Bearer {OPENAI_API_KEY}",
                "Content-Type": "application/json",
            },
            json=payload,
        )
        response.raise_for_status()
        return response.json()["id"]


file_id = upload_batch_file("batch_input.jsonl")
batch_id = create_batch(file_id)
print("batch_id =", batch_id)

第三步:轮询状态

import time


def wait_batch_done(batch_id: str, poll_interval: int = 30) -> dict:
    with httpx.Client(timeout=60) as client:
        while True:
            response = client.get(
                f"https://api.openai.com/v1/batches/{batch_id}",
                headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            )
            response.raise_for_status()
            data = response.json()
            status = data["status"]
            print("当前状态:", status)

            if status in {"completed", "failed", "expired", "cancelled"}:
                return data

            time.sleep(poll_interval)

第四步:下载结果文件

def download_file(file_id: str, save_path: str) -> None:
    with httpx.Client(timeout=120) as client:
        response = client.get(
            f"https://api.openai.com/v1/files/{file_id}/content",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
        )
        response.raise_for_status()
        with open(save_path, "wb") as f:
            f.write(response.content)


batch_result = wait_batch_done(batch_id)
output_file_id = batch_result.get("output_file_id")

if output_file_id:
    download_file(output_file_id, "batch_output.jsonl")
    print("结果已下载到 batch_output.jsonl")

为什么这套方式值得学

因为它正好对应离线任务最常见的真实需求:

  • 数据先准备好
  • 批量跑
  • 不盯实时响应
  • 失败任务再补偿

这比你写一个 for 循环慢慢同步调用,通常更便宜,也更稳。

真正上线时一定要补的两步

1. 失败项单独重试

Batch 不代表永远 100% 成功。 你应该把失败项单独收集出来,再做二次处理。

2. 控制批大小

第一版不要一口气塞太多任务。 先从小批量开始,确认:

  • 结果格式没问题
  • 成本符合预期
  • 错误处理逻辑完整

Anthropic 和 Gemini 怎么看

虽然上面代码用的是 OpenAI 做示例,但思路对 Anthropic 和 Gemini 一样成立:

  • Anthropic Message Batches:适合大量独立消息请求
  • Gemini Batch API:支持批量异步任务,官方文档也明确说明有 50% 折扣

所以你真正要学会的不是某家 SDK,而是这套批处理思路。

最容易踩的 5 个坑

1. 明明是离线任务,还硬走实时接口

这是最常见的浪费。

2. 不保留 custom_id

后面你就很难把结果映射回原始数据。

3. 不拆失败项

失败项和成功项混着处理,后续很容易乱。

4. 一次塞太大批

排查问题会很痛苦。

5. 忘记比较“实时 vs Batch”的真实成本

你最好真的算一下,不要只凭感觉。

个人开发者怎么最容易见效

先挑一个最重的离线任务改 Batch:

  • 老文章摘要
  • 历史评论分类
  • 产品描述批量改写

只要改成功一个,你就能立刻感受到差别。

小团队怎么落地

小团队最好做成两个队列:

  • 实时队列:面向用户即时请求
  • 夜间 Batch 队列:面向离线任务

这样职责最清楚,也最容易控制成本。

最后一句话

只要一个任务可以接受“晚一点返回”,Batch 就几乎总值得先评估一下。

很多系统真正的成本浪费,不是模型太贵,而是把本该离线的任务一直按在线方式在跑。

官方资料

  • OpenAI Batch API Guide

https://developers.openai.com/api/docs/guides/batch

  • OpenAI Batch API Reference

https://platform.openai.com/docs/api-reference/batch

  • Anthropic Features Overview

https://platform.claude.com/docs/en/build-with-claude/overview

  • Anthropic Message Batches API

https://platform.claude.com/docs/en/api/messages/batches/create

  • Gemini Batch API

https://ai.google.dev/gemini-api/docs/batch-api