0%

“我排队第 50 名,别人排队第 200 名,为什么他先候补成功?”——这个问题困扰了无数抢票人。答案藏在 12306 候补系统的底层设计里:席位复用、区间独立排队、事件驱动匹配。本文从五个维度拆解这套世界级高并发系统的技术架构,让你不仅”知其然”,更”知其所以然”。


一、为什么 12306 如此难做?

在深入设计之前,先理解问题的规模。

1.1 业务复杂度

铁路售票与普通电商秒杀有本质区别:

对比维度 普通电商秒杀 12306 售票
商品粒度 SKU 级别(一部手机) 区间级别(北京→济南,同一座位不同区间)
库存管理 扣减库存即可 区间叠加、席位复用
并发竞争 单 SKU 多人竞争 同一座位多个区间交叉竞争
订单关联 独立订单 关联订单(联程票、往返票)

核心差异:一趟北京到上海的高铁,途经 10 个站点,同一个座位可以拆分成 9 段区间分别售卖。这带来的是指数级的复杂度。

1.2 规模数据

以 2024 年春运为例:

  • 日均访问量:100 亿次+
  • 倰值 QPS:每秒 100 万+
  • 候补订单:单日峰值 2000 万+
  • 车次数据:5000+ 趟车次,每趟车 500-2000 个座位

在这种规模下,任何不当的设计都会被瞬间放大成系统崩溃。


二、核心设计基石:席位复用

2.1 什么是席位复用?

席位复用是指同一个座位可以分段卖给不同乘客,只要乘车区间不重叠。

举例说明:

1
2
3
4
5
6
7
8
9
车次:G1 北京→青岛
途经站点:北京 → 天津 → 济南 → 淄博 → 青岛

座位 12A 的售卖情况:
- 张三:北京 → 济南(占用 北京→天津、天津→济南 两段)
- 李四:济南 → 青岛(占用 济南→淄博、淄博→青岛 两段)
- 王五:天津 → 淄博(占用 天津→济南、济南→淄博 两段)

结果:同一座位 12A,同时卖给 3 个人,互不冲突

这个设计极大提升了座位利用率,但技术实现难度陡增。

2.2 技术实现:Redis Bitmap

12306 使用 Redis Bitmap(位图) 存储每个座位的区间占用状态。

数据结构设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Key:   train:{train_id}:{date}:{seat_no}
Value: Bitmap,每一位代表一个站点区间的占用状态

示例:
车次 G1,日期 2024-01-20,座位 12A
途经站点:北京(0) → 天津(1) → 济南(2) → 淄博(3) → 青岛(4)

Bitmap: 1 1 0 0
↑ ↑ ↑ ↑
│ │ │ └─ 淄博→青岛 (0=空闲)
│ │ └─── 济南→淄博 (0=空闲)
│ └───── 天津→济南 (1=占用)
└─────── 北京→天津 (1=占用)

表示:北京→济南 已售,济南→青岛 空闲

为什么用 Bitmap?

指标 Bitmap 传统方案(如 Hash)
空间占用 N 个站点仅需 N bit(几字节) 每区间存一个字段(几百字节)
查询复杂度 O(1) O(N)
区间判断 位运算 OR,一次搞定 需遍历所有区间
适用场景 固定长度、高频查询 灵活但低效

区间可用性检查(伪代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def check_interval_available(redis, train_id, date, seat_no, from_station, to_station):
"""
检查指定区间是否可用
from_station: 起始站点索引(如北京=0)
to_station: 终点站点索引(如济南=2)
"""
key = f"train:{train_id}:{date}:{seat_no}"

# 获取 from_station 到 to_station-1 的所有 bit 位
# 即区间 [from_station, to_station)
for i in range(from_station, to_station):
bit = redis.getbit(key, i)
if bit == 1: # 该区间已被占用
return False

return True


def allocate_interval(redis, train_id, date, seat_no, from_station, to_station):
"""
分配区间:将对应 bit 位设为 1
"""
key = f"train:{train_id}:{date}:{seat_no}"

for i in range(from_station, to_station):
redis.setbit(key, i, 1)

return True

更高效的方式:使用位运算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def check_and_allocate(redis, train_id, date, seat_no, from_station, to_station):
"""
使用位运算一次性检查并分配区间
"""
key = f"train:{train_id}:{date}:{seat_no}"

# 1. 获取当前 bitmap 值(假设用整数表示)
current = redis.get(key)
if current is None:
current = 0

# 2. 构造区间掩码
# 例如 from=0, to=2,掩码为 0b0011 (低位表示前面的区间)
mask = (1 << to_station) - (1 << from_station) # 0b0011

# 3. 检查区间是否已被占用
if current & mask != 0:
return False # 区间已被占用

# 4. 分配区间(原子操作,需用 Lua 脚本)
# Lua 脚本保证检查+设置原子性
lua_script = """
local current = tonumber(redis.call('GET', KEYS[1])) or 0
local mask = tonumber(ARGV[1])
if (current & mask) == 0 then
redis.call('SET', KEYS[1], current | mask)
return 1
else
return 0
end
"""
result = redis.eval(lua_script, 1, key, mask)
return result == 1

三、分布式排队机制

3.1 核心设计:按区间独立排队

很多人以为候补是整趟车排一个大队列,其实不然。

12306 的设计:每个乘车区间独立排队。

1
2
3
4
5
6
7
8
9
10
车次 G1(北京→青岛)的候补队列:

队列1:北京 → 天津(85 人排队)
队列2:北京 → 济南(320 人排队)
队列3:天津 → 济南(56 人排队)
队列4:济南 → 青岛(198 人排队)
队列5:北京 → 青岛(1275 人排队)
...

每个区间一个独立的 Sorted Set

为什么要按区间独立排队?

  1. 公平性:北京→天津 和 北京→青岛 根本不是同一批票,放一起排队没有意义
  2. 并行处理:不同区间队列可以并发处理,提升吞吐
  3. 精准匹配:释放席位时,直接定位对应队列,无需遍历

这解释了一个常见疑问

为什么我排队 50 名,别人排队 200 名,他却先候补成功?

因为你们根本不在同一个队列。他排的是天津→济南,你排的是北京→济南。

3.2 技术实现:Redis Sorted Set

数据结构设计:

1
2
3
4
5
6
7
8
9
10
Key:   backup:{train_id}:{date}:{from_station}:{to_station}
Value: 有序集合,存储候补订单 ID
Score: 用户提交候补的时间戳(越小越靠前)

示例:
backup:G1:20240120:beijing:jinan
├─ order_001 (score: 1705678901) ← 第1名
├─ order_002 (score: 1705678905) ← 第2名
├─ order_003 (score: 1705678912) ← 第3名
└─ ...

核心操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time

def add_to_backup_queue(redis, train_id, date, from_station, to_station, order_id):
"""
加入候补队列(ZADD)
时间戳作为 Score,天然实现先到先得
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
score = int(time.time() * 1000) # 毫秒时间戳,精确排队

redis.zadd(key, {order_id: score})
return True


def get_queue_position(redis, train_id, date, from_station, to_station, order_id):
"""
查询排队名次(ZRANK)
返回当前排队位置(从 0 开始,+1 后为实际名次)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
rank = redis.zrank(key, order_id)

if rank is None:
return -1 # 未在队列中
return rank + 1 # 返回实际名次(从 1 开始)


def get_top_n_from_queue(redis, train_id, date, from_station, to_station, n=10):
"""
取出队列前 N 名(ZRANGE)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
return redis.zrange(key, 0, n - 1)


def remove_from_queue(redis, train_id, date, from_station, to_station, order_id):
"""
候补成功或取消,移出队列(ZREM)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
redis.zrem(key, order_id)
return True

性能分析:

操作 复杂度 百万级队列耗时
ZADD(加入队列) O(log N) ~21 次比较,微秒级
ZRANK(查询名次) O(log N) 微秒级
ZRANGE(取前 N 名) O(log N + M) M 为取出数量,毫秒级
ZREM(移出队列) O(log N) 微秒级

结论:Sorted Set 天然适合排队场景,百万级数据依然高效。

3.3 数据持久化:MySQL 分库分表

Redis 是内存数据库,一旦宕机可能丢失数据。候补订单必须持久化到 MySQL。

分库分表策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
分库键:user_id % 16(按用户分 16 个库)
分表键:order_id % 128(每个库 128 张表)

表结构:
CREATE TABLE backup_order_00 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
train_id VARCHAR(20) NOT NULL,
travel_date DATE NOT NULL,
from_station VARCHAR(20) NOT NULL,
to_station VARCHAR(20) NOT NULL,
seat_type TINYINT NOT NULL, -- 座位类型:商务/一等/二等
status TINYINT DEFAULT 0, -- 状态:排队中/已兑现/已取消
queue_position INT, -- 冗余存储排队名次
create_time DATETIME,
update_time DATETIME,

INDEX idx_train_date (train_id, travel_date, from_station, to_station)
);

Redis 与 MySQL 如何保持一致?

这是一个典型的分布式一致性问题,后文技术难点部分详解。


四、事件驱动:候补车票从哪里来?

候补的车票不是凭空产生的,全部来自系统事件触发。

4.1 四大票源

票源 触发事件 延迟 占比(估算)
用户退票 用户主动退票 秒级 40%
订单超时 抢到票后 30 分钟未支付 秒级 35%
改签释放 用户改签其他车次 秒级 15%
动态加挂 12306 官方追加车厢 分钟级 10%

