0%


一、CrewAI 是什么?

官方定义

CrewAI 是一个精简、快速的 Python 框架,用于编排自主 AI 代理(Agent)。它让多个 AI Agent 可以协同工作,像一个团队一样完成复杂任务。

核心特点

特点 说明
独立框架 不依赖 LangChain 或其他框架,从零构建
高性能 比 LangGraph 快 5 倍以上
灵活定制 从高层工作流到底层提示词都可定制
两种模式 Crews(自主协作)+ Flows(精确控制)

与其他框架对比

框架 优点 缺点
CrewAI 简洁、快速、独立 社区相对较新
LangGraph 功能强大 样板代码多、与 LangChain 强耦合
Autogen 对话代理能力强 缺乏流程概念
ChatDev 有流程概念 定制能力有限

适用场景

  • 自动化工作流(报告生成、数据分析)
  • 多角色协作系统(一人公司、虚拟团队)
  • 复杂任务分解与执行
  • AI 驱动的应用开发

二、核心概念

2.1 Agent(代理)

Agent 是 CrewAI 的核心单元,代表一个”角色”。

1
2
3
4
5
6
7
8
9
10
11
from crewai import Agent

agent = Agent(
role="高级数据分析师", # 角色
goal="分析数据并提供洞察", # 目标
backstory="你是一位资深分析师...", # 背景故事
verbose=True, # 详细输出
allow_delegation=True, # 是否可以委派任务
tools=[], # 可用工具
llm=..., # 使用的大模型
)

关键参数

  • role:角色名称,定义 Agent 的身份
  • goal:目标,定义 Agent 想要达成什么
  • backstory:背景故事,决定 Agent 的性格和行为方式
  • tools:Agent 可以使用的工具列表
  • llm:Agent 使用的大模型

2.2 Task(任务)

Task 定义 Agent 需要完成的具体工作。

1
2
3
4
5
6
7
8
from crewai import Task

task = Task(
description="分析销售数据,找出增长趋势",
expected_output="包含 5 个关键发现的报告",
agent=agent, # 执行任务的 Agent
output_file="report.md", # 输出文件
)

2.3 Crew(团队)

Crew 是 Agent 的集合,定义它们如何协作。

1
2
3
4
5
6
7
8
from crewai import Crew, Process

crew = Crew(
agents=[agent1, agent2, agent3],
tasks=[task1, task2, task3],
process=Process.sequential, # 或 Process.hierarchical
verbose=True,
)

协作模式

模式 说明 适用场景
Process.sequential 顺序执行,一个任务完成后执行下一个 流程明确的任务
Process.hierarchical 层级模式,自动分配 Manager Agent 协调 需要动态决策的任务

2.4 Tools(工具)

Tools 扩展 Agent 的能力,让它可以搜索、读写文件、调用 API 等。

1
2
3
4
5
6
7
8
9
10
from crewai_tools import SerperDevTool, FileReadTool

search_tool = SerperDevTool() # 搜索工具
file_tool = FileReadTool() # 文件读取工具

agent = Agent(
role="研究员",
tools=[search_tool, file_tool],
...
)

2.5 整体工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
┌─────────────────────────────────────────────────────────────┐
│ Crew │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Agent 1 │ │ Agent 2 │ │ Agent 3 │ │
│ │ (分析师) │ │ (研究员) │ │ (撰稿人) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Task 1 │ │ Task 2 │ │ Task 3 │ │
│ │ 分析数据 │→│ 研究背景 │→│ 撰写报告 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘

三、安装与环境配置

3.1 环境要求

  • Python 3.10 - 3.13
  • 推荐使用 uv 作为包管理器

3.2 安装 CrewAI

方式一:使用 pip

1
2
3
4
5
# 基础安装
pip install crewai

# 安装额外工具
pip install 'crewai[tools]'

方式二:使用 uv(推荐)

1
2
3
4
5
6
# 安装 uv
pip install uv

# 安装 CrewAI
uv pip install crewai
uv pip install 'crewai[tools]'

3.3 创建项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建新项目
crewai create crew my_project

# 项目结构
my_project/
├── .env # 环境变量
├── pyproject.toml # 项目配置
└── src/my_project/
├── main.py # 入口文件
├── crew.py # Crew 定义
├── tools/ # 自定义工具
└── config/
├── agents.yaml # Agent 配置
└── tasks.yaml # Task 配置

3.4 常见安装问题

问题 1:ModuleNotFoundError: tiktoken

1
uv pip install 'crewai[embeddings]'

问题 2:Windows 上 Rust 编译错误

1
2
3
# 安装 Visual C++ Build Tools
# 或使用预编译包
uv pip install tiktoken --prefer-binary

四、大模型服务接入(重点)

这是大多数新手卡住的地方。CrewAI 默认使用 OpenAI,但你必须配置正确的 base_url 才能连接到你的大模型服务。

4.1 核心概念

为什么需要 base_url?

1
2
3
4
┌─────────────┐         ┌─────────────────────┐
│ CrewAI │ ────→ │ LLM 服务端点 │
│ (你的代码) │ HTTP │ (base_url) │
└─────────────┘ └─────────────────────┘
  • 默认情况下,CrewAI 会连接 https://api.openai.com/v1
  • 如果你用的是其他服务(阿里百炼、DeepSeek 等),必须指定 base_url
  • 否则会报错:Connection refusedInvalid API key

4.2 OpenAI 官方

1
2
3
4
5
6
7
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
model="gpt-4o",
api_key="sk-xxxxx",
# base_url 默认是 https://api.openai.com/v1,可以不填
)

环境变量方式

1
OPENAI_API_KEY=sk-xxxxx

4.3 Anthropic Claude

1
2
3
4
5
6
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(
model="claude-sonnet-4-6",
api_key="sk-ant-xxxxx",
)

环境变量方式

1
ANTHROPIC_API_KEY=sk-ant-xxxxx

4.4 阿里百炼 Coding Plan(推荐)

阿里百炼 Coding Plan 提供两种兼容协议:

协议 Base URL 说明
OpenAI 兼容 https://coding.dashscope.aliyuncs.com/v1 使用 langchain-openai
Anthropic 兼容 https://coding.dashscope.aliyuncs.com/apps/anthropic 使用 langchain-anthropic

使用 OpenAI 兼容协议

1
2
3
4
5
6
7
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
model="qwen-max", # 通义千问模型
api_key="your_dashscope_key",
base_url="https://coding.dashscope.aliyuncs.com/v1", # 关键!
)

使用 Anthropic 兼容协议

1
2
3
4
5
6
7
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(
model="qwen-max",
api_key="your_dashscope_key",
base_url="https://coding.dashscope.aliyuncs.com/apps/anthropic", # 关键!
)

环境变量方式

1
2
DASHSCOPE_API_KEY=your_dashscope_key
DASHSCOPE_BASE_URL=https://coding.dashscope.aliyuncs.com/v1

4.5 DeepSeek

1
2
3
4
5
6
7
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
model="deepseek-chat",
api_key="your_deepseek_key",
base_url="https://api.deepseek.com/v1", # 关键!
)

4.6 智谱 AI (GLM)

1
2
3
4
5
6
7
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
model="glm-4",
api_key="your_zhipu_key",
base_url="https://open.bigmodel.cn/api/paas/v4/", # 关键!
)

4.7 本地模型 (Ollama)

1
2
3
4
5
6
7
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
model="llama3",
api_key="ollama", # 任意值
base_url="http://localhost:11434/v1", # Ollama 默认端口
)

4.8 统一配置方案

在实际项目中,推荐使用配置文件统一管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# config/llm_config.py

import os
from langchain_openai import ChatOpenAI

def get_llm(provider: str = "dashscope"):
"""统一获取 LLM"""

configs = {
"openai": {
"model": "gpt-4o",
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": None,
},
"dashscope": {
"model": "qwen-max",
"api_key": os.getenv("DASHSCOPE_API_KEY"),
"base_url": "https://coding.dashscope.aliyuncs.com/v1",
},
"deepseek": {
"model": "deepseek-chat",
"api_key": os.getenv("DEEPSEEK_API_KEY"),
"base_url": "https://api.deepseek.com/v1",
},
}

config = configs.get(provider)
if not config or not config["api_key"]:
raise ValueError(f"请配置 {provider} 的 API Key")

return ChatOpenAI(
model=config["model"],
api_key=config["api_key"],
base_url=config["base_url"],
)

使用

1
2
3
4
5
# 默认使用阿里百炼
llm = get_llm()

# 切换到 OpenAI
llm = get_llm("openai")

4.9 Agent 中使用 LLM

1
2
3
4
5
6
7
8
9
from crewai import Agent
from config.llm_config import get_llm

agent = Agent(
role="分析师",
goal="分析数据",
backstory="你是一位资深分析师",
llm=get_llm("dashscope"), # 指定 LLM
)

五、项目结构详解

5.1 使用 CLI 创建项目

1
2
crewai create crew my_crew
cd my_crew

5.2 目录结构说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
my_crew/
├── .env # 环境变量(API Key 等)
├── pyproject.toml # Python 项目配置
├── README.md
└── src/my_crew/
├── __init__.py
├── main.py # 入口文件,运行 crew
├── crew.py # 定义 Crew、Agent、Task
├── tools/ # 自定义工具
│ ├── __init__.py
│ └── custom_tool.py
└── config/
├── agents.yaml # Agent 配置(角色、目标、背景)
└── tasks.yaml # Task 配置(描述、预期输出)

5.3 agents.yaml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# config/agents.yaml

researcher:
role: >
{topic} 高级研究员
goal: >
深入研究 {topic} 领域的最新发展
backstory: >
你是一位经验丰富的研究员,擅长发现最前沿的技术趋势。
你以能够找到最相关信息并以清晰简洁的方式呈现而闻名。

analyst:
role: >
{topic} 数据分析师
goal: >
分析研究数据并生成洞察报告
backstory: >
你是一位严谨的分析师,擅长将复杂数据转化为可执行的洞察。

占位符说明

  • {topic} 会在运行时被替换为实际值

5.4 tasks.yaml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# config/tasks.yaml

research_task:
description: >
{topic} 进行全面研究,找出最新、最相关的信息。
expected_output: >
包含 10 个关键发现的要点列表
agent: researcher # 执行此任务的 Agent

analysis_task:
description: >
分析研究结果,生成详细的分析报告。
expected_output: >
一份完整的 Markdown 格式报告
agent: analyst
output_file: report.md # 输出文件

5.5 crew.py 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# crew.py

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task

@CrewBase
class MyCrew():
"""我的 Crew"""

@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True,
)

@agent
def analyst(self) -> Agent:
return Agent(
config=self.agents_config['analyst'],
verbose=True,
)

@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)

@task
def analysis_task(self) -> Task:
return Task(
config=self.tasks_config['analysis_task'],
output_file='report.md',
)

@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
verbose=True,
)

5.6 main.py 入口

1
2
3
4
5
6
7
8
9
10
11
12
13
# main.py

from my_crew.crew import MyCrew

def run():
"""运行 Crew"""
inputs = {
'topic': 'AI Agents'
}
MyCrew().crew().kickoff(inputs=inputs)

if __name__ == "__main__":
run()

5.7 运行项目

1
2
3
4
5
# 方式一:使用 CLI
crewai run

# 方式二:直接运行 Python
python src/my_crew/main.py

六、实战:构建一人公司 Agent 团队

6.1 场景描述

假设你要开发一个「人生规划器」App,但你只有一个人。你可以创建一个 Agent 团队来帮你:

  • CEO Agent:负责决策和协调
  • CTO Agent:负责技术架构
  • 产品经理 Agent:负责需求分析
  • 工程师 Agent:负责代码实现
  • 测试 Agent:负责质量保证

6.2 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
life-planner-crew/
├── .env
├── requirements.txt
├── main.py
├── agents/
│ ├── ceo_agent.py
│ ├── cto_agent.py
│ ├── product_manager.py
│ ├── backend_engineer.py
│ ├── frontend_engineer.py
│ └── qa_agent.py
├── tasks/
│ ├── requirement_tasks.py
│ ├── development_tasks.py
│ └── test_tasks.py
├── crews/
│ └── development_crew.py
├── tools/
│ └── file_tools.py
└── config/
├── project_config.py
└── llm_config.py

6.3 LLM 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# config/llm_config.py

import os
from langchain_openai import ChatOpenAI

DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
DASHSCOPE_BASE_URL = "https://coding.dashscope.aliyuncs.com/v1"

def get_default_llm():
"""默认模型(用于决策类 Agent)"""
return ChatOpenAI(
model="qwen-max",
api_key=DASHSCOPE_API_KEY,
base_url=DASHSCOPE_BASE_URL,
)

def get_efficient_llm():
"""高效模型(用于执行类 Agent)"""
return ChatOpenAI(
model="qwen-turbo",
api_key=DASHSCOPE_API_KEY,
base_url=DASHSCOPE_BASE_URL,
)

6.4 Agent 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# agents/ceo_agent.py

from crewai import Agent
from config.llm_config import get_default_llm

def create_ceo_agent():
return Agent(
role="CEO - 公司首席执行官",
goal="""
负责:
1. 战略决策和目标设定
2. 团队协调和资源分配
3. 处理 Agent 团队分歧
4. 确保项目按时保质完成
""",
backstory="""
你是一位经验丰富的技术创业者。
决策果断但善于倾听,关注商业价值。
""",
verbose=True,
allow_delegation=True, # 可以委派任务
llm=get_default_llm(),
)

# agents/backend_engineer.py

def create_backend_engineer_agent():
return Agent(
role="后端工程师",
goal="实现 API 和数据库逻辑",
backstory="你专注于 Node.js 后端开发",
verbose=True,
allow_delegation=False,
llm=get_efficient_llm(), # 使用高效模型
)

6.5 Crew 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# crews/development_crew.py

from crewai import Crew, Process
from agents import (
create_ceo_agent,
create_cto_agent,
create_product_manager,
create_backend_engineer,
)

def create_development_crew():
# 创建 Agent
ceo = create_ceo_agent()
cto = create_cto_agent()
pm = create_product_manager()
backend = create_backend_engineer()

return Crew(
agents=[ceo, cto, pm, backend],
tasks=[], # 动态添加
process=Process.hierarchical, # 层级模式
manager_llm="qwen-max",
manager_agent=ceo, # CEO 作为管理者
verbose=True,
)

6.6 运行效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
python main.py

# 输出
╔═══════════════════════════════════════════════════════════════╗
║ 🎭 Life Planner Crew - 一人公司 Agent 团队 🎭 ║
╚═══════════════════════════════════════════════════════════════╝

📋 检查环境配置...
✓ DASHSCOPE_API_KEY 已配置
✓ 项目目录存在

🎯 请选择操作模式:
[1] 初始化项目
[2] 功能开发模式
[3] 决策讨论模式

七、常见问题与解决方案

Q1: 连接大模型失败

错误信息

1
ConnectionError: Failed to connect to api.openai.com

原因:没有配置正确的 base_url

解决

1
2
3
4
5
6
7
8
9
# 错误 ❌
llm = ChatOpenAI(model="qwen-max", api_key="xxx")

# 正确 ✅
llm = ChatOpenAI(
model="qwen-max",
api_key="xxx",
base_url="https://coding.dashscope.aliyuncs.com/v1",
)

Q2: API Key 格式错误

错误信息

1
AuthenticationError: Invalid API key

检查项

  1. API Key 是否正确复制(没有多余空格)
  2. 是否填对了环境变量名
  3. .env 文件是否被正确加载

解决

1
2
3
4
5
6
7
# 确保加载 .env
from dotenv import load_dotenv
load_dotenv()

# 验证 API Key
import os
print(os.getenv("DASHSCOPE_API_KEY"))

Q3: Agent 输出不符合预期

原因:角色定义不够清晰

解决:优化 backstory

1
2
3
4
5
6
7
8
9
10
11
# 不好 ❌
backstory="你是一个程序员"

# 更好 ✅
backstory="""
你是一位专注 Node.js 后端开发的工程师。
- 擅长 RESTful API 设计
- 遵循 Clean Code 原则
- 注重代码可维护性
- 使用 TypeScript 和 NestJS
"""

Q4: 依赖安装失败

Windows 上常见问题

1
2
3
4
# 安装 Visual C++ Build Tools
# 或使用预编译包
pip install tiktoken --prefer-binary
pip install 'crewai[embeddings]'

Q5: 如何调试 Agent

开启 verbose 模式

1
2
3
4
5
6
7
8
9
agent = Agent(
role="分析师",
verbose=True, # 输出详细过程
)

crew = Crew(
agents=[agent],
verbose=True, # 输出团队协作过程
)

八、总结

核心要点

  1. CrewAI 是什么:多 Agent 协作框架,让多个 AI 角色协同完成任务
  2. 核心概念:Agent(角色)、Task(任务)、Crew(团队)、Tools(工具)
  3. 关键配置:LLM 接入必须配置 base_url,这是大多数人踩坑的地方

大模型接入速查表

服务商 base_url 备注
OpenAI 默认 无需配置
阿里百炼 Coding Plan https://coding.dashscope.aliyuncs.com/v1 OpenAI 兼容
DeepSeek https://api.deepseek.com/v1 OpenAI 兼容
智谱 AI https://open.bigmodel.cn/api/paas/v4/ OpenAI 兼容
Ollama http://localhost:11434/v1 本地模型

学习资源

下一步

  1. 尝试创建你的第一个 Crew
  2. 接入你喜欢的大模型服务
  3. 设计适合你场景的 Agent 角色
  4. 组合 Crews 和 Flows 构建复杂工作流

作者:AI 技术探索者
日期:2026-03-31

本文基于 CrewAI v1.12+ 和阿里百炼 Coding Plan 服务编写

写在前面

如果你刚读完《AI 概念指南》,对 LLM、Agent 这些基础概念有了初步了解,但在实际使用 AI 时,可能又遇到了新的困惑:

  • “Claude Code 和 Claude 是什么关系?用 Claude Code 必须用 Claude 吗?”
  • “Claude Code 里可以切换模型,那它和 Kimi、GLM 是什么关系?”
  • “我想用国产模型,该选工具还是选模型?”

这些问题非常普遍。AI 领域有两层东西叠在一起:一层是”大脑”(模型),一层是”应用”(工具)。很多小白搞混,就是因为分不清哪层是哪层。

这篇文章的目标:让你彻底搞清楚模型和工具的关系,看完之后不再迷糊


一、核心概念:两层架构

把 AI 世界想象成一个餐厅:

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────┐
│ 餐厅(AI 生态) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 厨师(模型) │◄──────│ 服务员(工具) │ │
│ │ │ │ │ │
│ │ 负责做菜 │ │ 负责端菜、接待 │ │
│ │ 提供"智能能力" │ │ 提供"交互界面" │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

第一层:AI 模型(厨师)—— 提供”智能”

模型是 AI 的核心,负责思考、理解、生成内容。它就像餐厅里的厨师,决定菜品的”味道”(智能水平)。

模型的核心特点:

  • 不直接面向用户,而是通过 API 被”工具”调用
  • 是智能的”发动机”,所有的推理、理解、生成都在这里完成
  • 一次训练成本极高(数亿到数十亿美元),但边际使用成本低

第二层:AI 工具(服务员)—— 提供”交互”