4.2 事件驱动架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌──────────────────────────────────────────────────────────────────┐
│ 事件来源 │
├──────────┬──────────┬──────────┬─────────────────────────────────┤
│ 退票事件 │ 超时事件 │ 改签事件 │ 加挂事件(定时任务触发) │
└────┬─────┴────┬─────┴────┬─────┴─────────────┬───────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────────────────────┐
│ 消息队列(Kafka/RocketMQ) │
│ │
│ Topic: ticket-released │
│ Partition: 按 train_id 分区,保证同一车次顺序处理 │
└───────────────────────────┬──────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ 候补匹配服务 │
│ │
│ 1. 消费释放席位事件 │
│ 2. 查询对应区间的候补队列(Redis Sorted Set) │
│ 3. 取出队列第一名 │
│ 4. 执行兑现流程 │
└──────────────────────────────────────────────────────────────────┘

事件消息结构:

1
2
3
4
5
6
7
8
9
10
11
12
{
"event_id": "evt_20240120_123456",
"event_type": "REFUND", // REFUND / TIMEOUT / RESCHEDULE / EXTRA_SEAT
"train_id": "G1",
"travel_date": "2024-01-20",
"seat_no": "12A",
"from_station": "beijing",
"to_station": "jinan",
"seat_type": 2, // 二等座
"release_time": 1705678901234,
"trace_id": "trace_abc123" // 链路追踪
}

4.3 候补匹配流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
席位释放事件触发:

Step 1: 解析事件,确定释放的区间
train_id=G1, date=20240120, seat=12A, from=beijing, to=jinan

Step 2: 计算该区间覆盖的所有原子区间
北京→济南 = [北京→天津, 天津→济南]

Step 3: 查询这些区间的候补队列
- 队列 A:北京→天津(85 人)
- 队列 B:天津→济南(56 人)
- 队列 C:北京→济南(320 人)← 完全匹配,优先处理

Step 4: 尝试匹配队列 C 的第一名
- 检查席位 12A 是否满足用户需求(座位类型、是否有票)
- 满足 → 执行兑现流程
- 不满足 → 尝试队列 C 的第二名

Step 5: 若队列 C 无匹配,尝试其他队列
- 队列 A + 队列 B 是否有同一座位?
- 这涉及跨区间匹配,后文详解

4.4 候补兑现完整流程

席位释放后,系统需要执行一整套原子流程才能完成兑现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
┌──────────────────────────────────────────────────────────────────────────┐
│ 候补兑现完整流程 │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ 席位释放事件 │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 1: 查询候补队列 ││
│ │ ││
│ │ redis.zrange("backup:G1:20240120:beijing:jinan", 0, 0) ││
│ │ → 取出排队第一名 user_id ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 2: 获取分布式锁 ││
│ │ ││
│ │ lock_keys = ["lock:G1:20240120:12A:beijing:tianjin", ││
│ │ "lock:G1:20240120:12A:tianjin:jinan"] ││
│ │ redis.set(lock_key, user_id, nx=True, px=5000) ││
│ │ → 锁定所有原子区间,防止并发超卖 ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 3: 检查席位可用性(双重检查) ││
│ │ ││
│ │ bitmap_key = "train:G1:20240120:12A" ││
│ │ check_interval_available(bitmap_key, beijing, jinan) ││
│ │ → 确认席位真正可用 ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 4: MySQL 本地事务(原子操作) ││
│ │ ││
│ │ BEGIN TRANSACTION; ││
│ │ -- 4.1 标记席位占用 ││
│ │ UPDATE seat SET status='OCCUPIED' WHERE ...; ││
│ │ -- 4.2 创建订单 ││
│ │ INSERT INTO ticket_order (...) VALUES (...); ││
│ │ -- 4.3 更新候补状态 ││
│ │ UPDATE backup_order SET status='FULFILLED' WHERE ...; ││
│ │ -- 4.4 写入操作日志(用于 Redis 异步同步) ││
│ │ INSERT INTO backup_operation_log (...) VALUES (...); ││
│ │ COMMIT; ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 5: 异步更新 Redis ││
│ │ ││
│ │ -- 5.1 更新席位 Bitmap ││
│ │ redis.setbit("train:G1:20240120:12A", beijing_idx, 1) ││
│ │ redis.setbit("train:G1:20240120:12A", tianjin_idx, 1) ││
│ │ -- 5.2 从候补队列移除 ││
│ │ redis.zrem("backup:G1:20240120:beijing:jinan", user_id) ││
│ │ -- 5.3 释放分布式锁 ││
│ │ redis.del(lock_keys) ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 6: 后续处理 ││
│ │ ││
│ │ -- 6.1 冻结预付款(调用支付服务) ││
│ │ -- 6.2 发送短信通知(调用通知服务) ││
│ │ -- 6.3 更新操作日志状态 ││
│ └─────────────────────────────────────────────────────────────────────┘│
│ │
└──────────────────────────────────────────────────────────────────────────┘

流程关键点

步骤 关键点 失败处理
获取锁 按字典序获取,避免死锁 获取失败 → 席位已被抢占,跳过
双重检查 锁后再检查,防止并发穿透 不可用 → 释放锁,尝试下一名用户
MySQL 事务 本地事务保证原子性 失败 → 释放锁,回滚,尝试下一名
异步更新 Redis 通过操作日志保证最终一致 失败 → 定时任务补偿恢复
后续处理 异步执行,不阻塞主流程 失败 → 重试或告警人工处理

五、高并发架构设计

5.1 四级防护架构

从用户请求到数据落地,12306 构建了四级防护体系,逐级降压:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────────┐
│ 用户请求 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第一级:网关层限流 │
│ │
│ 令牌桶算法:限制单 IP、单用户请求频率 │
│ 防刷策略:识别异常请求模式 │
│ 作用:挡住恶意流量,保护后端服务 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第二级:应用层本地缓存 │
│ │
│ 技术:Caffeine / Guava Cache │
│ 缓存内容: │
│ - 热门车次的区间队列长度(用户查询"排队第几名") │
│ - 热门车次的席位概览 │
│ - TTL:5-10 秒(容忍短时不一致) │
│ 效果:拦截 60%+ 的查询请求,减轻 Redis 压力 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第三级:Redis 集群缓存 │
│ │
│ 部署:主从 + 哨兵,保证高可用 │
│ 存储: │
│ - 席位 Bitmap(实时状态) │
│ - 候补 Sorted Set(排队数据) │
│ - 分布式锁(并发控制) │
│ 性能:单节点 10 万+ QPS,全内存操作 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第四级:MySQL 分库分表 │
│ │
│ 存储:订单持久化、用户数据、候补记录 │
│ 分片:按 user_id 分库,按 order_id 分表 │
│ 作用:数据兜底、Redis 故障时的降级数据源 │
└─────────────────────────────────────────────────────────────────┘

为什么需要四级?

级别 核心作用 如果缺失会怎样
网关限流 拦截恶意流量 后端被刷爆,正常用户无法访问
本地缓存 拦截高频查询 Redis 被查询请求压垮
Redis 缓存 扛住核心读写 MySQL 被高频请求打穿
MySQL 持久化 数据兜底 Redis 故障时数据丢失

5.2 限流降级策略

场景 限流策略 降级措施 用户感知
正常高峰 令牌桶 10 万 QPS 正常服务
超高峰 滑动窗口限流 排队等待 “系统繁忙,请稍后”
Redis 故障 熔断 查询走 MySQL,写入走 MQ 候补延迟,排队名次不可查
MySQL 故障 熔断 暂停下单,只读 “系统维护中”

六、技术难点深入

前文介绍了 12306 候补系统的核心设计,但真正让系统”稳如磐石”的,是对细节的极致处理。

6.1 难点一:分布式一致性

问题:Redis 队列与 MySQL 订单如何保持一致?

1
2
3
4
5
6
7
8
9
10
11
场景:用户取消候补

方案一:先更新 MySQL,再删除 Redis
1. MySQL DELETE 成功
2. Redis ZREM 失败(网络抖动)
→ 结果:MySQL 无订单,Redis 仍在排队,用户"幽灵排队"

方案二:先删除 Redis,再更新 MySQL
1. Redis ZREM 成功
2. MySQL DELETE 失败
→ 结果:Redis 已删除,MySQL 仍存在,用户想恢复排队无法恢复

解决方案:本地消息表 + 最终一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 候补操作日志表(与业务表同库,利用本地事务)
CREATE TABLE backup_operation_log (
log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL,
operation_type VARCHAR(20) NOT NULL, -- ADD / CANCEL / FULFILL
redis_status TINYINT DEFAULT 0, -- 0=待处理 1=成功 2=失败
mysql_status TINYINT DEFAULT 0, -- 0=待处理 1=成功 2=失败
retry_count INT DEFAULT 0,
create_time DATETIME,
update_time DATETIME,

INDEX idx_status (redis_status, mysql_status)
);

操作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def cancel_backup_order(order_id):
"""取消候补订单(保证最终一致性)"""

# Step 1: 本地事务,写入操作日志 + 更新订单状态
with db.transaction():
# 标记订单为"取消中"
db.execute("UPDATE backup_order SET status = 'CANCELING' WHERE order_id = ?", order_id)