工具是用户实际接触的产品,负责界面交互、功能整合。它就像餐厅里的服务员,把厨师做的菜端到顾客面前。

工具的核心特点:

  • 直接面向用户,提供聊天界面、编程环境、文档处理等功能
  • 调用一个或多个模型的 API,把模型能力”包装”成用户友好的产品
  • 本身不产生智能,只是”智能的搬运工”

二、主流模型一览(第一层)

先认识一下市面上的”厨师们”:

国内模型

模型 出品公司 特点 适用场景
Kimi 月之暗面 超长上下文(200万字),中文优化 长文档阅读、论文分析
GLM 智谱AI 开源+闭源双轨,学术背景强 技术研究、企业部署
Qwen(通义千问) 阿里巴巴 中文能力强,开源生态好 中文场景、企业应用
DeepSeek 深度求索 开源、性价比极高、数学推理强 代码、数学、技术研究
Doubao(豆包) 字节跳动 中文理解好,性价比高,多场景适配 日常对话、办公助手
MiniMax MiniMax 多模态、语音交互强 语音场景、多模态应用

国外模型

模型 出品公司 特点 适用场景
Claude Anthropic 推理严谨、代码能力强、上下文长 编程、长文档分析、安全场景
GPT(ChatGPT 底层) OpenAI 生态最完善、插件最多 通用场景、创意写作
Gemini Google 多模态原生、谷歌生态集成 多媒体处理、谷歌用户
Llama Meta 完全开源、可本地部署 私有部署、技术研究

一个关键认知

模型本身不是产品。你无法”直接使用”模型,必须通过某个工具(如 ChatGPT、Claude 网页版、API 接口等)来访问模型的能力。

1
2
3
4
5
6
7
8
┌─────────────────────────────────────────────────────────┐
│ 模型不能直接访问 │
│ │
│ ❌ "我要用 GPT-4" —— 这是无法直接做到的 │
│ ✅ "我要用 ChatGPT" —— 这才是正确说法 │
│ │
│ 因为 ChatGPT 是工具,GPT-4 是它背后的模型 │
└─────────────────────────────────────────────────────────┘

三、主流工具一览(第二层)

再认识一下”服务员们”:

AI 对话工具(网页版/APP)

这些工具提供对话界面,让你能和模型”聊天”:

工具 主要模型 是否支持切换模型
ChatGPT GPT 系列 ❌ 仅 OpenAI 模型
Claude 网页版 Claude 系列 ❌ 仅 Anthropic 模型
Kimi 网页版/APP Kimi ❌ 仅月之暗面模型
豆包 Doubao ❌ 仅字节跳动模型
Poe 多模型 ✅ 支持 Claude、GPT、Llama 等
Cherry Studio 多模型 ✅ 支持配置多种模型 API
Chatbox 多模型 ✅ 支持配置多种模型 API

AI 编程工具(IDE/编辑器插件)

这些工具专注于编程场景,能读写代码、理解项目:

工具 类型 支持的模型 特点
Claude Code CLI/IDE插件 Claude、Kimi、GLM、MiniMax、Doubao 等多模型 官方出品,代码理解最强,支持多模型切换
Cursor AI IDE Claude、GPT、DeepSeek、Gemini 等 可切换模型,IDE 集成体验好
GitHub Copilot IDE 插件 GPT 系列 微软出品,与 GitHub 深度集成
Trae AI IDE Kimi、GLM、Doubao 等(国内模型为主) 字节出品,中文优化
Qoder AI 编程工具 多模型 支持多种国产模型
OpenCode 开源 AI 编程工具 多模型 开源免费,可自定义配置

重要说明:Claude Code 是 Anthropic 官方出品的编程工具,但它支持多种模型,包括国内的 Kimi、GLM、MiniMax、Doubao 等。对于没有海外信用卡的中国用户,可以配置国产模型 API 来使用 Claude Code。

工具的本质

工具做的事情就是把你的输入(问题、代码、文档)发送给模型,然后把模型的回答展示给你:

1
2
3
4
5
6
7
8
9
┌─────────────────────────────────────────────────────────────────┐
│ 工具的工作流程 │
│ │
│ 用户输入 ──► 工具处理 ──► 调用模型 API ──► 获取模型回答 ──► 展示给用户
│ │ │ │
│ "帮我写个函数" 发送请求 返回代码
│ 到模型服务器 给用户
│ │
└─────────────────────────────────────────────────────────────────┘

四、模型与工具的关系图

关系示意图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
                              工具层(用户直接接触)
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Claude Code │ │ Cursor │ │ Trae │ │
│ │ (多模型) │ │ (多模型) │ │ (国内模型) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 多模型 API │ │ 多模型 API │ │ Kimi/GLM API │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────┬───────────────────────────────┘


模型层(智能来源)
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Claude │ │ GPT │ │ Gemini │ │ Kimi │ │ DeepSeek│ │
│ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ GLM │ │ Doubao │ │MiniMax │ │ Qwen │ │ ... │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

两种关系模式

模式一:官方绑定型

1
2
3
4
5
6
7
8
9
10
11
┌─────────────┐           ┌─────────────┐
│ ChatGPT │ ────────► │ GPT │
│ (工具) │ 官方绑定 │ (模型) │
└─────────────┘ └─────────────┘

┌─────────────┐ ┌─────────────┐
│ Claude 网页版│ ────────► │ Claude │
│ (工具) │ 官方绑定 │ (模型) │
└─────────────┘ └─────────────┘

特点:一个工具只能用自家模型的 API,不能切换其他模型

模式二:多模型支持型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────┐           ┌─────────────┐
│ Claude Code │ ────────► │ Claude │
│ (工具) │ │ (模型) │
│ │ ────────► ├─────────────┤
│ │ │ Kimi │
│ │ ────────► ├─────────────┤
│ │ │ GLM │
│ │ ────────► ├─────────────┤
│ │ │ MiniMax │
│ │ ────────► ├─────────────┤
│ │ │ Doubao │
└─────────────┘ └─────────────┘

特点:一个工具可以切换多个模型,用户自己选择

注意:Claude Code 虽然是 Anthropic 官方出品,但它支持配置多种模型的 API。对于中国用户来说,这意味着你可以用 Kimi、GLM、MiniMax、Doubao 等国产模型来驱动 Claude Code,而不必依赖海外支付渠道。


五、常见困惑解答

Q1:Claude Code 就是 Claude 吗?

不是。

  • Claude 是模型,是 Anthropic 开发的 AI “大脑”
  • Claude Code 是工具,是 Anthropic 官方出品的编程助手

Claude Code 调用模型来帮你写代码,但它不只是调用 Claude,还支持 Kimi、GLM、MiniMax、Doubao 等多种模型。你可以理解为:

1
Claude Code = 编程专用界面 + 代码操作能力 + 多模型 API 支持

Q2:为什么 Claude Code 可以选择 Claude、Kimi、GLM、MiniMax?

因为 Claude Code 是”多模型工具”,它支持配置多个模型厂商的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────────────────┐
│ Claude Code 架构 │
│ │
│ 用户界面 ◄──► Claude Code 核心 ◄──► 模型配置 │
│ │ │
│ ┌──────────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ Claude API Kimi API GLM API │
│ (Anthropic) (月之暗面) (智谱AI) │
│ │
│ ▼ ▼ ▼ │
│ MiniMax API Doubao API 其他 API │
│ (MiniMax) (字节跳动) (可扩展) │
└─────────────────────────────────────────────────────────┘

Claude Code 做的是”整合商”的工作,让你在一个工具里能用多个模型。

对于中国用户特别友好:如果你没有海外信用卡,无法购买 Claude API,完全可以配置国产模型(如 Kimi、GLM、MiniMax、Doubao)的 API 来使用 Claude Code。这些国产模型通常支持国内支付方式,获取 API 更方便。

Q3:我想用国产模型,该怎么选?

方案一:直接用模型官方工具

  • Kimi → Kimi 网页版/APP
  • GLM → 智谱清言
  • Doubao → 豆包 APP
  • Qwen → 通义千问
  • DeepSeek → DeepSeek 网页版

方案二:用支持国产模型的编程工具

这是很多程序员的选择,既能用国产模型,又能享受专业编程工具的体验:

  • Claude Code(推荐)—— 支持 Kimi、GLM、MiniMax、Doubao 等多种国产模型,代码能力最强
  • Trae(字节出品)—— 默认 Kimi/GLM/Doubao,中文优化
  • Qoder —— 支持多种国产模型
  • Cursor —— 支持 DeepSeek

Q4:模型和工具哪个更重要?

都重要,但影响点不同:

维度 模型决定 工具决定
智能水平 ✓ 回答质量、推理能力
使用体验 ✓ 界面设计、操作流畅度
功能丰富度 ✓ 代码高亮、文件管理、历史记录
模型选择 ✓ 是否支持切换模型
价格 ✓ API 调用成本 ✓ 订阅费用
隐私合规 ✓ 数据处理方式 ✓ 本地存储、日志记录

一句话总结:模型决定”上限”,工具决定”体验”。


六、选型指南

按需求选

你的需求 推荐模型 推荐工具
日常聊天问答 GPT-4o、Claude、Doubao ChatGPT、Claude 网页版、豆包
读长文档/论文 Kimi、Claude Kimi 网页版、Claude
写代码/重构 Claude、DeepSeek、GLM Claude Code、Cursor
数学/逻辑推理 DeepSeek-R1、Claude DeepSeek 网页版、Claude Code
国产模型+编程 Kimi、GLM、Doubao、MiniMax Claude Code(配置国产模型 API)
企业私有部署 GLM、Qwen、DeepSeek OpenCode、自建平台

按用户类型选

用户类型 推荐组合 理由
完全小白 ChatGPT 或 Kimi/豆包 网页版 开箱即用,无需配置
办公族 Kimi/豆包 + Claude 网页版 长文档 + 通用问答
程序员(有海外支付) Claude Code + Claude API 官方模型,体验最佳
程序员(国内用户) Claude Code + Kimi/GLM/Doubao API 国产模型,支付方便
技术极客 Cherry Studio + 多模型 API 灵活配置,自由切换
企业用户 私有部署 + 定制工具 数据安全,合规可控

七、避坑指南

常见误区

误区 正确理解
“我买了 Cursor,是不是就能用 Claude?” Cursor 是工具,Claude API 需要单独付费或配置
“Claude Code 只能用 Claude 吗?” 错误! Claude Code 支持多种模型,包括 Kimi、GLM、MiniMax、Doubao 等
“Kimi 和 DeepSeek 哪个好?” 没有绝对好坏,看场景:Kimi 长文档强,DeepSeek 代码数学强
“Claude Code 是官方出品,肯定只支持自家模型” 官方出品不代表只支持自家模型,Claude Code 是开放的多模型工具
“国产模型不如国外模型” 各有优势,国产模型中文理解更好,国外模型生态更完善

省钱技巧

  1. 简单任务用小模型:日常对话用更便宜的模型(如 GPT-4o-mini、Claude Haiku)
  2. 复杂任务用大模型:代码、推理用更强的模型(如 Claude Sonnet、DeepSeek-V3)
  3. 对比后再订阅:先用免费额度测试,再决定是否付费
  4. 注意 API 调用成本:工具可能免费,但模型 API 调用要收费
  5. 国产模型性价比高:Kimi、GLM、Doubao 等国产模型通常价格更友好,支付更方便

八、总结

用一个比喻来总结模型和工具的关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────────────┐
│ │
│ 模型 = 发动机(提供动力) │
│ 工具 = 汽车(提供驾驶体验) │
│ │
│ Claude Code = 一辆可以换不同发动机的改装车 │
│ 可以装 Claude、Kimi、GLM、MiniMax、Doubao 等 │
│ │
│ ChatGPT = 装了 GPT 发动机的官方轿车(不能换发动机) │
│ Claude 网页版 = 装了 Claude 发动机的官方轿车(不能换发动机) │
│ │
│ 你可以: │
│ - 买不能换发动机的官方车(ChatGPT、Claude 网页版) │
│ - 买可换发动机的车(Claude Code、Cursor、Poe) │
│ - 自己组装(Cherry Studio + API) │
│ │
│ 核心原则:先想好你要什么体验,再选车和发动机 │
│ │
└─────────────────────────────────────────────────────────────────┘

记住三个要点

  1. 模型是智能来源,工具是交互入口 —— 搞清楚你在用哪层
  2. 一个工具可能支持多个模型,一个模型可以被多个工具调用 —— 它们是多对多的关系
  3. Claude Code 支持多模型 —— 对于中国用户,可以配置国产模型 API 来使用

附录:术语对照表

模型术语 所属公司 常见混淆
Claude Anthropic Claude Code 是工具,Claude 是模型;但 Claude Code 支持多模型
GPT-4、GPT-4o OpenAI ChatGPT 是工具,GPT 是模型
Kimi 月之暗面 同名,但要注意区分工具和模型
DeepSeek 深度求索 同名,网页版是工具,API 是模型
GLM 智谱AI 智谱清言是工具,GLM 是模型
Doubao(豆包) 字节跳动 豆包 APP 是工具,Doubao 是模型
Qwen 阿里巴巴 通义千问是工具,Qwen 是模型
Gemini Google Gemini 网页版是工具,Gemini 是模型

参考资料

写在前面

说实话,2024 到 2025 这一年,AI 领域的名词简直像爆米花一样噼里啪啦往外蹦。ChatGPT、Claude、DeepSeek、Agent、MCP、Vibe Coding……身边的朋友经常一脸懵地问我:”这些到底都是啥?我该学哪个?”

这篇文章的目标很简单:帮你建立一套清晰的认知坐标系。读完之后,你不会再被各种新名词唬住,能准确判断某个新概念在整个版图里的位置,也知道该用什么工具解决自己的问题。


一、三个核心概念:LLM、Agent、Skills

理解 AI 生态,其实就抓三个东西:LLM(大脑)、Agent(行动者)、Skills(工具箱)。这三者的关系搞明白了,其他的都是在这个基础上的变体。

1. LLM——AI 的大脑,但它不是魔法

一句话理解:LLM(大语言模型)就是 AI 的”大脑”,它读了互联网上能读到的海量文本,学会了人类的语言模式和知识,能听懂你说的话,也能生成像人写的文字。

不过这里有几个坑,不得不说很多新手都会踩:

  • LLM 不是搜索引擎。搜索引擎是去库里找现成的答案,LLM 是”脑补”出最可能的回答。这意味着它可能会一本正经地胡说八道——也就是所谓的”幻觉”。
  • LLM 不联网(除非你特别配置)。它的知识有个截止日期,比如 GPT-4 的知识就到 2024 年初,问它之后的事,它是真不知道。
  • LLM 并不”理解”。它本质上是概率预测——根据前面的词,猜下一个最可能出现的词是什么。它不像人一样有真正的理解,只是模式匹配玩得溜。

模型 vs 产品的区分

这里有个很多人混淆的概念:模型产品是两回事。

  • 模型(如 GPT-4、Claude 3.5)是底层的”大脑”,提供智能能力
  • 产品(如 ChatGPT、Claude 网页版)是包装好的应用,包含界面、功能和安全限制

同一个模型可以驱动多个产品。比如 GPT-4 既驱动 ChatGPT,也驱动 Microsoft Copilot。所以别问”GPT-4 和 ChatGPT 哪个好”,这就像是问”发动机和小轿车哪个好”——根本不是一个层面的东西。

主流 LLM 怎么选

如果你现在就想试试,这几款是市面上的主力:

模型 出品方 特点 适合场景
GPT-4/GPT-4o OpenAI 通用能力强,生态完善 创意写作、通用问答
Claude 3.5/4 Anthropic 上下文长(20万token),推理严谨 长文档分析、代码审查
DeepSeek-V3/R1 深度求索 开源、性价比高、数学强 技术研究、预算敏感场景
Gemini 1.5/2.0 Google 多模态原生,谷歌生态 多媒体处理
Qwen2.5 阿里巴巴 中文优化,开源友好 中文场景

我的建议:普通用户从 ChatGPT 或 Claude 开始,想省钱或玩技术的试试 DeepSeek,中文场景多的选 Kimi 或 Qwen。

2. Agent——不只会聊天,还能动手干活

如果你只用过 ChatGPT 网页版,那你接触的还是”纯对话”模式。但 Agent 不一样,它是能自主规划、使用工具、完成任务的 AI 系统。

打个比方

  • LLM 是”会聊天的百科全书”——你问它答,但它不动手
  • Agent 是”能动手解决问题的智能助手”——你说”帮我订一张明天去上海的机票”,它能自己去查航班、比价、填信息、完成预订

Agent 和纯对话 LLM 的核心区别:

能力 纯对话 LLM Agent 系统
交互方式 一问一答 自主多轮执行
工具使用 依赖产品是否开放 原生设计为调用外部工具
任务完成 给建议、生成内容 实际执行操作并交付结果
记忆能力 单轮或有限轮对话 长期记忆、状态跟踪、跨会话

Agent 是怎么干活的

Agent 的工作流程通常叫 ReAct 范式(Reasoning + Acting):

1
2
3
观察环境 → 理解目标 → 推理思考 → 选择工具 → 执行操作 → 观察结果 → 迭代直到完成
▲ │
└──────────────────────────────────────────────────────────────────┘

它会循环执行这个过程,直到任务完成或达到终止条件。比如你要 Agent”分析一下这份财报并生成图表”,它可能会:

  1. 读取文件(调用文件操作 Skill)
  2. 分析数据(调用代码执行 Skill 跑 Python)
  3. 生成图表(调用可视化工具)
  4. 检查输出,如果不对就调整重试

Agent 产品有哪些

产品 类型 核心能力
Claude Code 编程 Agent 直接操作代码库,批量重构、调试
Manus 通用 Agent 端到端任务自动化,可操作用户界面
AutoGPT 实验性 Agent 自主分解任务、循环执行

注意:Cursor 和 GitHub Copilot 很多人以为是 Agent,其实它们更像是”增强型编辑器”——主要帮你写代码,而不是自主执行任务。真正的 Agent 像 Claude Code,能独立规划和执行多步骤任务。

3. Skills——Agent 的手和脚

Skills 是 Agent 能调用的具体能力,没有 Skills,Agent 就是”光说不练”。

常见的 Skills 包括:

  • 📁 文件操作(读/写/搜索本地文件)
  • 🔍 网络搜索(获取实时信息)
  • 💻 代码执行(运行 Python、Bash 等)
  • 🗄️ 数据库查询(SQL 操作)
  • 📧 消息发送(邮件、IM 通知)

二、市面上的 AI 产品怎么分类

现在你已经搞懂了 LLM、Agent、Skills 三层架构,接下来看看市面上的产品都落在哪个象限。

1. 对话产品(包装好的 LLM)

这些产品本质上是”给 LLM 套了个壳”,主要提供对话界面:

产品 底层模型 核心特点
ChatGPT GPT-4o/o3 最知名,生态丰富,插件多
Claude Claude 3.5/4 上下文长(20万token),推理严谨
Kimi Moonshot 自研 超长上下文(200万字),适合读长文档
DeepSeek DeepSeek-V3/R1 开源透明,数学推理强,价格低

上下文的坑:很多人不知道”上下文长度”是什么意思。简单说,就是 AI 能”记住”多少内容。Kimi 的 200 万字 ≈ 几十本书,Claude 的 20 万 token ≈ 30 万汉字。如果你要读一篇很长的论文或合同,必须选支持大上下文的模型,否则它读到后面就忘了前面说什么。