# 写入操作日志
log_id = db.execute("""
INSERT INTO backup_operation_log
(order_id, operation_type, redis_status, mysql_status)
VALUES (?, 'CANCEL', 0, 0)
""", order_id)

# Step 2: 异步处理 Redis 操作
mq.send('backup_operation', {'log_id': log_id, 'order_id': order_id, 'type': 'CANCEL'})

return True


def process_cancel_message(message):
"""消费取消消息"""
log_id = message['log_id']
order_id = message['order_id']

try:
# Step 3: 从 Redis 队列移除
redis.zrem(f"backup:{train}:{date}:{from}:{to}", order_id)

# Step 4: 更新 MySQL 订单状态
db.execute("UPDATE backup_order SET status = 'CANCELED' WHERE order_id = ?", order_id)

# Step 5: 更新操作日志
db.execute("UPDATE backup_operation_log SET redis_status=1, mysql_status=1 WHERE log_id=?", log_id)

except Exception as e:
# 失败后重试,超过 3 次告警
retry_count = db.query("SELECT retry_count FROM backup_operation_log WHERE log_id=?", log_id)
if retry_count < 3:
db.execute("UPDATE backup_operation_log SET retry_count = retry_count + 1 WHERE log_id=?", log_id)
mq.send('backup_operation', message, delay=60) # 60 秒后重试
else:
alert(f"候补取消失败,log_id={log_id}, error={e}")

定时对账任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def reconcile_backup_data():
"""每小时对账,确保 Redis 与 MySQL 一致"""

# Step 1: 扫描所有候补队列
all_queues = redis.keys("backup:*")

for queue_key in all_queues:
order_ids = redis.zrange(queue_key, 0, -1)

for order_id in order_ids:
# Step 2: 检查 MySQL 是否存在
order = db.query("SELECT * FROM backup_order WHERE order_id = ?", order_id)

if not order or order.status == 'CANCELED':
# Step 3: Redis 存在,MySQL 不存在或已取消,清理 Redis
redis.zrem(queue_key, order_id)
log.info(f"清理幽灵排队:{order_id}")

6.2 难点二:并发竞争与超卖

问题:同一席位释放,多个区间队列如何竞争?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
场景:
车次 G1,座位 12A
张三退票:北京 → 济南

释放区间:北京→天津、天津→济南

现有候补队列:
- 队列 A:北京→天津(100 人)
- 队列 B:天津→济南(80 人)
- 队列 C:北京→济南(200 人)

如果不加控制:
1. 队列 A 把 12A 分给用户 X(北京→天津)
2. 队列 C 把 12A 分给用户 Y(北京→济南)
→ X 和 Y 的区间重叠,超卖!

解决方案:分布式锁 + 区间锁定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def allocate_seat_to_backup(train_id, date, seat_no, from_station, to_station, user_id):
"""
分配席位给候补用户
核心思想:锁定所有涉及的原子区间,防止并发超卖
"""

# Step 1: 获取该区间的所有原子区间(相邻站点间)
atomic_intervals = get_atomic_intervals(train_id, from_station, to_station)
# 例如:北京→济南 = [(北京,天津), (天津,济南)]

# Step 2: 生成锁 Key(按字典序排序,避免死锁)
lock_keys = sorted([
f"lock:{train_id}:{date}:{seat_no}:{f}:{t}"
for f, t in atomic_intervals
])

# Step 3: 尝试获取所有锁(SET NX PX,带超时)
lock_acquired = []
for key in lock_keys:
success = redis.set(key, user_id, nx=True, px=5000) # 5 秒超时
if not success:
# 获取失败,释放已获取的锁
for acquired_key in lock_acquired:
redis.delete(acquired_key)
return False, "席位已被占用"
lock_acquired.append(key)

try:
# Step 4: 再次检查席位可用性(双重检查)
if not check_seat_available(train_id, date, seat_no, from_station, to_station):
return False, "席位不可用"

# Step 5: 执行兑现流程
# 5.1 标记席位占用
mark_seat_occupied(train_id, date, seat_no, from_station, to_station, user_id)

# 5.2 创建订单(MySQL 本地事务)
order_id = create_ticket_order(user_id, train_id, date, seat_no, from_station, to_station)

# 5.3 冻结预付款
freeze_payment(user_id, order_id)

# 5.4 从候补队列移除
redis.zrem(f"backup:{train_id}:{date}:{from_station}:{to_station}", user_id)

# 5.5 发送通知
send_notification(user_id, f"候补成功!订单号:{order_id}")

return True, order_id

finally:
# Step 6: 释放锁
for key in lock_acquired:
# 使用 Lua 脚本,确保只释放自己持有的锁
redis.eval("""
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
""", 1, key, user_id)

关键点

  1. 锁粒度:锁到”原子区间”级别(相邻站点间),而非整个座位
  2. 锁顺序:按字典序获取锁,避免死锁(A 等待 B,B 等待 A)
  3. 双重检查:获取锁后再次检查可用性,防止并发穿透
  4. 原子释放:使用 Lua 脚本,确保只释放自己持有的锁

6.3 难点三:跨区间匹配

问题:候补”长区间”,释放”短区间”如何匹配?

1
2
3
4
5
6
7
8
9
10
11
场景:
用户候补:北京 → 青岛(全程)

当前释放:
- 张三退票:北京 → 天津(仅第一段)
- 李四退票:天津 → 济南(仅第二段)
- 王五退票:济南 → 青岛(仅后两段)

能否匹配?
- 如果三段都同时释放,可以合并给用户
- 但现实中,退票是离散事件,难以凑齐所有区间

方案一:不跨区间匹配(简单,12306 当前方案)

1
2
3
4
5
6
7
8
9
10
11
规则:
- 用户候补 北京→青岛,必须有 北京→青岛 的完整区间空位
- 不能由 北京→天津 + 天津→青岛 拼凑

优点:
- 实现简单,逻辑清晰
- 避免复杂的跨区间锁竞争

缺点:
- 用户体验稍差,需要分段候补
- 票源利用率降低

方案二:智能跨区间匹配(复杂,提升体验)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def smart_match_backup(train_id, date, from_station, to_station, user_id):
"""
智能匹配:尝试拼接多个短区间满足长区间需求
核心逻辑:遍历所有座位,找到在所有区间都空闲的座位
"""

# Step 1: 获取所有原子区间
atomic_intervals = get_atomic_intervals(train_id, from_station, to_station)
# 例如:北京→青岛 = [北京→天津, 天津→济南, 济南→淄博, 淄博→青岛]

# Step 2: 获取该车次所有座位列表
all_seats = get_all_seats(train_id, date, seat_type)
# 例如:['12A', '12B', '12C', '13A', '13B', ...]

# Step 3: 遍历每个座位,检查是否在所有区间都空闲
for seat_no in all_seats:
bitmap_key = f"train:{train_id}:{date}:{seat_no}"

# 检查该座位在所有原子区间是否空闲
all_available = True
for i, (f, t) in enumerate(atomic_intervals):
station_idx = get_station_index(f) # 获取站点索引
bit = redis.getbit(bitmap_key, station_idx)
if bit == 1: # 该区间已被占用
all_available = False
break

if all_available:
# 找到一个在所有区间都空闲的座位
return seat_no

# Step 4: 无完整匹配,返回 None
# 可选:记录缺票区间,等待后续释放
return None


def on_seat_released(train_id, date, seat_no, from_station, to_station):
"""
席位释放事件触发匹配
"""

# 查询所有可能匹配的候补队列
# 包括:精确匹配 + 跨区间匹配等待中的用户
related_queues = find_related_backup_queues(train_id, date, from_station, to_station)

for queue_key in related_queues:
# 取出队列前 10 名,尝试匹配
candidates = redis.zrange(queue_key, 0, 9)

for user_id in candidates:
# 获取该用户的候补需求
request = get_backup_request(user_id)

# 尝试为该用户匹配座位
seat = smart_match_backup(
train_id, date,
request.from_station, request.to_station,
user_id
)

if seat:
allocate_seat(seat, request.from_station, request.to_station, user_id)
break # 席位已分配,退出当前队列

跨区间匹配的挑战

挑战点 说明
复杂度激增 需维护”缺票区间索引”,查询效率下降
并发竞争更复杂 多个部分匹配用户竞争同一区间
用户体验不确定 部分匹配时,用户不知道自己排的是哪个队列

实际权衡:12306 当前采用”不跨区间匹配”策略,牺牲部分体验换取系统简洁。


6.4 难点四:热点倾斜

问题:春运热门线路,单个区间队列百万级

1
2
3
4
5
6
7
8
9
10
11
12
场景:
春节前一周,北京→哈尔滨,候补队列 200 万人

Sorted Set 性能:
- ZADD: O(log N) = log(2,000,000) ≈ 21 次操作
- ZRANK: O(log N) ≈ 21 次操作
- 单次操作微秒级,没问题

问题:
1. 频繁 ZRANK 查询排队名次,热点 Key
2. 频繁 ZRANGE 取队首用户,热点 Key
3. Redis 单线程,单 Key 成为瓶颈

解决方案:分片队列 + 虚拟排队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 方案一:队列分片
def get_shard_key(train_id, date, from_station, to_station, user_id):
"""
将单个大队列拆分成多个小队列
按 user_id 分片,保证同一用户始终在同一个分片
"""
shard_count = 32 # 分片数量
shard_id = hash(user_id) % shard_count
return f"backup:{train_id}:{date}:{from_station}:{to_station}:shard:{shard_id}"