2. AI 编程助手(程序员的武器库)

产品 定位 适合场景
Cursor AI IDE 日常开发,代码生成、解释、重构
GitHub Copilot 代码补全 实时代码建议,提升编码效率
Claude Code 编程 Agent 复杂任务,批量重构,多文件操作

选型建议:日常写代码用 Cursor 或 Copilot 就够了;要做大规模代码重构、理解整个项目架构,Claude Code 更猛。

3. 内容生成工具(特定领域的 AI)

产品 类型 特点
Midjourney 图像生成 艺术风格最强,需 Discord 使用
DALL·E 3 图像生成 与 ChatGPT 集成,使用方便
Sora 视频生成 OpenAI 出品,目前仅对特定用户开放
Nano Banana 图文处理 Gemini 生态,支持图文混合任务

注意:Nano Banana 是 Google 实验性工具,功能可能迭代更新。


三、MCP 和 Vibe Coding:两个值得关注的趋势

1. MCP——AI 的 USB-C 接口

MCP(Model Context Protocol)是 Anthropic 在 2024 年 11 月推出的开放协议,旨在标准化 AI 与外部数据源的连接方式。

一句话理解:MCP 是 AI 世界的”USB-C 标准”。以前每个 AI 连数据库都要单独开发适配器,现在只要实现一次 MCP,所有支持 MCP 的 AI 都能用。

MCP 的三层架构:

  • MCP 主机 (Host):运行 AI 的程序(如 Claude Code)
  • MCP 客户端 (Client):维持与服务器连接的组件
  • MCP 服务器 (Server):提供具体能力的服务(如文件系统、GitHub、数据库)
1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────────────┐
│ MCP 架构示意 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────┐ ┌───────────┐ │
│ │ MCP Host │──────► MCP Client │────► MCP Server │ │
│ │ (Claude Code)│ │(连接管理)│ │ (文件系统) │ │
│ └─────────────┘ └──────────┘ └───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [LLM] 标准化协议 具体工具 │
└─────────────────────────────────────────────────────────┘

为什么说它重要:MCP 让 AI 能方便地”伸手”到你的文件、数据库、GitHub、Slack 等各种数据源,是 Agent 能力的核心基础设施。目前已有文件系统、GitHub、PostgreSQL 等官方 MCP Server。

2. Vibe Coding——自然语言编程的新范式

Vibe Coding 是由前特斯拉 AI 总监 Andrej Karpathy 在 2025 年初提出的概念,指用自然语言描述需求,由 AI 自动生成代码的开发方式。

核心理念的转变

  • 从”写代码”转向”描述需求”
  • 从”语法细节”转向”意图表达”
  • 从”手工实现”转向”审查和迭代”

实际场景:你说”帮我做一个待办事项网页,可以添加任务、标记完成、删除,要简洁美观”,AI 直接生成完整的 HTML/CSS/JavaScript 代码,你只需要审查和微调。

但不得不说,它也有局限

  • 适合原型开发、简单应用、脚本任务
  • 复杂系统架构仍需专业开发者把控
  • 对”描述清楚需求”的能力要求反而更高——说不明白,AI 也猜不透

四、概念关系全景图

把上面的内容串起来,整个 AI 生态的层次是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
                    ┌─────────────────────────────────────┐
│ AI 生态全景 │
└─────────────────────────────────────┘

┌─────────────────────────┼─────────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ LLM │ │ Agent │ │ Skills │
│ (大脑) │◄────────►│ (行动者) │◄────────►│ (工具) │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ MCP 协议层 │ │
│ │ (标准化连接) │ │
│ └────────┬─────────┘ │
│ │ │
┌───────┴───────┐ ┌────────┴────────┐ ┌────────┴────────┐
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│GPT-4 │ │Claude│ │Claude│ │OpenCl│ │文件系│ │搜索 │
│系列 │ │系列 │ │ Code │ │aw │ │统 │ │引擎 │
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ChatGPT │ │Claude │ │Cursor │ │本地任务│ │代码执行│ │联网搜索│
│网页版 │ │网页版 │ │IDE插件 │ │自动化 │ │文件读写│ │API调用 │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘

记住这个核心逻辑:LLM 提供智能基础 → Agent 赋予行动能力 → Skills 提供工具支持 → MCP 标准化连接方式。


五、选型指南——我该用什么?

按使用场景选

你的需求 推荐工具 原因
日常问答、灵感收集 ChatGPT / Claude 通用性强,易用
读论文/长文档/法律文件 Kimi / Claude Kimi 200万字上下文,Claude 20万token
写代码、审查代码 Claude Code / Cursor 代码理解和操作能力强
画插画、设计图 Midjourney / DALL·E 3 Midjourney 艺术风格佳,DALL·E 集成方便
做视频内容 Runway / Pika / Sora(开放后) Sora 暂未全面开放
处理敏感数据 本地模型 + OpenClaw 数据不出本机,隐私可控
数学/逻辑推理 DeepSeek-R1 / Claude / o3 DeepSeek-R1 开源且推理强
自动化办公任务 Manus / AutoGPT 端到端任务自动化

按技术接受度选

用户类型 推荐起点 进阶方向
完全小白 ChatGPT 网页版 Claude、Kimi
办公族 Kimi 读文档 + ChatGPT 写作 学习 Prompt 工程
创作者 Midjourney + Claude 学习 AI 工作流
程序员 Cursor / Claude Code MCP + Agent 开发
极客玩家 本地模型 + OpenClaw 自建 AI 工作流

六、避坑指南——这些误区你得知道

常见误区

误区 真相
AI 什么都知道 LLM 有知识截止日期,不联网时无法获取新信息
AI 不会犯错 会产生”幻觉”,一本正经地生成错误信息
AI 有真正的理解 基于概率的模式匹配,不是真正的”理解”
越贵的模型越好 简单任务用小模型更快更便宜(如 GPT-4o-mini)
AI 要取代人类了 当前是”增强人类”阶段,替代的是重复性工作
提示词越复杂越好 关键是清晰表达需求,不是堆砌辞藻

使用建议

  1. 交叉验证:重要信息用多个 AI 或搜索引擎验证,别盲信单一来源
  2. 提供上下文:给 AI 足够的背景信息,结果更准确(Garbage In, Garbage Out)
  3. 迭代优化:第一次不满意?继续追问、细化需求
  4. 了解边界:创意、整理、初稿适合 AI;关键决策、医疗法律建议不适合
  5. 保存记录:AI 输出可能变化,重要内容要保存

附录:术语速查表

术语 定义
Token LLM 处理文本的基本单位,1 token ≈ 0.75 个英文单词
上下文长度 AI 能”记住”的文本范围
微调 (Fine-tuning) 在基础模型上用特定数据进一步训练
RAG 检索增强生成,让 AI 基于知识库回答问题
Prompt 给 AI 的输入指令
多模态 能同时处理文本、图像、音频、视频
幻觉 AI 生成看似合理但实际错误的内容
推理模型 专门优化推理的模型(如 o1、o3、DeepSeek-R1)
Function Calling LLM 调用外部函数/API 的能力

总结

说了这么多,核心观点就三个:

  1. 没有”最好”的 AI,只有”最适合”的 AI。选工具要看场景,不是越贵越好。
  2. 模型是大脑,产品是包装,Agent 是系统。搞清楚这三层,新名词来了你也能归类。
  3. AI 是增强人类的工具,不是替代人类的魔法。它会犯错,有边界,需要人的判断。

如果你看完还是不知道从何下手,不妨这样:先花 30 分钟注册个 ChatGPT 或 Kimi 账号,实际提几个问题,比看十篇文章都有用。用着用着,你就知道自己需要什么了。


参考资料


Oracle SQL*LoaderOracle 数据库管理系统中一个强大的工具,用于高效地将大量数据从外部文件加载到 Oracle 数据库中。它提供了一种快速、灵活的方式来导入数据,适用于各种数据格式和文件类型。本文将介绍 SQL*Loader 的基本概念、工作原理以及实际应用场景。

一、什么是 SQL*Loader?

SQL*Loader 是一个用于导入数据的实用程序,允许用户将普通文件、CSV 文件等外部数据源中的数据加载到 Oracle 数据库表中。它是 Oracle 数据库中的一个标准工具,可以轻松地处理大规模的数据加载任务。

二、SQL*Loader 的工作原理

SQL*Loader 的工作原理比较简单:

  • 控制文件定义:编写一个控制文件,其中指定了要加载的目标表、字段映射关系、数据格式等信息。控制文件是 SQL*Loader 的核心配置文件之一;

  • 准备外部数据文件:用户需要准备一个包含待加载数据的外部文件,可以是纯文本文件、CSV 文件等格式;

  • 运行 SQL*Loader:通过命令行或者其他界面工具运行 SQL*Loader,并指定控制文件和数据文件的位置。SQL*Loader 将根据控制文件加载到目标表中;

  • 数据加载SQL*Loader 按照控制文件中指定的规则,逐行解析外部数据文件,并将数据插入到目标表中;

三、SQL*Loader 控制文件

这是一个控制文件模板样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
LOAD DATA
INFILE 'data.csv' -- 指定外部数据文件的路径
INTO TABLE employees -- 指定目标表名
CHARACTERSET UTF8 -- 指定外部数据文件的字符集编码为 UTF-8
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' -- 指定字段分隔符和可选的字段包裹符
(
employee_id, -- 目标表的列名
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD' -- 数据格式化,确保日期格式正确
)
WHEN (hire_date >= '2022-02-01')
  • LOAD DATA 表示开始加载数据的声明;
  • INFILE 'data.csv' 指定了外部数据文件的路径。你需要将 data.csv 替换为实际的外部数据文件名,并确保文件路径正确;
  • INTO TABLE employees 指定了目标表名为 employees,即将数据加载到 employees 表中。你需要将 employees 替换为实际的目标表名;
  • CHARACTERSET UTF8 指定外部数据文件的字符集编码为 UTF-8
  • FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' 定义了字段的分隔符和可选的字段包裹符。在这个示例中,字段被逗号分隔,并且可能被双引号包裹。根据实际情况,你可能需要调整这些选项;
  • (employee_id, first_name, last_name, email, hire_date DATE 'YYYY-MM-DD') 定义了要加载的字段和它们的数据类型。确保与目标表的列名和数据类型匹配。在这个示例中,hire_date 被格式化为日期,并指定了日期的格式;
  • WHEN (hire_date >= '2022-01-01') 定义了加载数据的条件,只有满足条件的数据,才会被导入 Oracle 数据库;

四、SQL*Loader 的应用场景

SQL*Loader 在实际应用中有广泛的用途,例如:

  • 数据迁移和导入:当需要将外部数据源中的数据导入到 Oracle 数据库中时,SQL*Loader 是一个好的选择。它可以通过灵活的配置,处理大量的数据;
  • 数据集成和同步:在数据集成和同步的场景中,SQL*Loader 可以用于将不同系统或者数据源中的数据整合到同一数据库中,以便进行分析、报告等操作;
  • 日常数据加载:从外部系统中获取数据,并将其加载到 Oracle 数据库中进行进一步处理。SQL*Loader 可以自动化这一过程,提高数据处理的效率;

五、实际应用

以下是一个简单的示例,演示如何使用 SQL*LoaderCSV 文件加载到 Oracle 表中:

  • 创建一个控制文件 data.ctl,定义目标表和字段映射关系:
1
2
3
4
5
6
7
8
9
10
11
sqlCopy codeLOAD DATA
INFILE 'data.csv'
INTO TABLE employees
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
( employee_id,
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD'
)
WHEN (hire_date >= '2022-02-01')
  • 准备外部数据文件 data.csv,包含待加载的数据。
employee_id first_name last_name email hire_date
001 zhangsan@gmail.com 2023-02-01
002 lisi@gmail.com 2023-02-02
  • 在命令行中运行 SQL*Loader
1
2
bashCopy code
sqlldr username/password@database control=data.ctl

这样,SQL*Loader 就会将 data.csv 中的数据加载到名为 employees 的表中。

六、结论

Oracle SQL*Loader 是一个强大的数据加载工具,可用于高效地将外部数据加载到 Oracle 数据库中。通过简单的配置和命令,用户可以轻松地处理大量的数据加载任务,提高数据处理的效率和可靠性。


一、跨库分页查询

对于一定数量级的表,如订单表,通常采用分库分表的方式保存数据。根据指定字段,如果用户ID,散列数据到不同的库中。那么,如果需要按照下单时间分页查询订单信息,就涉及到跨库查询。

假设有45笔订单,存于三个库中,散列算法是 OrderID % 3,则数据分布为:

如果以每页五个订单,查询第二页的所有订单,则单库查询 sql 为:

1
select * from order_info order by id limit 5 offset 5;

但跨库查询就行不通了。下面,主要有三种方案可以用于跨库分页查询:

  • 全局查询法
  • 禁止跳页查询法
  • 二次查询法

二、全局查询法

全局查询法,需要在每个分库中执行查询语句,然后再程序中排序,再定位切割到指定的数据段。

如果需要查询第二页订单,需要查询每个库的前二页数据:

1
2
3
select * from order_info_1 order by id limit 10;
select * from order_info_2 order by id limit 10;
select * from order_info_3 order by id limit 10;

结果为:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30

那么第二页的订单列表为:

6,7,8,9,10

小结:对于低页码查询,全局查询法是可以应付的,但是,当页码越来越大,查出来的数据也就越来越多,需要排序的数据也越来越多,查询效率也就会越来越慢。

三、禁止跳页查询法

全局查询法的一个显著缺陷,就是随着页码越来越大,查询的数据量也越来越大。那么,如果禁止跳页查询,且每次查询都以上次查询的最大ID为基点,就可以保证每次查询的数据量都是相同的。

查询第一页数据:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

那么第一页的订单列表为:

1,2,3,4,5

查询第二页数据:

第一页的订单ID最大值为5,因此第二页的订单ID起始值应大于5,查询得到:

将以上三个库的查询结果排序:

6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

那么第二页的订单列表为:

6,7,8,9,10

小结:禁止跳页查询法,保证每次查询的数据量是相同的,避免了分页查询带来的性能衰减问题;但禁止跳页也是功能缺陷,没法一步定位到指定数据段。

四、二次查询法

二次查找法,既满足跳页查询,也能避免分页查询性能衰减。为了解释这一思想,我们以查询第三页订单数据为例。单库查询语句:

1
select * from order_info order by id limit 5 offset 10;

之所以叫二次查询法,当然需要查询两次。这两次查询有什么不同,希望通过以下四个步骤说清楚:

第一步:语句改写

select * from order_info order by id limit 5 offset 10 改写成 select * from order_info order by id limit 5 offset 3。偏移量10变成3,是基于10/3计算得出的。将语句在三个库分别执行,得到数据:

第二步:找最小值

  • 第一个库:最小数据为8
  • 第二个库:最小数据为11
  • 第三个库:最小数据为12

因此,从三个库中拿到的最小数据为8。

第三步:第二次语句改写

这次需要把 select * from order_info order by id limit 5 offset 3 改写成一个between语句,起点是最小的OrderID,终点是原来每个分库各自返回数据的最大值:

  • 第一个分库改写为: select * from order_info order by id where id between id_min and 22
  • 第二个分库改写为: select * from order_info order by id where id between id_min and 23
  • 第三个分库改写为: select * from order_info order by id where id between id_min and 24

查询结果如下:

第四步:找到id_min的全局偏移量

第一次查询的偏移量为3,那么每一个库的第一个目标数据的偏移量应该都是4。因此可知每个库的id_min的偏移量:

  • 第一个库:8就是id_min,偏移量为4;
  • 第二个库:11的偏移量为4,那么id_min的偏移量就是1;
  • 第三个库:12的偏移量为4,那么id_min的偏移量就是3;

因此id_min的全局偏移量为:4 + 1 + 3 = 8。

第五步:定位目标数据

  • 第一个库:8,13,14,19,22
  • 第二个库:9,10,11,16,17,18,23
  • 第三个库:12,15,20,21,24

经过排序,得到:

8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24

因为id_min的全局偏移量为8,最终结果需要 limit 5 offset 10,因此需要向后推移10 - 8 = 2 位,然后再取5位,得到:

11,12,13,14,15

小结:二次查找法,既避免了数据越处理越多,也支持跳转查询。但其也存在短板,需要查询两次,才能拿到目标数据。


一、零拷贝技术

零拷贝技术,是相对于传统 IO 的一种技术思想。传统 IO :读写数据是在用户空间和内核空间来回复制,而内核空间的数据是通过操作系统层面的 IO 接口从磁盘读取或写入的,中间也需要多次拷贝,因此效率较低。零拷贝技术,目的是尽可能地减少上下文切换和拷贝次数,提升操作性能。

二、传统 IO 实现原理

当应用服务接收客户端的请求时,传统 IO 通常需要两种系统调用:

1
2
3
4
// 读取
read(file, tmp_buf, len);
// 写入
write(socket, tmp_buf, len);

从细分图中可知,虽然只是简单的读写操作,但内部的流程还是比较复杂的。

一次读写将发生 4 次上下文切换:

  • 读取数据:从用户态切换到内核态;
  • 读取完毕:内核完成数据准备,从内核态切换到用户态;
  • 写入数据:从用户态切换到内核态;
  • 写入完毕:内核完成数据写入,从内核态切换到用户态;

一次读写将发生 4 次数据拷贝 (2 次 DMA 拷贝 + 2 次 CPU 拷贝):

  • 第一次拷贝 (DMA):把磁盘文件数据拷贝到内核缓冲区;
  • 第二次拷贝 (CPU):把内核缓冲区的数据拷贝到用户缓冲区,供应用程序使用;
  • 第三次拷贝 (CPU):把用户缓冲区的数据拷贝到内核 socket 缓冲区;
  • 第四次拷贝 (DMA):把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

虽然一次上下文切换需耗时只有几微秒,但在高并发场景中,这种延迟容易被积累和放大,从而影响整体性能。此外,磁盘和网卡操作速度远远小于内存,而内存操作速度又远远小于 CPU,4 次拷贝将严重拖慢系统性能。因此,提高 IO 性能,需要从减少上下文切换次数和数据拷贝次数两方面入手。

三、零拷贝实现

基于以上的讨论,可知零拷贝技术的设计思路:尽可能地减少上下文切换次数和数据拷贝次数。

零拷贝的具体实现方式有:

  • mmap:将内核空间和用户空间的虚拟地址映射到同一物理地址;
  • sendfile:直接把内核缓冲区的数据拷贝到网卡缓冲区;
  • direct IO:在应用层与磁盘、网卡之间建立直接通道;
3.1 mmap 实现零拷贝

在介绍 mmap() 的作用机制之前,先介绍一个新概念:虚拟内存。虚拟内存是现代操作系统中普遍使用的内存结构,使用虚拟地址代替物理内存,有两点好处:一是多个虚拟地址可以指向同一个物理地址;二是虚拟内存空间远远大于物理内存空间。

在传统 IO 中,read() 调用会把内核缓冲区的数据拷贝到用户缓冲区,耗时又耗力。如果把内核空间和用户空间的虚拟地址映射到同一个物理地址,那么就不需要 CPU 来回复制了。