def add_to_sharded_queue(train_id, date, from_station, to_station, user_id):
"""
加入分片队列
"""
shard_key = get_shard_key(train_id, date, from_station, to_station, user_id)
score = int(time.time() * 1000)
redis.zadd(shard_key, {user_id: score})


def get_queue_position_sharded(train_id, date, from_station, to_station, user_id):
"""
查询分片队列中的名次
"""
shard_key = get_shard_key(train_id, date, from_station, to_station, user_id)

# 本分片内的名次
local_rank = redis.zrank(shard_key, user_id)
if local_rank is None:
return -1

# 加上其他分片的历史完成数
total_finished = sum([
redis.get(f"backup:{train_id}:{date}:{from_station}:{to_station}:shard:{i}:finished")
for i in range(32)
])

# 虚拟名次(更准确的估算)
return local_rank + total_finished + 1


# 方案二:虚拟排队名次
def get_virtual_position(redis, train_id, date, from_station, to_station, user_id):
"""
不返回真实名次,而是"虚拟名次"
避免用户看到"排队 150 万名"直接放弃
"""
real_rank = get_real_position(redis, train_id, date, from_station, to_station, user_id)

if real_rank <= 100:
# 前 100 名,返回真实名次
return real_rank
elif real_rank <= 1000:
# 100-1000 名,显示"排名前 1000"
return f"前 1000 名"
else:
# 1000 名以后,显示预估等待时间
avg_fulfill_rate = get_historical_fulfill_rate(train_id, from_station, to_station)
wait_hours = real_rank / avg_fulfill_rate
return f"预计等待 {wait_hours:.1f} 小时"

本地缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 方案三:热点数据本地缓存
from cachetools import TTLCache

# 本地缓存:热门车次的队列长度
queue_length_cache = TTLCache(maxsize=10000, ttl=10) # 10 秒过期

def get_queue_length(train_id, date, from_station, to_station):
"""
获取队列长度(优先本地缓存)
"""
cache_key = f"{train_id}:{date}:{from_station}:{to_station}"

if cache_key in queue_length_cache:
return queue_length_cache[cache_key]

# 本地缓存未命中,查询 Redis
length = redis.zcard(f"backup:{train_id}:{date}:{from_station}:{to_station}")
queue_length_cache[cache_key] = length

return length

6.5 难点五:容灾与降级

问题:Redis 故障时如何保证候补可用?

1
2
3
4
故障场景:
1. Redis 主节点宕机,哨兵切换中(30 秒不可用)
2. Redis 集群网络分区,部分数据不可达
3. Redis 内存溢出,被系统 OOM Killer 杀掉

多级容灾设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
┌─────────────────────────────────────────────────────────────────┐
│ 正常流程 │
│ │
│ 用户请求 → Redis Sorted Set 排队 → 候补匹配 → 订单创建 │
│ │
└───────────────────────────┬─────────────────────────────────────┘
│ Redis 故障检测

┌─────────────────────────────────────────────────────────────────┐
│ 降级流程 │
│ │
│ 用户请求 → MQ 消息队列暂存 → MySQL 记录排队 → 后台任务恢复 │
│ │
│ 具体措施: │
│ 1. 新增候补请求写入 MQ(Kafka),不直接写 Redis │
│ 2. MySQL 记录候补订单(status=PENDING) │
│ 3. 后台任务监控 Redis 恢复后,将 MQ 消息同步到 Redis │
│ 4. 查询排队名次返回"系统繁忙,请稍后查询" │
│ │
└─────────────────────────────────────────────────────────────────┘

降级代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def add_backup_order_with_fallback(train_id, date, from_station, to_station, user_id):
"""
加入候补队列(带降级)
"""
try:
# 尝试写入 Redis
redis.zadd(f"backup:{train_id}:{date}:{from_station}:{to_station}",
{user_id: time.time()})
return True, "排队成功"

except RedisError as e:
# Redis 故障,降级到 MQ
log.error(f"Redis 故障,降级到 MQ: {e}")

# 写入 MQ
mq.send('backup_queue_pending', {
'train_id': train_id,
'date': date,
'from': from_station,
'to': to_station,
'user_id': user_id,
'timestamp': time.time()
})

# 写入 MySQL(状态为 PENDING,待恢复)
db.execute("""
INSERT INTO backup_order
(user_id, train_id, travel_date, from_station, to_station, status)
VALUES (?, ?, ?, ?, ?, 'PENDING')
""", user_id, train_id, date, from_station, to_station)

return True, "排队成功,系统繁忙,名次稍后可查"


def recover_redis_from_mq():
"""
后台任务:Redis 恢复后,从 MQ 恢复排队数据
关键:使用幂等操作,防止重复添加
"""
messages = mq.consume('backup_queue_pending', batch_size=100)

for msg in messages:
try:
queue_key = f"backup:{msg['train_id']}:{msg['date']}:{msg['from']}:{msg['to']}"

# 幂等检查:先查询用户是否已在队列中
existing_score = redis.zscore(queue_key, msg['user_id'])
if existing_score is not None:
# 用户已在队列中,跳过(保留原有的 Score,不更新)
log.info(f"用户 {msg['user_id']} 已在队列中,跳过恢复")
else:
# 用户不在队列中,添加到队列
redis.zadd(queue_key, {msg['user_id']: msg['timestamp']})

# 更新 MySQL 状态为 QUEUED
db.execute("""
UPDATE backup_order SET status = 'QUEUED'
WHERE user_id = ? AND train_id = ? AND travel_date = ?
""", msg['user_id'], msg['train_id'], msg['date'])

except RedisError as e:
log.error(f"Redis 仍未恢复,稍后重试: {e}")
break # Redis 仍不可用,等待下次恢复

降级流程关键点

关键点 说明
幂等恢复 检查用户是否已在队列,避免重复添加或 Score 被更新
状态追踪 MySQL 记录 PENDING→QUEUED 状态变化,便于监控
批量消费 每次消费 100 条,避免单条失败阻塞整批
优雅退出 Redis 仍不可用时 break,保留未处理消息供下次恢复

七、总结与思考

7.1 12306 候补系统的设计精髓

设计点 核心思想 技术实现
席位复用 同一座位分段售卖,提升利用率 Redis Bitmap
分布式排队 按区间独立排队,公平精准 Redis Sorted Set
事件驱动 四类票源触发自动兑现 Kafka + 消费者模式
高并发架构 四级防护 + 限流降级 网关 + Caffeine + Redis + MySQL
一致性保证 本地消息表 + 最终一致 MySQL 事务 + MQ
并发控制 分布式锁 + 双重检查 Redis SET NX + Lua
容灾降级 MQ 暂存 + 幂等恢复 Kafka + 定时对账

7.2 为什么官方候补比第三方靠谱?

对比维度 官方候补 第三方抢票软件
数据源 直接操作席位数据,无延迟 轮询 12306 接口,有延迟
排队公平性 时间戳排序,先到先得 无法获取真实排队数据
票源覆盖 四类票源全覆盖,包括动态加挂 只能监控部分票源
系统稳定性 高并发架构 + 容灾降级 容易被 12306 限流封禁

7.3 可借鉴的架构思想

  1. 数据结构选型决定系统上限:Bitmap 和 Sorted Set 的选择,让百万级数据依然高效

  2. 分而治之解决规模问题:按区间独立排队,将全局问题拆解为局部问题

  3. 事件驱动解耦复杂逻辑:退票、超时、改签、加挂,统一为事件,简化处理

  4. 四级防护扛住高并发:网关限流 → 本地缓存 → Redis → MySQL,逐级降压

  5. 最终一致胜过强一致:分布式系统中,最终一致更易实现,用户体验更好

  6. 幂等设计防止重复操作:降级恢复时先检查再添加,避免数据错乱


Oracle SQL*LoaderOracle 数据库管理系统中一个强大的工具,用于高效地将大量数据从外部文件加载到 Oracle 数据库中。它提供了一种快速、灵活的方式来导入数据,适用于各种数据格式和文件类型。本文将介绍 SQL*Loader 的基本概念、工作原理以及实际应用场景。

一、什么是 SQL*Loader?

SQL*Loader 是一个用于导入数据的实用程序,允许用户将普通文件、CSV 文件等外部数据源中的数据加载到 Oracle 数据库表中。它是 Oracle 数据库中的一个标准工具,可以轻松地处理大规模的数据加载任务。

二、SQL*Loader 的工作原理

SQL*Loader 的工作原理比较简单:

  • 控制文件定义:编写一个控制文件,其中指定了要加载的目标表、字段映射关系、数据格式等信息。控制文件是 SQL*Loader 的核心配置文件之一;

  • 准备外部数据文件:用户需要准备一个包含待加载数据的外部文件,可以是纯文本文件、CSV 文件等格式;

  • 运行 SQL*Loader:通过命令行或者其他界面工具运行 SQL*Loader,并指定控制文件和数据文件的位置。SQL*Loader 将根据控制文件加载到目标表中;

  • 数据加载SQL*Loader 按照控制文件中指定的规则,逐行解析外部数据文件,并将数据插入到目标表中;

三、SQL*Loader 控制文件

这是一个控制文件模板样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
LOAD DATA
INFILE 'data.csv' -- 指定外部数据文件的路径
INTO TABLE employees -- 指定目标表名
CHARACTERSET UTF8 -- 指定外部数据文件的字符集编码为 UTF-8
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' -- 指定字段分隔符和可选的字段包裹符
(
employee_id, -- 目标表的列名
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD' -- 数据格式化,确保日期格式正确
)
WHEN (hire_date >= '2022-02-01')
  • LOAD DATA 表示开始加载数据的声明;
  • INFILE 'data.csv' 指定了外部数据文件的路径。你需要将 data.csv 替换为实际的外部数据文件名,并确保文件路径正确;
  • INTO TABLE employees 指定了目标表名为 employees,即将数据加载到 employees 表中。你需要将 employees 替换为实际的目标表名;
  • CHARACTERSET UTF8 指定外部数据文件的字符集编码为 UTF-8
  • FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' 定义了字段的分隔符和可选的字段包裹符。在这个示例中,字段被逗号分隔,并且可能被双引号包裹。根据实际情况,你可能需要调整这些选项;
  • (employee_id, first_name, last_name, email, hire_date DATE 'YYYY-MM-DD') 定义了要加载的字段和它们的数据类型。确保与目标表的列名和数据类型匹配。在这个示例中,hire_date 被格式化为日期,并指定了日期的格式;
  • WHEN (hire_date >= '2022-01-01') 定义了加载数据的条件,只有满足条件的数据,才会被导入 Oracle 数据库;

四、SQL*Loader 的应用场景

SQL*Loader 在实际应用中有广泛的用途,例如:

  • 数据迁移和导入:当需要将外部数据源中的数据导入到 Oracle 数据库中时,SQL*Loader 是一个好的选择。它可以通过灵活的配置,处理大量的数据;
  • 数据集成和同步:在数据集成和同步的场景中,SQL*Loader 可以用于将不同系统或者数据源中的数据整合到同一数据库中,以便进行分析、报告等操作;
  • 日常数据加载:从外部系统中获取数据,并将其加载到 Oracle 数据库中进行进一步处理。SQL*Loader 可以自动化这一过程,提高数据处理的效率;

五、实际应用

以下是一个简单的示例,演示如何使用 SQL*LoaderCSV 文件加载到 Oracle 表中:

  • 创建一个控制文件 data.ctl,定义目标表和字段映射关系:
1
2
3
4
5
6
7
8
9
10
11
sqlCopy codeLOAD DATA
INFILE 'data.csv'
INTO TABLE employees
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
( employee_id,
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD'
)
WHEN (hire_date >= '2022-02-01')
  • 准备外部数据文件 data.csv,包含待加载的数据。
employee_id first_name last_name email hire_date
001 zhangsan@gmail.com 2023-02-01
002 lisi@gmail.com 2023-02-02
  • 在命令行中运行 SQL*Loader
1
2
bashCopy code
sqlldr username/password@database control=data.ctl

这样,SQL*Loader 就会将 data.csv 中的数据加载到名为 employees 的表中。

六、结论

Oracle SQL*Loader 是一个强大的数据加载工具,可用于高效地将外部数据加载到 Oracle 数据库中。通过简单的配置和命令,用户可以轻松地处理大量的数据加载任务,提高数据处理的效率和可靠性。


一、跨库分页查询

对于一定数量级的表,如订单表,通常采用分库分表的方式保存数据。根据指定字段,如果用户ID,散列数据到不同的库中。那么,如果需要按照下单时间分页查询订单信息,就涉及到跨库查询。

假设有45笔订单,存于三个库中,散列算法是 OrderID % 3,则数据分布为:

如果以每页五个订单,查询第二页的所有订单,则单库查询 sql 为:

1
select * from order_info order by id limit 5 offset 5;

但跨库查询就行不通了。下面,主要有三种方案可以用于跨库分页查询:

  • 全局查询法
  • 禁止跳页查询法
  • 二次查询法

二、全局查询法

全局查询法,需要在每个分库中执行查询语句,然后再程序中排序,再定位切割到指定的数据段。

如果需要查询第二页订单,需要查询每个库的前二页数据:

1
2
3
select * from order_info_1 order by id limit 10;
select * from order_info_2 order by id limit 10;
select * from order_info_3 order by id limit 10;

结果为:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30

那么第二页的订单列表为:

6,7,8,9,10

小结:对于低页码查询,全局查询法是可以应付的,但是,当页码越来越大,查出来的数据也就越来越多,需要排序的数据也越来越多,查询效率也就会越来越慢。

三、禁止跳页查询法

全局查询法的一个显著缺陷,就是随着页码越来越大,查询的数据量也越来越大。那么,如果禁止跳页查询,且每次查询都以上次查询的最大ID为基点,就可以保证每次查询的数据量都是相同的。

查询第一页数据:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

那么第一页的订单列表为:

1,2,3,4,5

查询第二页数据:

第一页的订单ID最大值为5,因此第二页的订单ID起始值应大于5,查询得到:

将以上三个库的查询结果排序:

6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

那么第二页的订单列表为:

6,7,8,9,10

小结:禁止跳页查询法,保证每次查询的数据量是相同的,避免了分页查询带来的性能衰减问题;但禁止跳页也是功能缺陷,没法一步定位到指定数据段。

四、二次查询法

二次查找法,既满足跳页查询,也能避免分页查询性能衰减。为了解释这一思想,我们以查询第三页订单数据为例。单库查询语句:

1
select * from order_info order by id limit 5 offset 10;

之所以叫二次查询法,当然需要查询两次。这两次查询有什么不同,希望通过以下四个步骤说清楚:

第一步:语句改写

select * from order_info order by id limit 5 offset 10 改写成 select * from order_info order by id limit 5 offset 3。偏移量10变成3,是基于10/3计算得出的。将语句在三个库分别执行,得到数据:

第二步:找最小值

  • 第一个库:最小数据为8
  • 第二个库:最小数据为11
  • 第三个库:最小数据为12

因此,从三个库中拿到的最小数据为8。

第三步:第二次语句改写

这次需要把 select * from order_info order by id limit 5 offset 3 改写成一个between语句,起点是最小的OrderID,终点是原来每个分库各自返回数据的最大值:

  • 第一个分库改写为: select * from order_info order by id where id between id_min and 22
  • 第二个分库改写为: select * from order_info order by id where id between id_min and 23
  • 第三个分库改写为: select * from order_info order by id where id between id_min and 24

查询结果如下:

第四步:找到id_min的全局偏移量

第一次查询的偏移量为3,那么每一个库的第一个目标数据的偏移量应该都是4。因此可知每个库的id_min的偏移量:

  • 第一个库:8就是id_min,偏移量为4;
  • 第二个库:11的偏移量为4,那么id_min的偏移量就是1;
  • 第三个库:12的偏移量为4,那么id_min的偏移量就是3;

因此id_min的全局偏移量为:4 + 1 + 3 = 8。

第五步:定位目标数据

  • 第一个库:8,13,14,19,22
  • 第二个库:9,10,11,16,17,18,23
  • 第三个库:12,15,20,21,24

经过排序,得到:

8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24

因为id_min的全局偏移量为8,最终结果需要 limit 5 offset 10,因此需要向后推移10 - 8 = 2 位,然后再取5位,得到:

11,12,13,14,15

小结:二次查找法,既避免了数据越处理越多,也支持跳转查询。但其也存在短板,需要查询两次,才能拿到目标数据。


一、零拷贝技术

零拷贝技术,是相对于传统 IO 的一种技术思想。传统 IO :读写数据是在用户空间和内核空间来回复制,而内核空间的数据是通过操作系统层面的 IO 接口从磁盘读取或写入的,中间也需要多次拷贝,因此效率较低。零拷贝技术,目的是尽可能地减少上下文切换和拷贝次数,提升操作性能。

二、传统 IO 实现原理

当应用服务接收客户端的请求时,传统 IO 通常需要两种系统调用:

1
2
3
4
// 读取
read(file, tmp_buf, len);
// 写入
write(socket, tmp_buf, len);

从细分图中可知,虽然只是简单的读写操作,但内部的流程还是比较复杂的。

一次读写将发生 4 次上下文切换:

  • 读取数据:从用户态切换到内核态;
  • 读取完毕:内核完成数据准备,从内核态切换到用户态;
  • 写入数据:从用户态切换到内核态;
  • 写入完毕:内核完成数据写入,从内核态切换到用户态;

一次读写将发生 4 次数据拷贝 (2 次 DMA 拷贝 + 2 次 CPU 拷贝):

  • 第一次拷贝 (DMA):把磁盘文件数据拷贝到内核缓冲区;
  • 第二次拷贝 (CPU):把内核缓冲区的数据拷贝到用户缓冲区,供应用程序使用;
  • 第三次拷贝 (CPU):把用户缓冲区的数据拷贝到内核 socket 缓冲区;
  • 第四次拷贝 (DMA):把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

虽然一次上下文切换需耗时只有几微秒,但在高并发场景中,这种延迟容易被积累和放大,从而影响整体性能。此外,磁盘和网卡操作速度远远小于内存,而内存操作速度又远远小于 CPU,4 次拷贝将严重拖慢系统性能。因此,提高 IO 性能,需要从减少上下文切换次数和数据拷贝次数两方面入手。