mmap() 正是利用了虚拟内存的这一特性,取代传统 IO 的 read() 操作,并将内核缓冲区和用户缓冲区地址映射到同一物理内存地址,省去一次 CPU 拷贝的过程,提升 IO 性能。具体过程如下:

  • 应用进程调用 mmap() 后,DMA 会把磁盘文件数据拷贝到内核缓冲区;

  • 应用进程跟操作系统内核共享这个缓冲区;

  • 应用进程再调用 write(),直接将内核缓冲区的数据拷贝到内核 socket 缓冲区;

  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

从调用过程可知,与传统 IO 相比,mmap() + write 只减少了 1 次 CPU 拷贝,仍然要发生 4 次上下文切换和 3 次数据拷贝。

3.2 sendfile() 实现零拷贝

senfile() 是 Linux 提供的,专门用于发送文件的系统调用函数。sendfile() 可以替代传统 IO 的 read()、write() 函数,这意味着将省去 2 次上下文切换。此外,数据拷贝路径也有所优化,具体的优化方案与 Linux 内核版本有关,因为在 2.4 版本之后,Linux 提供了 SG-DMA 技术,它将提供比 DMA 技术更进一步的优化策略。

在 2.4 版本之前,CPU 可以直接把内核缓冲区的数据拷贝到内核 socket 缓冲区,省去拷贝到用户缓冲区这一步,因此还存在 2 次上下文切换和 3 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • CPU 把内核缓冲区的数据拷贝到内核 socket 缓冲区;
  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

在 2.4 版本之后,引入了 SG_DMA 技术,如果相应的网卡支持该技术,那么就可以把内核缓冲区的数据直接拷贝到网卡缓冲区,也就是说还存在 2 次上下文切换和 2 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • 把内核缓冲区描述符和数据长度传到内核 socket 缓冲区;
  • SG-DMA 直接把内核缓冲区的数据拷贝到网卡缓冲区;

3.3 direct IO

直接 IO 是在用户缓冲区和磁盘、网卡之间建立直接通道的技术设计。直接 IO 在读写数据时,可以绕开内核,减少上下文切换和数据拷贝的次数,从而提高效率。

具体执行步骤:

  • DMA 把磁盘文件数据直接拷贝到用户缓冲区;
  • DMA 把用户缓冲区的数据直接拷贝到网卡缓冲区;

直接 IO 使用直接通道操作数据,由应用层完全管理数据,其优缺点也是很明显的。

优点:

  • 应用层与磁盘、网卡建立直接通道,减少了上下文切换和数据拷贝的次数,速度更快;
  • 数据直接缓存在应用层,应用可以更加灵活得操作数据;

缺点:

  • 在应用层引入直接 IO,需要应用层自主管理,给系统增添了额外的复杂度;
  • 若数据不在应用层缓冲区,那么将直接操作磁盘文件读写,将大大拖慢性能;


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、分片化 Step

如果一个 Step 的任务量比较大,可以尝试将其拆分成多个子任务。子任务之间可并行处理且互不干扰,这将大大提升批处理效率。例如:Master 这个 Step 迁移 100000 条数据需要 100 s,如果将其拆分为 100 个 Slave 任务,那么时间可缩短至 1 s。

Step 分片原理,是一个 Master 处理器对应多个 Salve 处理器。Slave 处理器可以是远程服务,也可以是本地执行线程。主从服务间的消息不需要持久化,也不需要严格保证传递,因为 JobRepository 的元数据管理,是将每个 Salve 独立保存在 batch_step_execution 中的,这样便可以保证每个 Slave 任务只执行一次。

Step 分片化,需要了解两个组件:分片器(Partitioner)和分片处理(PartitionHandler)。

  • 分片器(Partitioner):为每个 Slave 服务配置上下文(StepExecutionContext);

  • 分片处理(PartitionHandler):定义 Slave 服务的数量以及 Slave 任务内容;

比如在一个数据迁移 Step 中,分片处理就是将 1 个主任务拆分成 100 个从任务,并定义从任务的执行内容;分片器就是依次为这 100 个从任务划定数据迁移的范围(select * from table where id between ? and ?)。

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class PartitionTransferStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "masterTransferStudentStep1")
private Step masterTransferStudentStep;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("partitionTransferStudentJob")
.incrementer(new RunIdIncrementer())
.flow(masterTransferStudentStep)
.end()
.build();
}
}
3.2 Step 配置

MasterTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.example.springbatchdemo.component.partitioner.TransferStudentPartitioner;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class MasterTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "transferStudentPartitionHandler1")
private PartitionHandler transferStudentPartitionHandler;

@Autowired
private TransferStudentPartitioner transferStudentPartitioner;

@Bean("masterTransferStudentStep1")
public Step masterTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("masterTransferStudentStep1.manager")
.partitioner("masterTransferStudentStep1", transferStudentPartitioner)
.partitionHandler(transferStudentPartitionHandler)
.build();
}
}

SlaveTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.processor.SlaveStudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class SlaveTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "slaveTransferStudentItemReader")
private JdbcPagingItemReader<Student> slaveTransferStudentItemReader;

@Autowired
@Qualifier(value = "slaveTransferStudentItemWriter")
private JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter;

@Autowired
private SlaveStudentItemProcessor slaveStudentItemProcessor;


@Bean("slaveTransferStudentStep1")
public Step slaveTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("slaveTransferStudentStep1")
.transactionManager(transactionManager)
.<Student, Student>chunk(1000)
.reader(slaveTransferStudentItemReader)
.processor(slaveStudentItemProcessor)
.writer(slaveTransferStudentItemWriter)
.build();
}
}
3.3 Partitioner 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TransferStudentPartitioner implements Partitioner {

private static final Logger LOGGER = LoggerFactory.getLogger(TransferStudentPartitioner.class);

@Override
public Map<String, ExecutionContext> partition(int gridSize) {

Map<String, ExecutionContext> result = new HashMap<>(gridSize);

int range = 1000;
int fromId = 0;
int toId = range;

for (int i = 1; i <= gridSize; i++) {

ExecutionContext value = new ExecutionContext();

value.putInt("fromId", fromId);
value.putInt("toId", toId);

result.put("partition" + i, value);

fromId = toId;
toId += range;

LOGGER.info("partition{}; fromId: {}; toId: {}", i, fromId, toId);
}

return result;
}
}
3.4 Partition-Handler 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class TransferStudentPartitionHandler {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "slaveTransferStudentStep1")
private Step slaveTransferStudentStep;

@Bean("transferStudentPartitionHandler1")
public PartitionHandler transferStudentPartitionHandler1() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor);
retVal.setStep(slaveTransferStudentStep);
retVal.setGridSize(100);
return retVal;
}
}
3.5 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemReader")
@StepScope
public JdbcPagingItemReader<Student> slaveTransferStudentItemReader(@Value("#{stepExecutionContext[fromId]}") final Long fromId,
@Value("#{stepExecutionContext[toId]}") final Long toId) {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");
queryProvider.setWhereClause(String.format("where student_id > %s and student_id <= %s", fromId, toId));

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.6 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
@StepScope
public class SlaveStudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.7 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemWriter")
@StepScope
public JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 常规 Step

省略测试代码,具体请查看 demo。测试结果:

耗时:13s

4.2 分片化 Step

测试结果:

batch_step_execution 可以看出,共有 100 个子任务并行处理,每个子任务迁移 1000 条数据。

耗时:7s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、并行化 Step

一个 Job 可配置多个 StepStep 之间可能存在关联,需要有先有后;也可能没有关联,先执行哪一个都可以。那么,若将这些互不关联的 Step 进行并行化处理,将会有效提升批处理性能。

比如,现有一个批处理任务,包含 4 个 Step

  • step1:在学生姓名后面追加字符串 “1”;
  • step2:在学生姓名后面追加字符串 “2”;
  • step3:在学生住址后面追加字符串 “8”;
  • step4:迁移所有学生信息;

我们发现,修改学生姓名的任务与修改学生住址的任务,互不干扰,并不需要有先后之分。因此,我们可以将 step1step2step3 并行执行。串行 Step 与并行 Step 流程如下:

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchManageStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentSplitFlow1")
private Flow batchProcessStudentSplitFlow;

@Autowired
@Qualifier(value = "batchTransferStudentStep1")
private Step batchTransferStudentStep;

@Bean
public Job manageStudentJob() {
return jobBuilderFactory.get("manageStudentJob1")
.incrementer(new RunIdIncrementer())
// 姓名追加1、姓名追加2、地址追加8
.start(batchProcessStudentSplitFlow)
// 迁移学生信息; student_source -> student_target
.next(batchTransferStudentStep)
.end()
.build();
}
}
3.2 Fow 配置

batchProcessStudentSplitFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentSplitFlow {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "batchUpdateStudentNameOneAndTwoFlow")
private Flow batchUpdateStudentNameOneAndTwoFlow;

@Autowired
@Qualifier(value = "batchUpdateStudentAddressFlow1")
private Flow batchUpdateStudentAddressFlow;

@Bean("batchProcessStudentSplitFlow1")
public Flow batchProcessStudentSplitFlow1() {
return new FlowBuilder<SimpleFlow>("batchProcessStudentSplitFlow1")
.split(taskExecutor)
.add(batchUpdateStudentNameOneAndTwoFlow, batchUpdateStudentAddressFlow)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep1")
private Step batchUpdateStudentNameStep1;

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep2")
private Step batchUpdateStudentNameStep2;

@Bean("batchUpdateStudentNameOneAndTwoFlow")
public Flow updateStudentNameOneAndTwoFlow() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentNameOneAndTwoFlow")
.start(batchUpdateStudentNameStep1)
.next(batchUpdateStudentNameStep2)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentAddressStep1")
private Step batchUpdateStudentAddressStep;

@Bean("batchUpdateStudentAddressFlow1")
public Flow batchUpdateStudentAddressFlow1() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentAddressFlow1")
.start(batchUpdateStudentAddressStep)
.build();
}
}
3.3 Step 配置

BatchUpdateStudentNameStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.example.springbatchdemo.component.processor.AppendStudentNameOneProcessor;
import com.example.springbatchdemo.component.processor.AppendStudentNameTwoProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateName")
private JdbcBatchItemWriter<Student> studentItemUpdateName;

@Autowired
private AppendStudentNameOneProcessor appendStudentNameOneProcessor;

@Autowired
private AppendStudentNameTwoProcessor appendStudentNameTwoProcessor;

@Bean("batchUpdateStudentNameStep1")
public Step batchUpdateStudentNameStep1() {
return stepBuilderFactory.get("batchUpdateStudentNameStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 1
.processor(appendStudentNameOneProcessor)
.writer(studentItemUpdateName)
.build();
}

@Bean("batchUpdateStudentNameStep2")
public Step batchUpdateStudentNameStep2() {
return stepBuilderFactory.get("batchUpdateStudentNameStep2")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 2
.processor(appendStudentNameTwoProcessor)
.writer(studentItemUpdateName)
.build();
}
}

BatchUpdateStudentAddressStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.AppendStudentAddressProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateAddress")
private JdbcBatchItemWriter<Student> studentItemUpdateAddress;

@Autowired
private AppendStudentAddressProcessor appendStudentAddressProcessor;

@Bean("batchUpdateStudentAddressStep1")
public Step batchUpdateStudentAddressStep1() {
return stepBuilderFactory.get("batchUpdateStudentAddressStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 住址追加 8
.processor(appendStudentAddressProcessor)
.writer(studentItemUpdateAddress)
.build();
}
}

BatchProcessStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchTransferStudentStep1")
public Step batchTransferStudentStep1() {
return stepBuilderFactory.get("batchTransferStudentStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 迁移数据; student_source -> student_target
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.4 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
@StepScope
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.5 数据处理器

AppendStudentNameOneProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameOneProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameOneProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_1"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentNameTwoProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameTwoProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameTwoProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_2"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentAddressProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentAddressProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentAddressProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address.concat("_8"));

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

StudentItemProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.6 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateName")
@StepScope
public JdbcBatchItemWriter<Student> studentItemUpdateName() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET name = :name WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateAddress")
public JdbcBatchItemWriter<Student> studentItemUpdateAddress() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET address = :address WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}
}

@StepScope:

从上面的 Step 配置可知,studentItemReader 被多个 Step 引用。默认情况下 studentItemReader 的生命周期是与 Job 保持一致,那么在多 Step 引用的情况下,就会抛出类似下面这种异常:

1
>Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first

使用注解 StepScope,让 studentItemReader 的生命周期与 Step 保持同步,保证每个 Step 拿到的 ItemReader 都是新的实例。同样,ItemWriterItemProcessor 存在多 Step 引用的,都要使用该注解。

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 串行 Step

串行 Step 批处理,只需要按照顺序配置 Step(省略代码示例)。测试结果:

耗时:91s

4.2 并行 Step

测试结果:

耗时:68s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、多线程 Step 配置

Spring Batch 执行一个 Step,会按照 chunk 配置的数量分批次提交。对于多线程 Step,由线程池去处理任务批次。因此,每个 chunk 都不用串行等待,这大大地提高了批处理性能。

配置多线程 Step 非常简单,可以通过 xml 或接口来配置。以接口配置为例:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
// 节流配置, 不要超过线程池的最大线程数量
.throttleLimit(20)
.build();
}

此外,在配置多线程 Step 时,我们需要考虑得更多:

  • 线程池:推荐使用 Spring 线程池 ThreadPoolTaskExecutor,兼容性好;
  • 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题;
  • 框架节流:Spring Batch 自带节流器,默认最多可处理 4 个小任务,因此需要重新配置;

三、批处理配置

通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:

3.1 数据读取器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.2 数据映射器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.example.springbatchdemo.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StudentRowMapper implements RowMapper<Student> {

@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}
3.3 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.4 数据写入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}
3.5 Step 配置-单线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.6 Step 配置-多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.taskExecutor(taskExecutor)
.throttleLimit(30)
.build();
}
}
3.7 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentStep1")
private Step batchProcessStudentStep1;

@Autowired
private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("transferStudentJob")
.incrementer(new RunIdIncrementer())
.listener(batchProcessStudentCompletionListener)
.flow(batchProcessStudentStep1)
.end()
.build();
}
}
3.8 MySQL 数据源配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// 数据源配置参数识别前缀, 根据具体配置来设定
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
// 使用 SpringBoot 默认的数据源 HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
3.9 线程池配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

四、批处理性能测试

4.1 单线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:313 秒

4.2 多线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:81 秒


性能提升超300%

五、总结

多线程 Stepchunk 任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。

示例代码:spring-batch-demo


一、Spring Batch 数据输出器

Spring Batch 的数据输出器,是通过接口 ItemWriter 来实现的。针对常用的数据输出场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemWriter:输出文本数据;
  • JdbcBatchItemWriter:输出数据到数据库;
  • StaxEventItemWriter:输出 XML 文件数据;
  • JsonFileItemWriter:输出 JSON 文件数据;
  • ClassifierCompositeItemWriter:输出多文本数据;

二、简单使用

实体类 Ticket.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}

文件 ticket.csv

1
2
3
4
5
合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
2.1 FlatFileItemWriter-文本数据输出

ticket.csv 中的信息,转换为 JSON 字符串,输出到文件 ticket_output.txt 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Job
*/
@Bean
public Job testFlatFileItemWriterJob() {
return jobBuilderFactory.get("testFlatFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemWriterStep")
public Step testFlatFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemWriterStep")
.transactionManager(transactionManager)
.reader(ticketFileItemReader)
.writer(ticketFileItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

输出文本数据,要求目标数据的格式为字符串,因此需要将 POJO 按照一定规则聚合成字符串。Spring Batch 已实现聚合器 LineAggregatorPassThroughLineAggregator(打印 POJO)、RecursiveCollectionLineAggregator(打印 POJO 列表)、DelimitedLineAggregator(分隔符拼接 POJO 字段值)、FormatterLineAggregator(格式化 POJO 字段值)。当然,我们也可以手动实现聚合器,例如示例代码中,将 POJO 转换为 JSON 格式。

启动应用,生成文件 ticket_output.txt

1
2
3
4
5
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00}
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00}
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
2.2 JdbcBatchItemWriter-数据库数据输出

将文件 student.cvs 中的信息(内容如下),导入到 MySQL 数据表 student 中:

1
2
3
1,张三,合肥
2,李四,蚌埠
3,王二,南京
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Job
*/
@Bean
public Job testDatabaseItemWriterJob() {
return jobBuilderFactory.get("testDatabaseItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemWriterStep")
public Step testDatabaseItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemWriterStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentFileItemReader)
.writer(studentItemWriter)
.build();
}

/**
* Reader
*/
@Bean("studentFileItemReader")
public FlatFileItemReader<Student> studentFileItemReader() {
return new FlatFileItemReaderBuilder<Student>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("student.csv"))
.delimited()
.names(new String[]{"studentId", "name", "address"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
setTargetType(Student.class);
}})
.build();
}

/**
* Writer
*/
@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,文件中的数据已导入表 student

2.3 StaxEventItemWriter-XML 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.xml 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Job
*/
@Bean
public Job testXmlItemWriterJob() {
return jobBuilderFactory.get("testXmlItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemWriterStep)
.build();
}

/**
* Step
*/
@Bean("testXmlItemWriterStep")
public Step testXmlItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketXmlItemWriter)
.build();
}


/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件 ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>蚌埠</arrivalStation><price>220.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>杭州</arrivalStation><price>75.20</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>昆山</arrivalStation><price>19.00</price></ticket></tickets>
2.4 JsonFileItemWriter-JSON文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.json 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Job
*/
@Bean
public Job testJsonItemWriterJob() {
return jobBuilderFactory.get("testJsonItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemWriterStep")
public Step testJsonItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketJsonItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketJsonItemWriter")
public JsonFileItemWriter<Ticket> ticketJsonItemWriter() {
return new JsonFileItemWriterBuilder<Ticket>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new FileSystemResource("ticket_output.json"))
.name("ticketJsonItemWriter")
.build();
}

启动应用,生成文件 ticket_output.json

1
2
3
4
5
6
7
[
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00},
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00},
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00},
{"departureStation":"上海","arrivalStation":"杭州","price":75.20},
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
]
2.5 ClassifierCompositeItemWriter-输出多文本数据

将文件 ticket.csv 中始发站为上海的车票信息输出到文本中,其余的输出到 XML 文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* Job
*/
@Bean
public Job testMultiFileItemWriterJob() {
return jobBuilderFactory.get("testMultiFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemWriterStep")
public Step testMultiFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketClassifierMultiFileItemWriter)
.stream(ticketFileItemWriter)
.stream(ticketXmlItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Classifier Writer
*/
@Bean("ticketClassifierMultiFileItemWriter")
public ClassifierCompositeItemWriter<Ticket> ticketClassifierMultiFileItemWriter() {
ClassifierCompositeItemWriter<Ticket> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier((Classifier<Ticket, ItemWriter<? super Ticket>>) ticket -> {
// 始发站是上海的, 输出到文本中, 否则输出到 XML 文件中
return "上海".equals(ticket.getDepartureStation()) ? ticketFileItemWriter() : ticketXmlItemWriter();
});
return writer;
}

/**
* 文本-Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

/**
* XML-Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件如下:

ticket_output.txt

1
2
3
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}

ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket></tickets>

示例代码:spring-batch-demo