三、零拷贝实现

基于以上的讨论,可知零拷贝技术的设计思路:尽可能地减少上下文切换次数和数据拷贝次数。

零拷贝的具体实现方式有:

  • mmap:将内核空间和用户空间的虚拟地址映射到同一物理地址;
  • sendfile:直接把内核缓冲区的数据拷贝到网卡缓冲区;
  • direct IO:在应用层与磁盘、网卡之间建立直接通道;
3.1 mmap 实现零拷贝

在介绍 mmap() 的作用机制之前,先介绍一个新概念:虚拟内存。虚拟内存是现代操作系统中普遍使用的内存结构,使用虚拟地址代替物理内存,有两点好处:一是多个虚拟地址可以指向同一个物理地址;二是虚拟内存空间远远大于物理内存空间。

在传统 IO 中,read() 调用会把内核缓冲区的数据拷贝到用户缓冲区,耗时又耗力。如果把内核空间和用户空间的虚拟地址映射到同一个物理地址,那么就不需要 CPU 来回复制了。

mmap() 正是利用了虚拟内存的这一特性,取代传统 IO 的 read() 操作,并将内核缓冲区和用户缓冲区地址映射到同一物理内存地址,省去一次 CPU 拷贝的过程,提升 IO 性能。具体过程如下:

  • 应用进程调用 mmap() 后,DMA 会把磁盘文件数据拷贝到内核缓冲区;
  • 应用进程跟操作系统内核共享这个缓冲区;

  • 应用进程再调用 write(),直接将内核缓冲区的数据拷贝到内核 socket 缓冲区;

  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

从调用过程可知,与传统 IO 相比,mmap() + write 只减少了 1 次 CPU 拷贝,仍然要发生 4 次上下文切换和 3 次数据拷贝。

3.2 sendfile() 实现零拷贝

senfile() 是 Linux 提供的,专门用于发送文件的系统调用函数。sendfile() 可以替代传统 IO 的 read()、write() 函数,这意味着将省去 2 次上下文切换。此外,数据拷贝路径也有所优化,具体的优化方案与 Linux 内核版本有关,因为在 2.4 版本之后,Linux 提供了 SG-DMA 技术,它将提供比 DMA 技术更进一步的优化策略。

在 2.4 版本之前,CPU 可以直接把内核缓冲区的数据拷贝到内核 socket 缓冲区,省去拷贝到用户缓冲区这一步,因此还存在 2 次上下文切换和 3 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • CPU 把内核缓冲区的数据拷贝到内核 socket 缓冲区;
  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

在 2.4 版本之后,引入了 SG_DMA 技术,如果相应的网卡支持该技术,那么就可以把内核缓冲区的数据直接拷贝到网卡缓冲区,也就是说还存在 2 次上下文切换和 2 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • 把内核缓冲区描述符和数据长度传到内核 socket 缓冲区;
  • SG-DMA 直接把内核缓冲区的数据拷贝到网卡缓冲区;

3.3 direct IO

直接 IO 是在用户缓冲区和磁盘、网卡之间建立直接通道的技术设计。直接 IO 在读写数据时,可以绕开内核,减少上下文切换和数据拷贝的次数,从而提高效率。

具体执行步骤:

  • DMA 把磁盘文件数据直接拷贝到用户缓冲区;
  • DMA 把用户缓冲区的数据直接拷贝到网卡缓冲区;

直接 IO 使用直接通道操作数据,由应用层完全管理数据,其优缺点也是很明显的。

优点:

  • 应用层与磁盘、网卡建立直接通道,减少了上下文切换和数据拷贝的次数,速度更快;
  • 数据直接缓存在应用层,应用可以更加灵活得操作数据;

缺点:

  • 在应用层引入直接 IO,需要应用层自主管理,给系统增添了额外的复杂度;
  • 若数据不在应用层缓冲区,那么将直接操作磁盘文件读写,将大大拖慢性能;

在 Java 开发中,BigDecimal 是处理高精度数值的必备工具。然而,当我们需要将其转换为字符串时,面对 toString()toPlainString()toEngineeringString() 三个方法,很多开发者往往不知如何选择。本文将深入源码层面,剖析三者的实现原理,揭示常见陷阱,并提供最佳实践指导。

阅读全文 »


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、分片化 Step

如果一个 Step 的任务量比较大,可以尝试将其拆分成多个子任务。子任务之间可并行处理且互不干扰,这将大大提升批处理效率。例如:Master 这个 Step 迁移 100000 条数据需要 100 s,如果将其拆分为 100 个 Slave 任务,那么时间可缩短至 1 s。

Step 分片原理,是一个 Master 处理器对应多个 Salve 处理器。Slave 处理器可以是远程服务,也可以是本地执行线程。主从服务间的消息不需要持久化,也不需要严格保证传递,因为 JobRepository 的元数据管理,是将每个 Salve 独立保存在 batch_step_execution 中的,这样便可以保证每个 Slave 任务只执行一次。

Step 分片化,需要了解两个组件:分片器(Partitioner)和分片处理(PartitionHandler)。

  • 分片器(Partitioner):为每个 Slave 服务配置上下文(StepExecutionContext);

  • 分片处理(PartitionHandler):定义 Slave 服务的数量以及 Slave 任务内容;

比如在一个数据迁移 Step 中,分片处理就是将 1 个主任务拆分成 100 个从任务,并定义从任务的执行内容;分片器就是依次为这 100 个从任务划定数据迁移的范围(select * from table where id between ? and ?)。

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class PartitionTransferStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "masterTransferStudentStep1")
private Step masterTransferStudentStep;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("partitionTransferStudentJob")
.incrementer(new RunIdIncrementer())
.flow(masterTransferStudentStep)
.end()
.build();
}
}
3.2 Step 配置

MasterTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.example.springbatchdemo.component.partitioner.TransferStudentPartitioner;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class MasterTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "transferStudentPartitionHandler1")
private PartitionHandler transferStudentPartitionHandler;

@Autowired
private TransferStudentPartitioner transferStudentPartitioner;

@Bean("masterTransferStudentStep1")
public Step masterTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("masterTransferStudentStep1.manager")
.partitioner("masterTransferStudentStep1", transferStudentPartitioner)
.partitionHandler(transferStudentPartitionHandler)
.build();
}
}

SlaveTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.processor.SlaveStudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class SlaveTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "slaveTransferStudentItemReader")
private JdbcPagingItemReader<Student> slaveTransferStudentItemReader;

@Autowired
@Qualifier(value = "slaveTransferStudentItemWriter")
private JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter;

@Autowired
private SlaveStudentItemProcessor slaveStudentItemProcessor;


@Bean("slaveTransferStudentStep1")
public Step slaveTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("slaveTransferStudentStep1")
.transactionManager(transactionManager)
.<Student, Student>chunk(1000)
.reader(slaveTransferStudentItemReader)
.processor(slaveStudentItemProcessor)
.writer(slaveTransferStudentItemWriter)
.build();
}
}
3.3 Partitioner 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TransferStudentPartitioner implements Partitioner {

private static final Logger LOGGER = LoggerFactory.getLogger(TransferStudentPartitioner.class);

@Override
public Map<String, ExecutionContext> partition(int gridSize) {

Map<String, ExecutionContext> result = new HashMap<>(gridSize);

int range = 1000;
int fromId = 0;
int toId = range;

for (int i = 1; i <= gridSize; i++) {

ExecutionContext value = new ExecutionContext();

value.putInt("fromId", fromId);
value.putInt("toId", toId);

result.put("partition" + i, value);

fromId = toId;
toId += range;

LOGGER.info("partition{}; fromId: {}; toId: {}", i, fromId, toId);
}

return result;
}
}
3.4 Partition-Handler 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class TransferStudentPartitionHandler {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "slaveTransferStudentStep1")
private Step slaveTransferStudentStep;

@Bean("transferStudentPartitionHandler1")
public PartitionHandler transferStudentPartitionHandler1() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor);
retVal.setStep(slaveTransferStudentStep);
retVal.setGridSize(100);
return retVal;
}
}
3.5 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemReader")
@StepScope
public JdbcPagingItemReader<Student> slaveTransferStudentItemReader(@Value("#{stepExecutionContext[fromId]}") final Long fromId,
@Value("#{stepExecutionContext[toId]}") final Long toId) {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");
queryProvider.setWhereClause(String.format("where student_id > %s and student_id <= %s", fromId, toId));

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.6 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
@StepScope
public class SlaveStudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.7 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemWriter")
@StepScope
public JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 常规 Step

省略测试代码,具体请查看 demo。测试结果:

耗时:13s

4.2 分片化 Step

测试结果:

batch_step_execution 可以看出,共有 100 个子任务并行处理,每个子任务迁移 1000 条数据。

耗时:7s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、并行化 Step

一个 Job 可配置多个 StepStep 之间可能存在关联,需要有先有后;也可能没有关联,先执行哪一个都可以。那么,若将这些互不关联的 Step 进行并行化处理,将会有效提升批处理性能。

比如,现有一个批处理任务,包含 4 个 Step

  • step1:在学生姓名后面追加字符串 “1”;
  • step2:在学生姓名后面追加字符串 “2”;
  • step3:在学生住址后面追加字符串 “8”;
  • step4:迁移所有学生信息;

我们发现,修改学生姓名的任务与修改学生住址的任务,互不干扰,并不需要有先后之分。因此,我们可以将 step1step2step3 并行执行。串行 Step 与并行 Step 流程如下:

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchManageStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentSplitFlow1")
private Flow batchProcessStudentSplitFlow;

@Autowired
@Qualifier(value = "batchTransferStudentStep1")
private Step batchTransferStudentStep;

@Bean
public Job manageStudentJob() {
return jobBuilderFactory.get("manageStudentJob1")
.incrementer(new RunIdIncrementer())
// 姓名追加1、姓名追加2、地址追加8
.start(batchProcessStudentSplitFlow)
// 迁移学生信息; student_source -> student_target
.next(batchTransferStudentStep)
.end()
.build();
}
}
3.2 Fow 配置

batchProcessStudentSplitFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentSplitFlow {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "batchUpdateStudentNameOneAndTwoFlow")
private Flow batchUpdateStudentNameOneAndTwoFlow;

@Autowired
@Qualifier(value = "batchUpdateStudentAddressFlow1")
private Flow batchUpdateStudentAddressFlow;

@Bean("batchProcessStudentSplitFlow1")
public Flow batchProcessStudentSplitFlow1() {
return new FlowBuilder<SimpleFlow>("batchProcessStudentSplitFlow1")
.split(taskExecutor)
.add(batchUpdateStudentNameOneAndTwoFlow, batchUpdateStudentAddressFlow)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep1")
private Step batchUpdateStudentNameStep1;

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep2")
private Step batchUpdateStudentNameStep2;

@Bean("batchUpdateStudentNameOneAndTwoFlow")
public Flow updateStudentNameOneAndTwoFlow() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentNameOneAndTwoFlow")
.start(batchUpdateStudentNameStep1)
.next(batchUpdateStudentNameStep2)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentAddressStep1")
private Step batchUpdateStudentAddressStep;

@Bean("batchUpdateStudentAddressFlow1")
public Flow batchUpdateStudentAddressFlow1() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentAddressFlow1")
.start(batchUpdateStudentAddressStep)
.build();
}
}
3.3 Step 配置

BatchUpdateStudentNameStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.example.springbatchdemo.component.processor.AppendStudentNameOneProcessor;
import com.example.springbatchdemo.component.processor.AppendStudentNameTwoProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateName")
private JdbcBatchItemWriter<Student> studentItemUpdateName;

@Autowired
private AppendStudentNameOneProcessor appendStudentNameOneProcessor;

@Autowired
private AppendStudentNameTwoProcessor appendStudentNameTwoProcessor;

@Bean("batchUpdateStudentNameStep1")
public Step batchUpdateStudentNameStep1() {
return stepBuilderFactory.get("batchUpdateStudentNameStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 1
.processor(appendStudentNameOneProcessor)
.writer(studentItemUpdateName)
.build();
}

@Bean("batchUpdateStudentNameStep2")
public Step batchUpdateStudentNameStep2() {
return stepBuilderFactory.get("batchUpdateStudentNameStep2")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 2
.processor(appendStudentNameTwoProcessor)
.writer(studentItemUpdateName)
.build();
}
}

BatchUpdateStudentAddressStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.AppendStudentAddressProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateAddress")
private JdbcBatchItemWriter<Student> studentItemUpdateAddress;

@Autowired
private AppendStudentAddressProcessor appendStudentAddressProcessor;

@Bean("batchUpdateStudentAddressStep1")
public Step batchUpdateStudentAddressStep1() {
return stepBuilderFactory.get("batchUpdateStudentAddressStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 住址追加 8
.processor(appendStudentAddressProcessor)
.writer(studentItemUpdateAddress)
.build();
}
}

BatchProcessStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchTransferStudentStep1")
public Step batchTransferStudentStep1() {
return stepBuilderFactory.get("batchTransferStudentStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 迁移数据; student_source -> student_target
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.4 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
@StepScope
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.5 数据处理器

AppendStudentNameOneProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameOneProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameOneProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_1"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentNameTwoProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameTwoProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameTwoProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_2"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentAddressProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentAddressProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentAddressProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address.concat("_8"));

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

StudentItemProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.6 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateName")
@StepScope
public JdbcBatchItemWriter<Student> studentItemUpdateName() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET name = :name WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateAddress")
public JdbcBatchItemWriter<Student> studentItemUpdateAddress() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET address = :address WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}
}

@StepScope:

从上面的 Step 配置可知,studentItemReader 被多个 Step 引用。默认情况下 studentItemReader 的生命周期是与 Job 保持一致,那么在多 Step 引用的情况下,就会抛出类似下面这种异常:

1
>Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first

使用注解 StepScope,让 studentItemReader 的生命周期与 Step 保持同步,保证每个 Step 拿到的 ItemReader 都是新的实例。同样,ItemWriterItemProcessor 存在多 Step 引用的,都要使用该注解。

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 串行 Step

串行 Step 批处理,只需要按照顺序配置 Step(省略代码示例)。测试结果:

耗时:91s

4.2 并行 Step

测试结果:

耗时:68s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、多线程 Step 配置

Spring Batch 执行一个 Step,会按照 chunk 配置的数量分批次提交。对于多线程 Step,由线程池去处理任务批次。因此,每个 chunk 都不用串行等待,这大大地提高了批处理性能。

配置多线程 Step 非常简单,可以通过 xml 或接口来配置。以接口配置为例:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
// 节流配置, 不要超过线程池的最大线程数量
.throttleLimit(20)
.build();
}

此外,在配置多线程 Step 时,我们需要考虑得更多:

  • 线程池:推荐使用 Spring 线程池 ThreadPoolTaskExecutor,兼容性好;
  • 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题;
  • 框架节流:Spring Batch 自带节流器,默认最多可处理 4 个小任务,因此需要重新配置;

三、批处理配置

通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:

3.1 数据读取器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.2 数据映射器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.example.springbatchdemo.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StudentRowMapper implements RowMapper<Student> {

@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}
3.3 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.4 数据写入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}
3.5 Step 配置-单线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.6 Step 配置-多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.taskExecutor(taskExecutor)
.throttleLimit(30)
.build();
}
}
3.7 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentStep1")
private Step batchProcessStudentStep1;

@Autowired
private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("transferStudentJob")
.incrementer(new RunIdIncrementer())
.listener(batchProcessStudentCompletionListener)
.flow(batchProcessStudentStep1)
.end()
.build();
}
}
3.8 MySQL 数据源配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// 数据源配置参数识别前缀, 根据具体配置来设定
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
// 使用 SpringBoot 默认的数据源 HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
3.9 线程池配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

四、批处理性能测试

4.1 单线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:313 秒

4.2 多线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:81 秒


性能提升超300%

五、总结

多线程 Stepchunk 任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。

示例代码:spring-batch-demo


一、Spring Batch 数据输出器

Spring Batch 的数据输出器,是通过接口 ItemWriter 来实现的。针对常用的数据输出场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemWriter:输出文本数据;
  • JdbcBatchItemWriter:输出数据到数据库;
  • StaxEventItemWriter:输出 XML 文件数据;
  • JsonFileItemWriter:输出 JSON 文件数据;
  • ClassifierCompositeItemWriter:输出多文本数据;

二、简单使用

实体类 Ticket.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}

文件 ticket.csv

1
2
3
4
5
合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
2.1 FlatFileItemWriter-文本数据输出

ticket.csv 中的信息,转换为 JSON 字符串,输出到文件 ticket_output.txt 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Job
*/
@Bean
public Job testFlatFileItemWriterJob() {
return jobBuilderFactory.get("testFlatFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemWriterStep")
public Step testFlatFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemWriterStep")
.transactionManager(transactionManager)
.reader(ticketFileItemReader)
.writer(ticketFileItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

输出文本数据,要求目标数据的格式为字符串,因此需要将 POJO 按照一定规则聚合成字符串。Spring Batch 已实现聚合器 LineAggregatorPassThroughLineAggregator(打印 POJO)、RecursiveCollectionLineAggregator(打印 POJO 列表)、DelimitedLineAggregator(分隔符拼接 POJO 字段值)、FormatterLineAggregator(格式化 POJO 字段值)。当然,我们也可以手动实现聚合器,例如示例代码中,将 POJO 转换为 JSON 格式。

启动应用,生成文件 ticket_output.txt

1
2
3
4
5
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00}
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00}
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
2.2 JdbcBatchItemWriter-数据库数据输出

将文件 student.cvs 中的信息(内容如下),导入到 MySQL 数据表 student 中:

1
2
3
1,张三,合肥
2,李四,蚌埠
3,王二,南京
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Job
*/
@Bean
public Job testDatabaseItemWriterJob() {
return jobBuilderFactory.get("testDatabaseItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemWriterStep")
public Step testDatabaseItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemWriterStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentFileItemReader)
.writer(studentItemWriter)
.build();
}

/**
* Reader
*/
@Bean("studentFileItemReader")
public FlatFileItemReader<Student> studentFileItemReader() {
return new FlatFileItemReaderBuilder<Student>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("student.csv"))
.delimited()
.names(new String[]{"studentId", "name", "address"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
setTargetType(Student.class);
}})
.build();
}

/**
* Writer
*/
@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,文件中的数据已导入表 student

2.3 StaxEventItemWriter-XML 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.xml 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Job
*/
@Bean
public Job testXmlItemWriterJob() {
return jobBuilderFactory.get("testXmlItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemWriterStep)
.build();
}

/**
* Step
*/
@Bean("testXmlItemWriterStep")
public Step testXmlItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketXmlItemWriter)
.build();
}


/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件 ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>蚌埠</arrivalStation><price>220.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>杭州</arrivalStation><price>75.20</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>昆山</arrivalStation><price>19.00</price></ticket></tickets>
2.4 JsonFileItemWriter-JSON文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.json 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Job
*/
@Bean
public Job testJsonItemWriterJob() {
return jobBuilderFactory.get("testJsonItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemWriterStep")
public Step testJsonItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketJsonItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketJsonItemWriter")
public JsonFileItemWriter<Ticket> ticketJsonItemWriter() {
return new JsonFileItemWriterBuilder<Ticket>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new FileSystemResource("ticket_output.json"))
.name("ticketJsonItemWriter")
.build();
}

启动应用,生成文件 ticket_output.json

1
2
3
4
5
6
7
[
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00},
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00},
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00},
{"departureStation":"上海","arrivalStation":"杭州","price":75.20},
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
]
2.5 ClassifierCompositeItemWriter-输出多文本数据

将文件 ticket.csv 中始发站为上海的车票信息输出到文本中,其余的输出到 XML 文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* Job
*/
@Bean
public Job testMultiFileItemWriterJob() {
return jobBuilderFactory.get("testMultiFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemWriterStep")
public Step testMultiFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketClassifierMultiFileItemWriter)
.stream(ticketFileItemWriter)
.stream(ticketXmlItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Classifier Writer
*/
@Bean("ticketClassifierMultiFileItemWriter")
public ClassifierCompositeItemWriter<Ticket> ticketClassifierMultiFileItemWriter() {
ClassifierCompositeItemWriter<Ticket> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier((Classifier<Ticket, ItemWriter<? super Ticket>>) ticket -> {
// 始发站是上海的, 输出到文本中, 否则输出到 XML 文件中
return "上海".equals(ticket.getDepartureStation()) ? ticketFileItemWriter() : ticketXmlItemWriter();
});
return writer;
}

/**
* 文本-Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

/**
* XML-Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件如下:

ticket_output.txt

1
2
3
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}

ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket></tickets>

示例代码:spring-batch-demo


一、Spring Batch 数据读取器

Spring Batch 的数据读取器,是通过接口 ItemReader 来实现的。针对常用的数据读取场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemReader:读取文本数据;
  • JdbcPagingItemReader:分页读取数据库的数据;
  • StaxEventItemReader:读取 XML 文件数据;
  • JsonItemReader:读取 JSON 文件数据;
  • MultiResourceItemReader:读取多文本数据;

二、简单使用

实体类 Ticket.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}
2.1 FlatFileItemReader-文本数据读取

文件 ticket.csv

1
2
3
4
5
合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00

可以看到,文本数据的每一行代表一个 Ticket 实体,对象属性之间以英文逗号分隔。通过 FlatFileItemReader,可以按照行将文本数据转换为 POJO 存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Job
*/
@Bean
public Job testFlatItemFileReaderJob() {
return jobBuilderFactory.get("testFlatItemFileReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemReaderStep")
public Step testFlatFileItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

启动应用,控制台打印日志:

1
2
3
4
5
6
7
8
2022-06-02 13:50:23.538  INFO 77808 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testFlatItemFileReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 13:50:23.599 INFO 77808 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testFlatFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 13:50:23.680 INFO 77808 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testFlatFileItemReaderStep] executed in 79ms
2.2 JdbcPagingItemReader-数据库数据读取

MySQL 数据库,分页读取表 student 的数据,并打印数据内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Job
*/
@Bean
public Job testDatabaseItemReaderJob() {
return jobBuilderFactory.get("testDatabaseItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemReaderStep")
public Step testDatabaseItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemReaderStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}

public class StudentRowMapper implements RowMapper<Student> {

/**
* Student 字段映射
*/
@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {

Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,控制台打印日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
2022-06-02 14:00:19.010  INFO 67748 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testDatabaseItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:00:19.107 INFO 67748 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testDatabaseItemReaderStep]
name: 张三1, address: 上海市1
name: 张三2, address: 上海市2
name: 张三3, address: 上海市3
name: 张三4, address: 上海市4
name: 张三5, address: 上海市5
name: 张三6, address: 上海市6
name: 张三7, address: 上海市7
name: 张三8, address: 上海市8
name: 张三9, address: 上海市9
name: 张三10, address: 上海市10
2022-06-02 14:00:19.284 INFO 67748 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testDatabaseItemReaderStep] executed in 176ms
2.3 StaxEventItemReader-XML 数据读取

文件 ticket.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?xml version="1.0" encoding="UTF-8"?>
<tickets>
<ticket>
<departureStation>合肥</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>60.00</price>
</ticket>
<ticket>
<departureStation>南京</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>70.00</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>220.00</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>杭州</arrivalStation>
<price>75.20</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>昆山</arrivalStation>
<price>19.00</price>
</ticket>
</tickets>

可以看到,文件内容是多组 ticket 标签组成的,每一个标签代表一个 Ticket 实体;每个 ticket 标签,内含 3 个子标签,代表 Ticket 实体的 3 个属性值。

涉及到 XMLObject 的映射,因此需要引入 OXM 技术。推荐使用 spring oxm,pom 依赖:

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.11.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Job
*/
@Bean
public Job testXmlItemReaderJob() {
return jobBuilderFactory.get("testXmlItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testXmlItemReaderStep")
public Step testXmlItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketXmlItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketXmlItemReader")
public StaxEventItemReader<Ticket> itemReader() {
return new StaxEventItemReaderBuilder<Ticket>()
.name("ticketXmlItemReader")
.resource(new ClassPathResource("ticket.xml"))
.addFragmentRootElements("ticket")
.unmarshaller(ticketMarshaller)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,控制台打印日志:

1
2
3
4
5
6
7
8
2022-06-02 14:15:48.444  INFO 87024 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testXmlItemReaderJob]] launched with the following parameters: [{run.id=3}]
2022-06-02 14:15:48.503 INFO 87024 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testXmlItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:15:48.710 INFO 87024 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testXmlItemReaderStep] executed in 205ms
2.4 JsonItemReader-JSON 数据读取

文件 ticket.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[
{
"departureStation": "合肥",
"arrivalStation": "蚌埠",
"price": "60.00"
},
{
"departureStation": "南京",
"arrivalStation": "蚌埠",
"price": "70.00"
},
{
"departureStation": "上海",
"arrivalStation": "蚌埠",
"price": "220.00"
},
{
"departureStation": "上海",
"arrivalStation": "杭州",
"price": "75.20"
},
{
"departureStation": "上海",
"arrivalStation": "昆山",
"price": "19.00"
}
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Job
*/
@Bean
public Job testJsonItemReaderJob() {
return jobBuilderFactory.get("testJsonItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemReaderStep")
public Step testJsonItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketJsonItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketJsonItemReader")
public JsonItemReader<Ticket> ticketJsonItemReader() {
return new JsonItemReaderBuilder<Ticket>()
.name("ticketJsonItemReader")
.jsonObjectReader(new JacksonJsonObjectReader<>(Ticket.class))
.resource(new ClassPathResource("ticket.json"))
.build();
}

启动应用,控制台打印日志:

1
2
3
4
5
6
7
8
2022-06-02 14:25:38.142  INFO 76544 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testJsonItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:25:38.211 INFO 76544 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testJsonItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:25:38.328 INFO 76544 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testJsonItemReaderStep] executed in 116ms
2.5 MultiResourceItemReader-多文本数据读取

多文本数据读取,与文本数据读取的原理一致,只是在其基础上,做了一层代理。多文本数据读取,要求每个文本的数据结构相同,如从 ticket-1.cvsticket-2.cvs 两个文件中读取数据:

1
2
3
合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
1
2
上海,杭州,75.20
上海,昆山,19.00
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Job
*/
@Bean
public Job testMultiFileItemReaderJob() {
return jobBuilderFactory.get("testMultiFileItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemReaderStep")
public Step testMultiFileItemReaderStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketMultiFileItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Proxy Reader
*/
@Bean("ticketMultiFileItemReader")
public MultiResourceItemReader<Ticket> ticketMultiFileItemReader() {

// 资源文件
Resource[] resources = new Resource[]{
new ClassPathResource("ticket-1.csv"),
new ClassPathResource("ticket-2.csv")};

return new MultiResourceItemReaderBuilder<Ticket>()
.name("ticketMultiFileItemReader")
.delegate(commonTicketFileItemReader())
.resources(resources)
.build();
}

/**
* Reader
*/
@Bean("commonTicketFileItemReader")
public FlatFileItemReader<Ticket> commonTicketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("commonTicketFileItemReader")
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

启动程序,控制台打印日志:

1
2
3
4
5
6
7
8
2022-06-02 14:37:49.693  INFO 86124 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testMultiFileItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:37:49.785 INFO 86124 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testMultiFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:37:49.944 INFO 86124 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testMultiFileItemReaderStep] executed in 157ms

示例代码:spring-batch-demo