0%

一、背景

做微服务架构的人,基本绕不开 API 网关这个东西。不管你用 Spring Cloud Gateway、Envoy 还是 Nginx 手搓,总得有个统一的入口把请求转发给后端服务。

Kong 是这个领域里做得最成熟的开源方案之一,GitHub 上 40K+ Stars,背后有商业公司维护,插件生态完善,大厂用得也多。但很多开发者对 Kong 的理解停留在”哦,就是个网关”这个层面,对其架构设计、插件系统、配置模式这些核心东西没深入过。


二、Kong 是什么

一句话定义:Kong 是一个基于 Nginx + OpenResty(LuaJIT) 的开源 API 网关,核心能力是路由转发、认证鉴权、流量控制、可观测性

打个比方理解它的定位:

场景 没有网关 有了 Kong
10 个微服务都要做认证 每个服务自己实现一遍 Kong 统一做,服务只管业务
要对某个接口限流 在代码里写限流逻辑 Kong 配个插件就搞定
要记录所有请求日志 每个服务加日志代码 Kong 统一采集,不影响业务
要做灰度发布 改代码或改 Nginx 配置 Kong 按权重分流,配置即生效

核心区别在于:传统做法每个服务都要重复造轮子,Kong 把这些横切关注点(Cross-Cutting Concerns) 统一收口到网关层。


三、架构深度拆解

3.1 整体架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
                    客户端请求


┌─────────────────────────────────────────────────────────────┐
│ Kong Gateway │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Nginx (反向代理层) │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ OpenResty / LuaJIT │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Kong Core │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Plugin System (插件系统) │ │ │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │
│ │ │ │ 认证 │ │ 限流 │ │ 日志 │ │ 转换 │ │ │ │
│ │ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ ├─────────────────────────────────────────────────────┤ │
│ │ Admin API (管理接口) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────┐
│ 数据存储层 │
│ PostgreSQL / │
│ Cassandra / 声明式 │
└─────────────────────┘


┌─────────────────────┐
│ 上游服务集群 │
│ (User Service, │
│ Order Service...) │
└─────────────────────┘

这个架构有几个关键点值得拆开说。

3.2 为什么是 Nginx + OpenResty

Kong 没有用 Java、没用 Go,选了 Lua 这个相对小众的语言,原因在于 OpenResty

OpenResty 是 Nginx 的一个增强版,把 LuaJIT 嵌入到 Nginx 的请求处理生命周期里。这意味着:

  1. 性能极高:Nginx 本身就以高并发著称,LuaJIT 的执行速度接近 C
  2. 非阻塞 I/O:OpenResty 把所有网络操作都封装成非阻塞的,一个请求在等数据库返回的时候不会阻塞其他请求
  3. 生命周期钩子:Nginx 的各个处理阶段(rewrite、access、content、log)都能用 Lua 来扩展

Kong 本质上就是一个用 Lua 写的、运行在 OpenResty 上的 Nginx 配置生成器 + 插件执行引擎

3.3 核心抽象模型

Kong 的配置管理围绕几个核心概念展开:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Consumer (消费者)

├── 某个调用方的身份(API Key、JWT 等标识)


Route (路由)

├── 匹配规则:路径、Host、HTTP Method
├── 一个 Service 可以挂多个 Route


Service (服务)

├── 一个上游服务的抽象(URL、协议、超时配置)
├── 指向 Upstream


Upstream (上游)

├── 一组 Target 的集合
├── 负载均衡策略、健康检查


Target (目标)

└── 具体的后端实例地址(IP:Port)

举个实际例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 定义一个服务
Service:
name: user-service
url: http://user-api.internal:8080

# 给这个服务定义路由
Route:
name: user-route
paths: ["/api/v1/users"]
methods: ["GET", "POST"]
service: user-service

# 给路由加认证插件
Plugin:
name: key-auth
route: user-route

这个模型的设计很清晰:Service 是对后端服务的抽象,Route 是对请求匹配规则的抽象,Plugin 是对横切逻辑的抽象

3.4 请求生命周期

一个请求到达 Kong 之后,会经历这些阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1. 客户端发送请求


2. SSL/TLS 握手(certificate 阶段)
│ ← 这里可以执行 mTLS 插件

3. URI 重写(rewrite 阶段)
│ ← 这里可以执行 URL 重写插件

4. 路由匹配
│ ← Kong 根据 Host/Path/Method 找到对应的 Route

5. 认证 & 鉴权(access 阶段)
│ ← 执行 key-auth、jwt、oauth2 等插件

6. 限流 & 配额(access 阶段)
│ ← 执行 rate-limiting、quota 等插件

7. 请求转换(access 阶段)
│ ← 执行 request-transformer 插件

8. 转发请求到上游服务


9. 接收上游响应


10. 响应头处理(header_filter 阶段)
│ ← 执行 cors、response-transformer 等插件

11. 响应体处理(body_filter 阶段)
│ ← 可以修改响应内容

12. 日志记录(log 阶段)
│ ← 执行 tcp-log、http-log、file-log 等插件

13. 返回响应给客户端

注意 access 阶段是核心,大部分插件都在这个阶段执行。每个插件有优先级(priority),priority 越高越先执行。


四、插件系统:Kong 的核心竞争力

插件系统是 Kong 区别于其他网关的最大优势。理解插件系统,才算真正理解 Kong。

4.1 插件的作用域

Kong 的插件不是全局一把梭,而是可以分层配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
全局插件(Global Plugin)

├── 对所有请求生效


服务级插件(Service-level Plugin)

├── 对某个 Service 的所有请求生效


路由级插件(Route-level Plugin)

├── 对匹配某个 Route 的请求生效


消费者级插件(Consumer-level Plugin)

└── 对某个 Consumer 的请求生效

这种分层设计很实用。比如:

  • 全局开启 CORS 插件
  • 对支付服务单独开启更严格的限流
  • 对 VIP 消费者放宽配额限制

4.2 插件执行机制

每个插件本质上是一个 Lua 模块,实现了特定的钩子函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 一个最简单的 Kong 插件示例
local MyPlugin = {
PRIORITY = 1000, -- 优先级,越大越先执行
VERSION = "1.0.0",
}

-- access 阶段执行的逻辑
function MyPlugin:access(conf)
-- 从请求中获取某个 header
local request_id = kong.request.get_header("X-Request-ID")

if not request_id then
-- 如果没有 request_id,直接返回 400
return kong.response.exit(400, {
message = "Missing X-Request-ID header"
})
end

-- 把 request_id 传给上游服务
kong.service.request.set_header("X-Request-ID", request_id)
end

return MyPlugin

Kong 提供了 PDK(Plugin Development Kit),这是一组 Lua 函数,封装了所有常用操作:

PDK 函数 用途
kong.request.get_header() 获取请求头
kong.request.get_body() 获取请求体
kong.response.exit() 直接返回响应
kong.service.request.set_header() 修改转发给上游的请求头
kong.client.get_consumer() 获取当前消费者信息
kong.ip.get_source() 获取客户端真实 IP

4.3 内置插件分类

Kong 的内置插件可以分成这几大类:

认证类

插件 说明
key-auth API Key 认证,最简单常用
jwt JWT Token 认证
oauth2 完整的 OAuth 2.0 流程
basic-auth HTTP Basic 认证
hmac-auth HMAC 签名认证
ldap-auth LDAP 目录认证
openid-connect OIDC 认证(对接 Okta、Auth0 等)
mtls-auth 双向 TLS 认证

流量控制类

插件 说明
rate-limiting 基于 IP/Consumer/Service 的速率限制
request-size-limiting 限制请求体大小
proxy-cache 响应缓存
canary 金丝雀发布,按权重分流

转换类

插件 说明
request-transformer 修改请求头、Body、URL 参数
response-transformer 修改响应头、Body
request-validator JSON Schema 请求验证
correlation-id 生成请求唯一标识
cors 跨域配置

可观测性类

插件 说明
prometheus 暴露 Prometheus 指标
datadog 集成 Datadog APM
zipkin 分布式链路追踪
tcp-log TCP 方式发送日志
http-log HTTP 方式发送日志
file-log 写入本地文件
kafka-log 发送到 Kafka

4.4 插件开发实战

官方内置插件覆盖了大部分场景,但有时候你得写自己的插件。比如:给所有经过网关的请求加上公司内部的审计日志。

目录结构

1
2
3
4
5
6
7
kong-plugin-audit-log/
├── kong/
│ └── plugins/
│ └── audit-log/
│ ├── handler.lua # 插件逻辑
│ └── schema.lua # 配置校验
└── kong-plugin-audit-log-0.1.0-1.rockspec

handler.lua - 插件主逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
local AuditLog = {
PRIORITY = 10,
VERSION = "1.0.0",
}

function AuditLog:log(conf)
-- 获取请求信息
local request = {
method = kong.request.get_method(),
path = kong.request.get_path(),
query = kong.request.get_raw_query(),
headers = kong.request.get_headers(),
client_ip = kong.client.get_ip(),
consumer = kong.client.get_consumer(),
service = kong.router.get_service(),
response_status = kong.response.get_status(),
request_id = kong.request.get_header("X-Request-ID"),
}

-- 序列化后发送到审计系统
local cjson = require("cjson")
local log_data = cjson.encode(request)

-- 发送到 Kafka 或 HTTP 端点
local http = require("resty.http")
local httpc = http.new()
httpc:request_uri(conf.audit_endpoint, {
method = "POST",
body = log_data,
headers = {
["Content-Type"] = "application/json",
},
})
end

return AuditLog

schema.lua - 配置校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return {
name = "audit-log",
fields = {
{ consumer = typedefs.no_consumer },
{ config = {
type = "record",
fields = {
{ audit_endpoint = { type = "string", required = true } },
{ include_headers = { type = "boolean", default = true } },
},
},
},
},
}

安装插件

1
2
3
4
5
6
7
8
# 打包
luarocks make

# 在 kong.conf 中启用
plugins = bundled,audit-log

# 或者通过环境变量
KONG_PLUGINS=bundled,audit-log

4.5 多语言插件支持

Lua 不是所有人都熟,Kong 后来加了 External Plugins 机制,支持用其他语言写插件:

语言 方式 说明
Go Go PDK 通过 gRPC 和 Kong 通信
Python Python PDK 同样走 gRPC
JavaScript JS PDK 同样走 gRPC
WebAssembly WASM 新一代扩展方式

原理是一样的:Kong 在插件执行阶段通过 gRPC 调用外部插件服务,外部服务处理完返回结果。性能比原生 Lua 插件差一点,但开发门槛低很多。


五、配置管理:三种模式

Kong 支持三种配置管理模式,适应不同的部署场景。

5.1 传统数据库模式(DB Mode)

1
2
3
Kong 节点 ──→ PostgreSQL / Cassandra

└── Admin API 操作配置

这是最经典的模式。所有配置(Service、Route、Plugin、Consumer)都存在数据库里,通过 Admin API 管理。

优点

  • 配置变更实时生效,不需要重启
  • 多个 Kong 节点共享同一份配置
  • 适合动态环境,服务频繁上下线

缺点

  • 依赖数据库,多了一个运维组件
  • 数据库挂了 Kong 就废了(虽然有缓存,但配置变更就没了)

5.2 DB-less 声明式模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# kong.yml
_format_version: "3.0"

services:
- name: user-service
url: http://user-api:8080
routes:
- name: user-route
paths: ["/api/users"]
plugins:
- name: key-auth
- name: rate-limiting
config:
minute: 100
policy: local

- name: order-service
url: http://order-api:8080
routes:
- name: order-route
paths: ["/api/orders"]

启动时加载配置文件:

1
kong start -c kong.conf --declarative-config kong.yml

优点

  • 不需要数据库,部署更简单
  • 配置即代码,可以 Git 管理
  • 适合 GitOps、CI/CD 流程
  • 适合 Kubernetes ConfigMap/Secret

缺点

  • 配置变更需要重新加载(不是重启,是 reload)
  • 不支持 Consumer 的动态注册

5.3 混合模式(Hybrid Mode)

这是 Kong 比较新的部署模式,把控制平面和数据平面分离:

1
2
3
4
5
6
7
8
9
10
11
12
13
┌─────────────────────┐         ┌─────────────────────┐
│ Control Plane │ │ Data Plane │
│ (控制平面) │ │ (数据平面) │
│ │ │ │
│ ┌───────────────┐ │ mTLS │ ┌───────────────┐ │
│ │ Admin API │ │◄────────►│ │ Proxy Only │ │
│ │ 配置管理 │ │ 同步 │ │ 只做代理 │ │
│ └───────────────┘ │ 配置 │ └───────────────┘ │
│ │ │ │
│ ┌───────────────┐ │ │ 无 Admin API │
│ │ PostgreSQL │ │ │ 无数据库 │
│ └───────────────┘ │ │ │
└─────────────────────┘ └─────────────────────┘

工作原理

  1. 控制平面负责配置管理,连接数据库
  2. 数据平面只做代理,不连数据库
  3. 控制平面通过 mTLS 把配置推送到数据平面
  4. 数据平面本地缓存配置,即使控制平面挂了也能继续工作

优点

  • 数据平面不暴露 Admin API,安全性更高
  • 数据平面不需要数据库,部署更轻量
  • 可以跨机房部署,控制平面集中管理
  • 适合大规模微服务架构

缺点

  • 架构复杂度增加
  • 需要管理 mTLS 证书

六、负载均衡与服务发现

6.1 负载均衡算法

Kong 支持多种负载均衡算法:

算法 说明 适用场景
round-robin 轮询 通用场景,最常用
consistent-hashing 一致性哈希 需要会话保持的场景
least-connections 最少连接数 后端实例性能不均
latency 最低延迟 对延迟敏感的场景

一致性哈希特别值得说一下。它可以基于不同的维度做哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基于 Consumer 做哈希,同一个消费者的请求总是打到同一个后端
curl -X POST http://localhost:8001/upstreams/my-upstream \
--data name=my-upstream \
--data hash_on=consumer

# 基于 IP 做哈希
curl -X POST http://localhost:8001/upstreams/my-upstream \
--data name=my-upstream \
--data hash_on=ip

# 基于某个 Header 做哈希
curl -X POST http://localhost:8001/upstreams/my-upstream \
--data name=my-upstream \
--data hash_on=header \
--data hash_on_header=X-User-ID

6.2 健康检查

Kong 支持两种健康检查机制:

主动检查(Active Health Checks)

Kong 定期向后端实例发送探测请求:

1
2
3
4
5
curl -X POST http://localhost:8001/upstreams/my-upstream/healthcheck \
--data active.healthy.interval=5 \
--data active.unhealthy.interval=2 \
--data active.http_path=/health \
--data active.http_statuses=200,302

被动检查(Passive Health Checks)

基于实际请求的失败情况来判断后端是否健康(也叫熔断):

1
2
3
4
5
curl -X POST http://localhost:8001/upstreams/my-upstream/healthcheck \
--data passive.healthy.successes=5 \
--data passive.unhealthy.tcp_failures=3 \
--data passive.unhealthy.timeouts=3 \
--data passive.unhealthy.http_failures=5

两种方式通常配合使用:主动检查发现故障,被动检查在故障发生时快速熔断。

6.3 服务发现

Kong 支持与服务发现系统集成:

方式 说明
DNS SRV 通过 DNS 记录发现服务
Consul 集成 HashiCorp Consul
Kubernetes 直接使用 K8s Service 名称

Kubernetes 集成最常用。在 K8s 里,Kong 直接用 Service 名称做 upstream:

1
2
3
services:
- name: user-service
url: http://user-service.default.svc.cluster.local:8080

Kong 会自动解析 K8s Service 的 ClusterIP,不需要手动维护后端实例列表。


七、实战:快速上手

7.1 Docker 一键启动

最简单的方式是用 Docker Compose:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# docker-compose.yml
version: "3.8"

services:
kong-database:
image: postgres:15
environment:
POSTGRES_USER: kong
POSTGRES_DB: kong
POSTGRES_PASSWORD: kong
volumes:
- kong-db-data:/var/lib/postgresql/data

kong-migrations:
image: kong:3.6
command: kong migrations bootstrap
environment:
KONG_DATABASE: postgres
KONG_PG_HOST: kong-database
KONG_PG_USER: kong
KONG_PG_PASSWORD: kong
depends_on:
- kong-database

kong:
image: kong:3.6
environment:
KONG_DATABASE: postgres
KONG_PG_HOST: kong-database
KONG_PG_USER: kong
KONG_PG_PASSWORD: kong
KONG_PROXY_ACCESS_LOG: /dev/stdout
KONG_ADMIN_ACCESS_LOG: /dev/stdout
KONG_PROXY_ERROR_LOG: /dev/stderr
KONG_ADMIN_ERROR_LOG: /dev/stderr
KONG_ADMIN_LISTEN: 0.0.0.0:8001
ports:
- "8000:8000" # Proxy
- "8443:8443" # Proxy SSL
- "8001:8001" # Admin API
depends_on:
- kong-database
- kong-migrations

volumes:
kong-db-data:

启动:

1
docker-compose up -d

7.2 配置第一个 API

用 Admin API 注册一个服务和路由:

1
2
3
4
5
6
7
8
9
10
11
12
# 1. 创建服务
curl -X POST http://localhost:8001/services \
--data "name=httpbin" \
--data "url=https://httpbin.org"

# 2. 创建路由
curl -X POST http://localhost:8001/services/httpbin/routes \
--data "name=httpbin-route" \
--data "paths[]=/httpbin"

# 3. 测试
curl http://localhost:8000/httpbin/get

7.3 加上认证和限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 1. 启用 key-auth 插件
curl -X POST http://localhost:8001/services/httpbin/plugins \
--data "name=key-auth"

# 2. 创建消费者
curl -X POST http://localhost:8001/consumers \
--data "username=app-001"

# 3. 给消费者分配 API Key
curl -X POST http://localhost:8001/consumers/app-001/key-auth \
--data "key=my-secret-api-key"

# 4. 测试(不带 Key)
curl http://localhost:8000/httpbin/get
# 返回 401 Unauthorized

# 5. 测试(带 Key)
curl http://localhost:8000/httpbin/get \
-H "apikey: my-secret-api-key"
# 返回正常数据

# 6. 启用限流插件
curl -X POST http://localhost:8001/services/httpbin/plugins \
--data "name=rate-limiting" \
--data "config.minute=10" \
--data "config.policy=local"

7.4 DB-less 模式启动

如果不想用数据库,直接用声明式配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# kong.yml
_format_version: "3.0"

services:
- name: httpbin
url: https://httpbin.org
routes:
- name: httpbin-route
paths: ["/httpbin"]
plugins:
- name: rate-limiting
config:
minute: 100
policy: local

consumers:
- username: app-001
keyauth_credentials:
- key: my-secret-api-key
1
2
# DB-less 模式启动
KONG_DATABASE=off kong start -c kong.conf --declarative-config kong.yml

八、企业级场景

8.1 多环境管理

实际项目里通常有开发、测试、生产多套环境。Kong 的命名空间(Namespaces)或者用不同的 Kong 实例来隔离:

1
2
3
4
5
6
7
# 开发环境 Kong
KONG_PROXY_LISTEN=0.0.0.0:8000
KONG_ADMIN_LISTEN=0.0.0.0:8001

# 生产环境 Kong
KONG_PROXY_LISTEN=0.0.0.0:9000
KONG_ADMIN_LISTEN=0.0.0.0:9001

8.2 灰度发布

Kong 的 Canary 插件支持按权重分流:

1
2
3
4
5
6
# 90% 流量打到 v1,10% 打到 v2
curl -X POST http://localhost:8001/services/my-service/plugins \
--data "name=canary" \
--data "config.upstream_host=v2.internal" \
--data "config.upstream_port=8080" \
--data "config.percentage=10"

8.3 API 版本管理

用 Route 的优先级来做 API 版本管理:

1
2
3
4
5
6
7
8
9
10
11
# v1 路由(低优先级)
curl -X POST http://localhost:8001/services/user-v1/routes \
--data "name=user-v1" \
--data "paths[]=/api/v1/users" \
--data "priority=1"

# v2 路由(高优先级,覆盖 v1)
curl -X POST http://localhost:8001/services/user-v2/routes \
--data "name=user-v2" \
--data "paths[]=/api/v2/users" \
--data "priority=10"

8.4 跨域处理

微服务架构下跨域问题很常见,用 CORS 插件统一处理:

1
2
3
4
5
6
curl -X POST http://localhost:8001/services/my-service/plugins \
--data "name=cors" \
--data "config.origins=https://app.example.com" \
--data "config.methods=GET,POST,PUT,DELETE" \
--data "config.headers=Content-Type,Authorization" \
--data "config.max_age=3600"

九、Kong vs 其他方案

市面上 API 网关不少,简单对比一下:

维度 Kong Spring Cloud Gateway Envoy APISIX
语言 Lua Java C++ Lua
性能 极高 中等 极高 极高
插件生态 ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐
动态配置
管理界面 企业版有 需自己开发
学习成本 中等 低(Java 生态) 中等
K8s 集成 Ingress Controller 较弱 原生支持 Ingress Controller
适用场景 通用 Spring Cloud 体系 Service Mesh 通用

选型建议

  • Java 技术栈为主 → Spring Cloud Gateway
  • Service Mesh 架构 → Envoy
  • 通用场景、插件生态要求高 → Kong 或 APISIX
  • 已经在用 OpenResty → Kong 或 APISIX

十、注意事项与局限性

10.1 性能考量

Kong 的延迟开销通常在 1-5ms(不含插件处理时间),但插件用多了会明显增加延迟。实际压测数据:

1
2
3
无插件:~1ms 延迟,50K+ RPS
3 个插件:~3ms 延迟,30K+ RPS
5+ 个插件:~5-10ms 延迟,20K+ RPS

建议:只启用真正需要的插件,不要为了”以防万一”开启一堆不用的插件。

10.2 Admin API 安全

Admin API 是 Kong 的管理接口,默认监听 8001 端口。生产环境必须限制访问

1
2
3
4
5
# 只允许内网访问
KONG_ADMIN_LISTEN=127.0.0.1:8001

# 或者加认证
KONG_ADMIN_GUI_AUTH=basic-auth

10.3 配置备份

用 DB 模式时,定期备份 PostgreSQL 数据库。用 DB-less 模式时,确保 kong.yml 文件在 Git 里有版本管理。

10.4 日志与监控

生产环境一定要配好日志和监控。推荐组合:

  • Prometheus 插件 → 暴露指标
  • Grafana → 可视化
  • http-log 或 kafka-log → 请求日志持久化

十一、总结

Kong 的核心价值可以概括为三点:

  1. 统一收口:把认证、限流、日志这些横切关注点统一到网关层,业务服务只管业务
  2. 插件化架构:功能按需启用,支持 Lua/Go/Python/JS 多语言扩展
  3. 部署灵活:DB 模式、DB-less 模式、Hybrid 模式,适应不同规模和场景

对于中大型微服务架构,Kong 是一个成熟可靠的选择。小团队或者简单场景,可能 Nginx 手搓配置就够了,没必要上 Kong。但当服务数量上了规模,统一网关的价值就体现出来了。


参考资料

“我排队第 50 名,别人排队第 200 名,为什么他先候补成功?”——这个问题困扰了无数抢票人。答案藏在 12306 候补系统的底层设计里:席位复用、区间独立排队、事件驱动匹配。本文从五个维度拆解这套世界级高并发系统的技术架构,让你不仅”知其然”,更”知其所以然”。


一、为什么 12306 如此难做?

在深入设计之前,先理解问题的规模。

1.1 业务复杂度

铁路售票与普通电商秒杀有本质区别:

对比维度 普通电商秒杀 12306 售票
商品粒度 SKU 级别(一部手机) 区间级别(北京→济南,同一座位不同区间)
库存管理 扣减库存即可 区间叠加、席位复用
并发竞争 单 SKU 多人竞争 同一座位多个区间交叉竞争
订单关联 独立订单 关联订单(联程票、往返票)

核心差异:一趟北京到上海的高铁,途经 10 个站点,同一个座位可以拆分成 9 段区间分别售卖。这带来的是指数级的复杂度。

1.2 规模数据

以 2024 年春运为例:

  • 日均访问量:100 亿次+
  • 倰值 QPS:每秒 100 万+
  • 候补订单:单日峰值 2000 万+
  • 车次数据:5000+ 趟车次,每趟车 500-2000 个座位

在这种规模下,任何不当的设计都会被瞬间放大成系统崩溃。


二、核心设计基石:席位复用

2.1 什么是席位复用?

席位复用是指同一个座位可以分段卖给不同乘客,只要乘车区间不重叠。

举例说明:

1
2
3
4
5
6
7
8
9
车次:G1 北京→青岛
途经站点:北京 → 天津 → 济南 → 淄博 → 青岛

座位 12A 的售卖情况:
- 张三:北京 → 济南(占用 北京→天津、天津→济南 两段)
- 李四:济南 → 青岛(占用 济南→淄博、淄博→青岛 两段)
- 王五:天津 → 淄博(占用 天津→济南、济南→淄博 两段)

结果:同一座位 12A,同时卖给 3 个人,互不冲突

这个设计极大提升了座位利用率,但技术实现难度陡增。

2.2 技术实现:Redis Bitmap

12306 使用 Redis Bitmap(位图) 存储每个座位的区间占用状态。

数据结构设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Key:   train:{train_id}:{date}:{seat_no}
Value: Bitmap,每一位代表一个站点区间的占用状态

示例:
车次 G1,日期 2024-01-20,座位 12A
途经站点:北京(0) → 天津(1) → 济南(2) → 淄博(3) → 青岛(4)

Bitmap: 1 1 0 0
↑ ↑ ↑ ↑
│ │ │ └─ 淄博→青岛 (0=空闲)
│ │ └─── 济南→淄博 (0=空闲)
│ └───── 天津→济南 (1=占用)
└─────── 北京→天津 (1=占用)

表示:北京→济南 已售,济南→青岛 空闲

为什么用 Bitmap?

指标 Bitmap 传统方案(如 Hash)
空间占用 N 个站点仅需 N bit(几字节) 每区间存一个字段(几百字节)
查询复杂度 O(1) O(N)
区间判断 位运算 OR,一次搞定 需遍历所有区间
适用场景 固定长度、高频查询 灵活但低效

区间可用性检查(伪代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def check_interval_available(redis, train_id, date, seat_no, from_station, to_station):
"""
检查指定区间是否可用
from_station: 起始站点索引(如北京=0)
to_station: 终点站点索引(如济南=2)
"""
key = f"train:{train_id}:{date}:{seat_no}"

# 获取 from_station 到 to_station-1 的所有 bit 位
# 即区间 [from_station, to_station)
for i in range(from_station, to_station):
bit = redis.getbit(key, i)
if bit == 1: # 该区间已被占用
return False

return True


def allocate_interval(redis, train_id, date, seat_no, from_station, to_station):
"""
分配区间:将对应 bit 位设为 1
"""
key = f"train:{train_id}:{date}:{seat_no}"

for i in range(from_station, to_station):
redis.setbit(key, i, 1)

return True

更高效的方式:使用位运算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def check_and_allocate(redis, train_id, date, seat_no, from_station, to_station):
"""
使用位运算一次性检查并分配区间
"""
key = f"train:{train_id}:{date}:{seat_no}"

# 1. 获取当前 bitmap 值(假设用整数表示)
current = redis.get(key)
if current is None:
current = 0

# 2. 构造区间掩码
# 例如 from=0, to=2,掩码为 0b0011 (低位表示前面的区间)
mask = (1 << to_station) - (1 << from_station) # 0b0011

# 3. 检查区间是否已被占用
if current & mask != 0:
return False # 区间已被占用

# 4. 分配区间(原子操作,需用 Lua 脚本)
# Lua 脚本保证检查+设置原子性
lua_script = """
local current = tonumber(redis.call('GET', KEYS[1])) or 0
local mask = tonumber(ARGV[1])
if (current & mask) == 0 then
redis.call('SET', KEYS[1], current | mask)
return 1
else
return 0
end
"""
result = redis.eval(lua_script, 1, key, mask)
return result == 1

三、分布式排队机制

3.1 核心设计:按区间独立排队

很多人以为候补是整趟车排一个大队列,其实不然。

12306 的设计:每个乘车区间独立排队。

1
2
3
4
5
6
7
8
9
10
车次 G1(北京→青岛)的候补队列:

队列1:北京 → 天津(85 人排队)
队列2:北京 → 济南(320 人排队)
队列3:天津 → 济南(56 人排队)
队列4:济南 → 青岛(198 人排队)
队列5:北京 → 青岛(1275 人排队)
...

每个区间一个独立的 Sorted Set

为什么要按区间独立排队?

  1. 公平性:北京→天津 和 北京→青岛 根本不是同一批票,放一起排队没有意义
  2. 并行处理:不同区间队列可以并发处理,提升吞吐
  3. 精准匹配:释放席位时,直接定位对应队列,无需遍历

这解释了一个常见疑问

为什么我排队 50 名,别人排队 200 名,他却先候补成功?

因为你们根本不在同一个队列。他排的是天津→济南,你排的是北京→济南。

3.2 技术实现:Redis Sorted Set

数据结构设计:

1
2
3
4
5
6
7
8
9
10
Key:   backup:{train_id}:{date}:{from_station}:{to_station}
Value: 有序集合,存储候补订单 ID
Score: 用户提交候补的时间戳(越小越靠前)

示例:
backup:G1:20240120:beijing:jinan
├─ order_001 (score: 1705678901) ← 第1名
├─ order_002 (score: 1705678905) ← 第2名
├─ order_003 (score: 1705678912) ← 第3名
└─ ...

核心操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time

def add_to_backup_queue(redis, train_id, date, from_station, to_station, order_id):
"""
加入候补队列(ZADD)
时间戳作为 Score,天然实现先到先得
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
score = int(time.time() * 1000) # 毫秒时间戳,精确排队

redis.zadd(key, {order_id: score})
return True


def get_queue_position(redis, train_id, date, from_station, to_station, order_id):
"""
查询排队名次(ZRANK)
返回当前排队位置(从 0 开始,+1 后为实际名次)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
rank = redis.zrank(key, order_id)

if rank is None:
return -1 # 未在队列中
return rank + 1 # 返回实际名次(从 1 开始)


def get_top_n_from_queue(redis, train_id, date, from_station, to_station, n=10):
"""
取出队列前 N 名(ZRANGE)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
return redis.zrange(key, 0, n - 1)


def remove_from_queue(redis, train_id, date, from_station, to_station, order_id):
"""
候补成功或取消,移出队列(ZREM)
"""
key = f"backup:{train_id}:{date}:{from_station}:{to_station}"
redis.zrem(key, order_id)
return True

性能分析:

操作 复杂度 百万级队列耗时
ZADD(加入队列) O(log N) ~21 次比较,微秒级
ZRANK(查询名次) O(log N) 微秒级
ZRANGE(取前 N 名) O(log N + M) M 为取出数量,毫秒级
ZREM(移出队列) O(log N) 微秒级

结论:Sorted Set 天然适合排队场景,百万级数据依然高效。

3.3 数据持久化:MySQL 分库分表

Redis 是内存数据库,一旦宕机可能丢失数据。候补订单必须持久化到 MySQL。

分库分表策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
分库键:user_id % 16(按用户分 16 个库)
分表键:order_id % 128(每个库 128 张表)

表结构:
CREATE TABLE backup_order_00 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
train_id VARCHAR(20) NOT NULL,
travel_date DATE NOT NULL,
from_station VARCHAR(20) NOT NULL,
to_station VARCHAR(20) NOT NULL,
seat_type TINYINT NOT NULL, -- 座位类型:商务/一等/二等
status TINYINT DEFAULT 0, -- 状态:排队中/已兑现/已取消
queue_position INT, -- 冗余存储排队名次
create_time DATETIME,
update_time DATETIME,

INDEX idx_train_date (train_id, travel_date, from_station, to_station)
);

Redis 与 MySQL 如何保持一致?

这是一个典型的分布式一致性问题,后文技术难点部分详解。


四、事件驱动:候补车票从哪里来?

候补的车票不是凭空产生的,全部来自系统事件触发。

4.1 四大票源

票源 触发事件 延迟 占比(估算)
用户退票 用户主动退票 秒级 40%
订单超时 抢到票后 30 分钟未支付 秒级 35%
改签释放 用户改签其他车次 秒级 15%
动态加挂 12306 官方追加车厢 分钟级 10%

4.2 事件驱动架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌──────────────────────────────────────────────────────────────────┐
│ 事件来源 │
├──────────┬──────────┬──────────┬─────────────────────────────────┤
│ 退票事件 │ 超时事件 │ 改签事件 │ 加挂事件(定时任务触发) │
└────┬─────┴────┬─────┴────┬─────┴─────────────┬───────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────────────────────┐
│ 消息队列(Kafka/RocketMQ) │
│ │
│ Topic: ticket-released │
│ Partition: 按 train_id 分区,保证同一车次顺序处理 │
└───────────────────────────┬──────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ 候补匹配服务 │
│ │
│ 1. 消费释放席位事件 │
│ 2. 查询对应区间的候补队列(Redis Sorted Set) │
│ 3. 取出队列第一名 │
│ 4. 执行兑现流程 │
└──────────────────────────────────────────────────────────────────┘

事件消息结构:

1
2
3
4
5
6
7
8
9
10
11
12
{
"event_id": "evt_20240120_123456",
"event_type": "REFUND", // REFUND / TIMEOUT / RESCHEDULE / EXTRA_SEAT
"train_id": "G1",
"travel_date": "2024-01-20",
"seat_no": "12A",
"from_station": "beijing",
"to_station": "jinan",
"seat_type": 2, // 二等座
"release_time": 1705678901234,
"trace_id": "trace_abc123" // 链路追踪
}

4.3 候补匹配流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
席位释放事件触发:

Step 1: 解析事件,确定释放的区间
train_id=G1, date=20240120, seat=12A, from=beijing, to=jinan

Step 2: 计算该区间覆盖的所有原子区间
北京→济南 = [北京→天津, 天津→济南]

Step 3: 查询这些区间的候补队列
- 队列 A:北京→天津(85 人)
- 队列 B:天津→济南(56 人)
- 队列 C:北京→济南(320 人)← 完全匹配,优先处理

Step 4: 尝试匹配队列 C 的第一名
- 检查席位 12A 是否满足用户需求(座位类型、是否有票)
- 满足 → 执行兑现流程
- 不满足 → 尝试队列 C 的第二名

Step 5: 若队列 C 无匹配,尝试其他队列
- 队列 A + 队列 B 是否有同一座位?
- 这涉及跨区间匹配,后文详解

4.4 候补兑现完整流程

席位释放后,系统需要执行一整套原子流程才能完成兑现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
┌──────────────────────────────────────────────────────────────────────────┐
│ 候补兑现完整流程 │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ 席位释放事件 │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 1: 查询候补队列 ││
│ │ ││
│ │ redis.zrange("backup:G1:20240120:beijing:jinan", 0, 0) ││
│ │ → 取出排队第一名 user_id ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 2: 获取分布式锁 ││
│ │ ││
│ │ lock_keys = ["lock:G1:20240120:12A:beijing:tianjin", ││
│ │ "lock:G1:20240120:12A:tianjin:jinan"] ││
│ │ redis.set(lock_key, user_id, nx=True, px=5000) ││
│ │ → 锁定所有原子区间,防止并发超卖 ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 3: 检查席位可用性(双重检查) ││
│ │ ││
│ │ bitmap_key = "train:G1:20240120:12A" ││
│ │ check_interval_available(bitmap_key, beijing, jinan) ││
│ │ → 确认席位真正可用 ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 4: MySQL 本地事务(原子操作) ││
│ │ ││
│ │ BEGIN TRANSACTION; ││
│ │ -- 4.1 标记席位占用 ││
│ │ UPDATE seat SET status='OCCUPIED' WHERE ...; ││
│ │ -- 4.2 创建订单 ││
│ │ INSERT INTO ticket_order (...) VALUES (...); ││
│ │ -- 4.3 更新候补状态 ││
│ │ UPDATE backup_order SET status='FULFILLED' WHERE ...; ││
│ │ -- 4.4 写入操作日志(用于 Redis 异步同步) ││
│ │ INSERT INTO backup_operation_log (...) VALUES (...); ││
│ │ COMMIT; ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 5: 异步更新 Redis ││
│ │ ││
│ │ -- 5.1 更新席位 Bitmap ││
│ │ redis.setbit("train:G1:20240120:12A", beijing_idx, 1) ││
│ │ redis.setbit("train:G1:20240120:12A", tianjin_idx, 1) ││
│ │ -- 5.2 从候补队列移除 ││
│ │ redis.zrem("backup:G1:20240120:beijing:jinan", user_id) ││
│ │ -- 5.3 释放分布式锁 ││
│ │ redis.del(lock_keys) ││
│ └──────────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐│
│ │ Step 6: 后续处理 ││
│ │ ││
│ │ -- 6.1 冻结预付款(调用支付服务) ││
│ │ -- 6.2 发送短信通知(调用通知服务) ││
│ │ -- 6.3 更新操作日志状态 ││
│ └─────────────────────────────────────────────────────────────────────┘│
│ │
└──────────────────────────────────────────────────────────────────────────┘

流程关键点

步骤 关键点 失败处理
获取锁 按字典序获取,避免死锁 获取失败 → 席位已被抢占,跳过
双重检查 锁后再检查,防止并发穿透 不可用 → 释放锁,尝试下一名用户
MySQL 事务 本地事务保证原子性 失败 → 释放锁,回滚,尝试下一名
异步更新 Redis 通过操作日志保证最终一致 失败 → 定时任务补偿恢复
后续处理 异步执行,不阻塞主流程 失败 → 重试或告警人工处理

五、高并发架构设计

5.1 四级防护架构

从用户请求到数据落地,12306 构建了四级防护体系,逐级降压:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
┌─────────────────────────────────────────────────────────────────┐
│ 用户请求 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第一级:网关层限流 │
│ │
│ 令牌桶算法:限制单 IP、单用户请求频率 │
│ 防刷策略:识别异常请求模式 │
│ 作用:挡住恶意流量,保护后端服务 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第二级:应用层本地缓存 │
│ │
│ 技术:Caffeine / Guava Cache │
│ 缓存内容: │
│ - 热门车次的区间队列长度(用户查询"排队第几名") │
│ - 热门车次的席位概览 │
│ - TTL:5-10 秒(容忍短时不一致) │
│ 效果:拦截 60%+ 的查询请求,减轻 Redis 压力 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第三级:Redis 集群缓存 │
│ │
│ 部署:主从 + 哨兵,保证高可用 │
│ 存储: │
│ - 席位 Bitmap(实时状态) │
│ - 候补 Sorted Set(排队数据) │
│ - 分布式锁(并发控制) │
│ 性能:单节点 10 万+ QPS,全内存操作 │
└─────────────────────────────┬───────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 第四级:MySQL 分库分表 │
│ │
│ 存储:订单持久化、用户数据、候补记录 │
│ 分片:按 user_id 分库,按 order_id 分表 │
│ 作用:数据兜底、Redis 故障时的降级数据源 │
└─────────────────────────────────────────────────────────────────┘

为什么需要四级?

级别 核心作用 如果缺失会怎样
网关限流 拦截恶意流量 后端被刷爆,正常用户无法访问
本地缓存 拦截高频查询 Redis 被查询请求压垮
Redis 缓存 扛住核心读写 MySQL 被高频请求打穿
MySQL 持久化 数据兜底 Redis 故障时数据丢失

5.2 限流降级策略

场景 限流策略 降级措施 用户感知
正常高峰 令牌桶 10 万 QPS 正常服务
超高峰 滑动窗口限流 排队等待 “系统繁忙,请稍后”
Redis 故障 熔断 查询走 MySQL,写入走 MQ 候补延迟,排队名次不可查
MySQL 故障 熔断 暂停下单,只读 “系统维护中”

六、技术难点深入

前文介绍了 12306 候补系统的核心设计,但真正让系统”稳如磐石”的,是对细节的极致处理。

6.1 难点一:分布式一致性

问题:Redis 队列与 MySQL 订单如何保持一致?

1
2
3
4
5
6
7
8
9
10
11
场景:用户取消候补

方案一:先更新 MySQL,再删除 Redis
1. MySQL DELETE 成功
2. Redis ZREM 失败(网络抖动)
→ 结果:MySQL 无订单,Redis 仍在排队,用户"幽灵排队"

方案二:先删除 Redis,再更新 MySQL
1. Redis ZREM 成功
2. MySQL DELETE 失败
→ 结果:Redis 已删除,MySQL 仍存在,用户想恢复排队无法恢复

解决方案:本地消息表 + 最终一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 候补操作日志表(与业务表同库,利用本地事务)
CREATE TABLE backup_operation_log (
log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL,
operation_type VARCHAR(20) NOT NULL, -- ADD / CANCEL / FULFILL
redis_status TINYINT DEFAULT 0, -- 0=待处理 1=成功 2=失败
mysql_status TINYINT DEFAULT 0, -- 0=待处理 1=成功 2=失败
retry_count INT DEFAULT 0,
create_time DATETIME,
update_time DATETIME,

INDEX idx_status (redis_status, mysql_status)
);

操作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def cancel_backup_order(order_id):
"""取消候补订单(保证最终一致性)"""

# Step 1: 本地事务,写入操作日志 + 更新订单状态
with db.transaction():
# 标记订单为"取消中"
db.execute("UPDATE backup_order SET status = 'CANCELING' WHERE order_id = ?", order_id)

# 写入操作日志
log_id = db.execute("""
INSERT INTO backup_operation_log
(order_id, operation_type, redis_status, mysql_status)
VALUES (?, 'CANCEL', 0, 0)
""", order_id)

# Step 2: 异步处理 Redis 操作
mq.send('backup_operation', {'log_id': log_id, 'order_id': order_id, 'type': 'CANCEL'})

return True


def process_cancel_message(message):
"""消费取消消息"""
log_id = message['log_id']
order_id = message['order_id']

try:
# Step 3: 从 Redis 队列移除
redis.zrem(f"backup:{train}:{date}:{from}:{to}", order_id)

# Step 4: 更新 MySQL 订单状态
db.execute("UPDATE backup_order SET status = 'CANCELED' WHERE order_id = ?", order_id)

# Step 5: 更新操作日志
db.execute("UPDATE backup_operation_log SET redis_status=1, mysql_status=1 WHERE log_id=?", log_id)

except Exception as e:
# 失败后重试,超过 3 次告警
retry_count = db.query("SELECT retry_count FROM backup_operation_log WHERE log_id=?", log_id)
if retry_count < 3:
db.execute("UPDATE backup_operation_log SET retry_count = retry_count + 1 WHERE log_id=?", log_id)
mq.send('backup_operation', message, delay=60) # 60 秒后重试
else:
alert(f"候补取消失败,log_id={log_id}, error={e}")

定时对账任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def reconcile_backup_data():
"""每小时对账,确保 Redis 与 MySQL 一致"""

# Step 1: 扫描所有候补队列
all_queues = redis.keys("backup:*")

for queue_key in all_queues:
order_ids = redis.zrange(queue_key, 0, -1)

for order_id in order_ids:
# Step 2: 检查 MySQL 是否存在
order = db.query("SELECT * FROM backup_order WHERE order_id = ?", order_id)

if not order or order.status == 'CANCELED':
# Step 3: Redis 存在,MySQL 不存在或已取消,清理 Redis
redis.zrem(queue_key, order_id)
log.info(f"清理幽灵排队:{order_id}")

6.2 难点二:并发竞争与超卖

问题:同一席位释放,多个区间队列如何竞争?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
场景:
车次 G1,座位 12A
张三退票:北京 → 济南

释放区间:北京→天津、天津→济南

现有候补队列:
- 队列 A:北京→天津(100 人)
- 队列 B:天津→济南(80 人)
- 队列 C:北京→济南(200 人)

如果不加控制:
1. 队列 A 把 12A 分给用户 X(北京→天津)
2. 队列 C 把 12A 分给用户 Y(北京→济南)
→ X 和 Y 的区间重叠,超卖!

解决方案:分布式锁 + 区间锁定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def allocate_seat_to_backup(train_id, date, seat_no, from_station, to_station, user_id):
"""
分配席位给候补用户
核心思想:锁定所有涉及的原子区间,防止并发超卖
"""

# Step 1: 获取该区间的所有原子区间(相邻站点间)
atomic_intervals = get_atomic_intervals(train_id, from_station, to_station)
# 例如:北京→济南 = [(北京,天津), (天津,济南)]

# Step 2: 生成锁 Key(按字典序排序,避免死锁)
lock_keys = sorted([
f"lock:{train_id}:{date}:{seat_no}:{f}:{t}"
for f, t in atomic_intervals
])

# Step 3: 尝试获取所有锁(SET NX PX,带超时)
lock_acquired = []
for key in lock_keys:
success = redis.set(key, user_id, nx=True, px=5000) # 5 秒超时
if not success:
# 获取失败,释放已获取的锁
for acquired_key in lock_acquired:
redis.delete(acquired_key)
return False, "席位已被占用"
lock_acquired.append(key)

try:
# Step 4: 再次检查席位可用性(双重检查)
if not check_seat_available(train_id, date, seat_no, from_station, to_station):
return False, "席位不可用"

# Step 5: 执行兑现流程
# 5.1 标记席位占用
mark_seat_occupied(train_id, date, seat_no, from_station, to_station, user_id)

# 5.2 创建订单(MySQL 本地事务)
order_id = create_ticket_order(user_id, train_id, date, seat_no, from_station, to_station)

# 5.3 冻结预付款
freeze_payment(user_id, order_id)

# 5.4 从候补队列移除
redis.zrem(f"backup:{train_id}:{date}:{from_station}:{to_station}", user_id)

# 5.5 发送通知
send_notification(user_id, f"候补成功!订单号:{order_id}")

return True, order_id

finally:
# Step 6: 释放锁
for key in lock_acquired:
# 使用 Lua 脚本,确保只释放自己持有的锁
redis.eval("""
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
""", 1, key, user_id)

关键点

  1. 锁粒度:锁到”原子区间”级别(相邻站点间),而非整个座位
  2. 锁顺序:按字典序获取锁,避免死锁(A 等待 B,B 等待 A)
  3. 双重检查:获取锁后再次检查可用性,防止并发穿透
  4. 原子释放:使用 Lua 脚本,确保只释放自己持有的锁

6.3 难点三:跨区间匹配

问题:候补”长区间”,释放”短区间”如何匹配?

1
2
3
4
5
6
7
8
9
10
11
场景:
用户候补:北京 → 青岛(全程)

当前释放:
- 张三退票:北京 → 天津(仅第一段)
- 李四退票:天津 → 济南(仅第二段)
- 王五退票:济南 → 青岛(仅后两段)

能否匹配?
- 如果三段都同时释放,可以合并给用户
- 但现实中,退票是离散事件,难以凑齐所有区间

方案一:不跨区间匹配(简单,12306 当前方案)

1
2
3
4
5
6
7
8
9
10
11
规则:
- 用户候补 北京→青岛,必须有 北京→青岛 的完整区间空位
- 不能由 北京→天津 + 天津→青岛 拼凑

优点:
- 实现简单,逻辑清晰
- 避免复杂的跨区间锁竞争

缺点:
- 用户体验稍差,需要分段候补
- 票源利用率降低

方案二:智能跨区间匹配(复杂,提升体验)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def smart_match_backup(train_id, date, from_station, to_station, user_id):
"""
智能匹配:尝试拼接多个短区间满足长区间需求
核心逻辑:遍历所有座位,找到在所有区间都空闲的座位
"""

# Step 1: 获取所有原子区间
atomic_intervals = get_atomic_intervals(train_id, from_station, to_station)
# 例如:北京→青岛 = [北京→天津, 天津→济南, 济南→淄博, 淄博→青岛]

# Step 2: 获取该车次所有座位列表
all_seats = get_all_seats(train_id, date, seat_type)
# 例如:['12A', '12B', '12C', '13A', '13B', ...]

# Step 3: 遍历每个座位,检查是否在所有区间都空闲
for seat_no in all_seats:
bitmap_key = f"train:{train_id}:{date}:{seat_no}"

# 检查该座位在所有原子区间是否空闲
all_available = True
for i, (f, t) in enumerate(atomic_intervals):
station_idx = get_station_index(f) # 获取站点索引
bit = redis.getbit(bitmap_key, station_idx)
if bit == 1: # 该区间已被占用
all_available = False
break

if all_available:
# 找到一个在所有区间都空闲的座位
return seat_no

# Step 4: 无完整匹配,返回 None
# 可选:记录缺票区间,等待后续释放
return None


def on_seat_released(train_id, date, seat_no, from_station, to_station):
"""
席位释放事件触发匹配
"""

# 查询所有可能匹配的候补队列
# 包括:精确匹配 + 跨区间匹配等待中的用户
related_queues = find_related_backup_queues(train_id, date, from_station, to_station)

for queue_key in related_queues:
# 取出队列前 10 名,尝试匹配
candidates = redis.zrange(queue_key, 0, 9)

for user_id in candidates:
# 获取该用户的候补需求
request = get_backup_request(user_id)

# 尝试为该用户匹配座位
seat = smart_match_backup(
train_id, date,
request.from_station, request.to_station,
user_id
)

if seat:
allocate_seat(seat, request.from_station, request.to_station, user_id)
break # 席位已分配,退出当前队列

跨区间匹配的挑战

挑战点 说明
复杂度激增 需维护”缺票区间索引”,查询效率下降
并发竞争更复杂 多个部分匹配用户竞争同一区间
用户体验不确定 部分匹配时,用户不知道自己排的是哪个队列

实际权衡:12306 当前采用”不跨区间匹配”策略,牺牲部分体验换取系统简洁。


6.4 难点四:热点倾斜

问题:春运热门线路,单个区间队列百万级

1
2
3
4
5
6
7
8
9
10
11
12
场景:
春节前一周,北京→哈尔滨,候补队列 200 万人

Sorted Set 性能:
- ZADD: O(log N) = log(2,000,000) ≈ 21 次操作
- ZRANK: O(log N) ≈ 21 次操作
- 单次操作微秒级,没问题

问题:
1. 频繁 ZRANK 查询排队名次,热点 Key
2. 频繁 ZRANGE 取队首用户,热点 Key
3. Redis 单线程,单 Key 成为瓶颈

解决方案:分片队列 + 虚拟排队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 方案一:队列分片
def get_shard_key(train_id, date, from_station, to_station, user_id):
"""
将单个大队列拆分成多个小队列
按 user_id 分片,保证同一用户始终在同一个分片
"""
shard_count = 32 # 分片数量
shard_id = hash(user_id) % shard_count
return f"backup:{train_id}:{date}:{from_station}:{to_station}:shard:{shard_id}"


def add_to_sharded_queue(train_id, date, from_station, to_station, user_id):
"""
加入分片队列
"""
shard_key = get_shard_key(train_id, date, from_station, to_station, user_id)
score = int(time.time() * 1000)
redis.zadd(shard_key, {user_id: score})


def get_queue_position_sharded(train_id, date, from_station, to_station, user_id):
"""
查询分片队列中的名次
"""
shard_key = get_shard_key(train_id, date, from_station, to_station, user_id)

# 本分片内的名次
local_rank = redis.zrank(shard_key, user_id)
if local_rank is None:
return -1

# 加上其他分片的历史完成数
total_finished = sum([
redis.get(f"backup:{train_id}:{date}:{from_station}:{to_station}:shard:{i}:finished")
for i in range(32)
])

# 虚拟名次(更准确的估算)
return local_rank + total_finished + 1


# 方案二:虚拟排队名次
def get_virtual_position(redis, train_id, date, from_station, to_station, user_id):
"""
不返回真实名次,而是"虚拟名次"
避免用户看到"排队 150 万名"直接放弃
"""
real_rank = get_real_position(redis, train_id, date, from_station, to_station, user_id)

if real_rank <= 100:
# 前 100 名,返回真实名次
return real_rank
elif real_rank <= 1000:
# 100-1000 名,显示"排名前 1000"
return f"前 1000 名"
else:
# 1000 名以后,显示预估等待时间
avg_fulfill_rate = get_historical_fulfill_rate(train_id, from_station, to_station)
wait_hours = real_rank / avg_fulfill_rate
return f"预计等待 {wait_hours:.1f} 小时"

本地缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 方案三:热点数据本地缓存
from cachetools import TTLCache

# 本地缓存:热门车次的队列长度
queue_length_cache = TTLCache(maxsize=10000, ttl=10) # 10 秒过期

def get_queue_length(train_id, date, from_station, to_station):
"""
获取队列长度(优先本地缓存)
"""
cache_key = f"{train_id}:{date}:{from_station}:{to_station}"

if cache_key in queue_length_cache:
return queue_length_cache[cache_key]

# 本地缓存未命中,查询 Redis
length = redis.zcard(f"backup:{train_id}:{date}:{from_station}:{to_station}")
queue_length_cache[cache_key] = length

return length

6.5 难点五:容灾与降级

问题:Redis 故障时如何保证候补可用?

1
2
3
4
故障场景:
1. Redis 主节点宕机,哨兵切换中(30 秒不可用)
2. Redis 集群网络分区,部分数据不可达
3. Redis 内存溢出,被系统 OOM Killer 杀掉

多级容灾设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
┌─────────────────────────────────────────────────────────────────┐
│ 正常流程 │
│ │
│ 用户请求 → Redis Sorted Set 排队 → 候补匹配 → 订单创建 │
│ │
└───────────────────────────┬─────────────────────────────────────┘
│ Redis 故障检测

┌─────────────────────────────────────────────────────────────────┐
│ 降级流程 │
│ │
│ 用户请求 → MQ 消息队列暂存 → MySQL 记录排队 → 后台任务恢复 │
│ │
│ 具体措施: │
│ 1. 新增候补请求写入 MQ(Kafka),不直接写 Redis │
│ 2. MySQL 记录候补订单(status=PENDING) │
│ 3. 后台任务监控 Redis 恢复后,将 MQ 消息同步到 Redis │
│ 4. 查询排队名次返回"系统繁忙,请稍后查询" │
│ │
└─────────────────────────────────────────────────────────────────┘

降级代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def add_backup_order_with_fallback(train_id, date, from_station, to_station, user_id):
"""
加入候补队列(带降级)
"""
try:
# 尝试写入 Redis
redis.zadd(f"backup:{train_id}:{date}:{from_station}:{to_station}",
{user_id: time.time()})
return True, "排队成功"

except RedisError as e:
# Redis 故障,降级到 MQ
log.error(f"Redis 故障,降级到 MQ: {e}")

# 写入 MQ
mq.send('backup_queue_pending', {
'train_id': train_id,
'date': date,
'from': from_station,
'to': to_station,
'user_id': user_id,
'timestamp': time.time()
})

# 写入 MySQL(状态为 PENDING,待恢复)
db.execute("""
INSERT INTO backup_order
(user_id, train_id, travel_date, from_station, to_station, status)
VALUES (?, ?, ?, ?, ?, 'PENDING')
""", user_id, train_id, date, from_station, to_station)

return True, "排队成功,系统繁忙,名次稍后可查"


def recover_redis_from_mq():
"""
后台任务:Redis 恢复后,从 MQ 恢复排队数据
关键:使用幂等操作,防止重复添加
"""
messages = mq.consume('backup_queue_pending', batch_size=100)

for msg in messages:
try:
queue_key = f"backup:{msg['train_id']}:{msg['date']}:{msg['from']}:{msg['to']}"

# 幂等检查:先查询用户是否已在队列中
existing_score = redis.zscore(queue_key, msg['user_id'])
if existing_score is not None:
# 用户已在队列中,跳过(保留原有的 Score,不更新)
log.info(f"用户 {msg['user_id']} 已在队列中,跳过恢复")
else:
# 用户不在队列中,添加到队列
redis.zadd(queue_key, {msg['user_id']: msg['timestamp']})

# 更新 MySQL 状态为 QUEUED
db.execute("""
UPDATE backup_order SET status = 'QUEUED'
WHERE user_id = ? AND train_id = ? AND travel_date = ?
""", msg['user_id'], msg['train_id'], msg['date'])

except RedisError as e:
log.error(f"Redis 仍未恢复,稍后重试: {e}")
break # Redis 仍不可用,等待下次恢复

降级流程关键点

关键点 说明
幂等恢复 检查用户是否已在队列,避免重复添加或 Score 被更新
状态追踪 MySQL 记录 PENDING→QUEUED 状态变化,便于监控
批量消费 每次消费 100 条,避免单条失败阻塞整批
优雅退出 Redis 仍不可用时 break,保留未处理消息供下次恢复

七、总结与思考

7.1 12306 候补系统的设计精髓

设计点 核心思想 技术实现
席位复用 同一座位分段售卖,提升利用率 Redis Bitmap
分布式排队 按区间独立排队,公平精准 Redis Sorted Set
事件驱动 四类票源触发自动兑现 Kafka + 消费者模式
高并发架构 四级防护 + 限流降级 网关 + Caffeine + Redis + MySQL
一致性保证 本地消息表 + 最终一致 MySQL 事务 + MQ
并发控制 分布式锁 + 双重检查 Redis SET NX + Lua
容灾降级 MQ 暂存 + 幂等恢复 Kafka + 定时对账

7.2 为什么官方候补比第三方靠谱?

对比维度 官方候补 第三方抢票软件
数据源 直接操作席位数据,无延迟 轮询 12306 接口,有延迟
排队公平性 时间戳排序,先到先得 无法获取真实排队数据
票源覆盖 四类票源全覆盖,包括动态加挂 只能监控部分票源
系统稳定性 高并发架构 + 容灾降级 容易被 12306 限流封禁

7.3 可借鉴的架构思想

  1. 数据结构选型决定系统上限:Bitmap 和 Sorted Set 的选择,让百万级数据依然高效

  2. 分而治之解决规模问题:按区间独立排队,将全局问题拆解为局部问题

  3. 事件驱动解耦复杂逻辑:退票、超时、改签、加挂,统一为事件,简化处理

  4. 四级防护扛住高并发:网关限流 → 本地缓存 → Redis → MySQL,逐级降压

  5. 最终一致胜过强一致:分布式系统中,最终一致更易实现,用户体验更好

  6. 幂等设计防止重复操作:降级恢复时先检查再添加,避免数据错乱


Oracle SQL*LoaderOracle 数据库管理系统中一个强大的工具,用于高效地将大量数据从外部文件加载到 Oracle 数据库中。它提供了一种快速、灵活的方式来导入数据,适用于各种数据格式和文件类型。本文将介绍 SQL*Loader 的基本概念、工作原理以及实际应用场景。

一、什么是 SQL*Loader?

SQL*Loader 是一个用于导入数据的实用程序,允许用户将普通文件、CSV 文件等外部数据源中的数据加载到 Oracle 数据库表中。它是 Oracle 数据库中的一个标准工具,可以轻松地处理大规模的数据加载任务。

二、SQL*Loader 的工作原理

SQL*Loader 的工作原理比较简单:

  • 控制文件定义:编写一个控制文件,其中指定了要加载的目标表、字段映射关系、数据格式等信息。控制文件是 SQL*Loader 的核心配置文件之一;

  • 准备外部数据文件:用户需要准备一个包含待加载数据的外部文件,可以是纯文本文件、CSV 文件等格式;

  • 运行 SQL*Loader:通过命令行或者其他界面工具运行 SQL*Loader,并指定控制文件和数据文件的位置。SQL*Loader 将根据控制文件加载到目标表中;

  • 数据加载SQL*Loader 按照控制文件中指定的规则,逐行解析外部数据文件,并将数据插入到目标表中;

三、SQL*Loader 控制文件

这是一个控制文件模板样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
LOAD DATA
INFILE 'data.csv' -- 指定外部数据文件的路径
INTO TABLE employees -- 指定目标表名
CHARACTERSET UTF8 -- 指定外部数据文件的字符集编码为 UTF-8
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' -- 指定字段分隔符和可选的字段包裹符
(
employee_id, -- 目标表的列名
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD' -- 数据格式化,确保日期格式正确
)
WHEN (hire_date >= '2022-02-01')
  • LOAD DATA 表示开始加载数据的声明;
  • INFILE 'data.csv' 指定了外部数据文件的路径。你需要将 data.csv 替换为实际的外部数据文件名,并确保文件路径正确;
  • INTO TABLE employees 指定了目标表名为 employees,即将数据加载到 employees 表中。你需要将 employees 替换为实际的目标表名;
  • CHARACTERSET UTF8 指定外部数据文件的字符集编码为 UTF-8
  • FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' 定义了字段的分隔符和可选的字段包裹符。在这个示例中,字段被逗号分隔,并且可能被双引号包裹。根据实际情况,你可能需要调整这些选项;
  • (employee_id, first_name, last_name, email, hire_date DATE 'YYYY-MM-DD') 定义了要加载的字段和它们的数据类型。确保与目标表的列名和数据类型匹配。在这个示例中,hire_date 被格式化为日期,并指定了日期的格式;
  • WHEN (hire_date >= '2022-01-01') 定义了加载数据的条件,只有满足条件的数据,才会被导入 Oracle 数据库;

四、SQL*Loader 的应用场景

SQL*Loader 在实际应用中有广泛的用途,例如:

  • 数据迁移和导入:当需要将外部数据源中的数据导入到 Oracle 数据库中时,SQL*Loader 是一个好的选择。它可以通过灵活的配置,处理大量的数据;
  • 数据集成和同步:在数据集成和同步的场景中,SQL*Loader 可以用于将不同系统或者数据源中的数据整合到同一数据库中,以便进行分析、报告等操作;
  • 日常数据加载:从外部系统中获取数据,并将其加载到 Oracle 数据库中进行进一步处理。SQL*Loader 可以自动化这一过程,提高数据处理的效率;

五、实际应用

以下是一个简单的示例,演示如何使用 SQL*LoaderCSV 文件加载到 Oracle 表中:

  • 创建一个控制文件 data.ctl,定义目标表和字段映射关系:
1
2
3
4
5
6
7
8
9
10
11
sqlCopy codeLOAD DATA
INFILE 'data.csv'
INTO TABLE employees
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
( employee_id,
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD'
)
WHEN (hire_date >= '2022-02-01')
  • 准备外部数据文件 data.csv,包含待加载的数据。
employee_id first_name last_name email hire_date
001 zhangsan@gmail.com 2023-02-01
002 lisi@gmail.com 2023-02-02
  • 在命令行中运行 SQL*Loader
1
2
bashCopy code
sqlldr username/password@database control=data.ctl

这样,SQL*Loader 就会将 data.csv 中的数据加载到名为 employees 的表中。

六、结论

Oracle SQL*Loader 是一个强大的数据加载工具,可用于高效地将外部数据加载到 Oracle 数据库中。通过简单的配置和命令,用户可以轻松地处理大量的数据加载任务,提高数据处理的效率和可靠性。


一、跨库分页查询

对于一定数量级的表,如订单表,通常采用分库分表的方式保存数据。根据指定字段,如果用户ID,散列数据到不同的库中。那么,如果需要按照下单时间分页查询订单信息,就涉及到跨库查询。

假设有45笔订单,存于三个库中,散列算法是 OrderID % 3,则数据分布为:

如果以每页五个订单,查询第二页的所有订单,则单库查询 sql 为:

1
select * from order_info order by id limit 5 offset 5;

但跨库查询就行不通了。下面,主要有三种方案可以用于跨库分页查询:

  • 全局查询法
  • 禁止跳页查询法
  • 二次查询法

二、全局查询法

全局查询法,需要在每个分库中执行查询语句,然后再程序中排序,再定位切割到指定的数据段。

如果需要查询第二页订单,需要查询每个库的前二页数据:

1
2
3
select * from order_info_1 order by id limit 10;
select * from order_info_2 order by id limit 10;
select * from order_info_3 order by id limit 10;

结果为:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30

那么第二页的订单列表为:

6,7,8,9,10

小结:对于低页码查询,全局查询法是可以应付的,但是,当页码越来越大,查出来的数据也就越来越多,需要排序的数据也越来越多,查询效率也就会越来越慢。

三、禁止跳页查询法

全局查询法的一个显著缺陷,就是随着页码越来越大,查询的数据量也越来越大。那么,如果禁止跳页查询,且每次查询都以上次查询的最大ID为基点,就可以保证每次查询的数据量都是相同的。

查询第一页数据:

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

那么第一页的订单列表为:

1,2,3,4,5

查询第二页数据:

第一页的订单ID最大值为5,因此第二页的订单ID起始值应大于5,查询得到:

将以上三个库的查询结果排序:

6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

那么第二页的订单列表为:

6,7,8,9,10

小结:禁止跳页查询法,保证每次查询的数据量是相同的,避免了分页查询带来的性能衰减问题;但禁止跳页也是功能缺陷,没法一步定位到指定数据段。

四、二次查询法

二次查找法,既满足跳页查询,也能避免分页查询性能衰减。为了解释这一思想,我们以查询第三页订单数据为例。单库查询语句:

1
select * from order_info order by id limit 5 offset 10;

之所以叫二次查询法,当然需要查询两次。这两次查询有什么不同,希望通过以下四个步骤说清楚:

第一步:语句改写

select * from order_info order by id limit 5 offset 10 改写成 select * from order_info order by id limit 5 offset 3。偏移量10变成3,是基于10/3计算得出的。将语句在三个库分别执行,得到数据:

第二步:找最小值

  • 第一个库:最小数据为8
  • 第二个库:最小数据为11
  • 第三个库:最小数据为12

因此,从三个库中拿到的最小数据为8。

第三步:第二次语句改写

这次需要把 select * from order_info order by id limit 5 offset 3 改写成一个between语句,起点是最小的OrderID,终点是原来每个分库各自返回数据的最大值:

  • 第一个分库改写为: select * from order_info order by id where id between id_min and 22
  • 第二个分库改写为: select * from order_info order by id where id between id_min and 23
  • 第三个分库改写为: select * from order_info order by id where id between id_min and 24

查询结果如下:

第四步:找到id_min的全局偏移量

第一次查询的偏移量为3,那么每一个库的第一个目标数据的偏移量应该都是4。因此可知每个库的id_min的偏移量:

  • 第一个库:8就是id_min,偏移量为4;
  • 第二个库:11的偏移量为4,那么id_min的偏移量就是1;
  • 第三个库:12的偏移量为4,那么id_min的偏移量就是3;

因此id_min的全局偏移量为:4 + 1 + 3 = 8。

第五步:定位目标数据

  • 第一个库:8,13,14,19,22
  • 第二个库:9,10,11,16,17,18,23
  • 第三个库:12,15,20,21,24

经过排序,得到:

8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24

因为id_min的全局偏移量为8,最终结果需要 limit 5 offset 10,因此需要向后推移10 - 8 = 2 位,然后再取5位,得到:

11,12,13,14,15

小结:二次查找法,既避免了数据越处理越多,也支持跳转查询。但其也存在短板,需要查询两次,才能拿到目标数据。


一、零拷贝技术

零拷贝技术,是相对于传统 IO 的一种技术思想。传统 IO :读写数据是在用户空间和内核空间来回复制,而内核空间的数据是通过操作系统层面的 IO 接口从磁盘读取或写入的,中间也需要多次拷贝,因此效率较低。零拷贝技术,目的是尽可能地减少上下文切换和拷贝次数,提升操作性能。

二、传统 IO 实现原理

当应用服务接收客户端的请求时,传统 IO 通常需要两种系统调用:

1
2
3
4
// 读取
read(file, tmp_buf, len);
// 写入
write(socket, tmp_buf, len);

从细分图中可知,虽然只是简单的读写操作,但内部的流程还是比较复杂的。

一次读写将发生 4 次上下文切换:

  • 读取数据:从用户态切换到内核态;
  • 读取完毕:内核完成数据准备,从内核态切换到用户态;
  • 写入数据:从用户态切换到内核态;
  • 写入完毕:内核完成数据写入,从内核态切换到用户态;

一次读写将发生 4 次数据拷贝 (2 次 DMA 拷贝 + 2 次 CPU 拷贝):

  • 第一次拷贝 (DMA):把磁盘文件数据拷贝到内核缓冲区;
  • 第二次拷贝 (CPU):把内核缓冲区的数据拷贝到用户缓冲区,供应用程序使用;
  • 第三次拷贝 (CPU):把用户缓冲区的数据拷贝到内核 socket 缓冲区;
  • 第四次拷贝 (DMA):把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

虽然一次上下文切换需耗时只有几微秒,但在高并发场景中,这种延迟容易被积累和放大,从而影响整体性能。此外,磁盘和网卡操作速度远远小于内存,而内存操作速度又远远小于 CPU,4 次拷贝将严重拖慢系统性能。因此,提高 IO 性能,需要从减少上下文切换次数和数据拷贝次数两方面入手。

三、零拷贝实现

基于以上的讨论,可知零拷贝技术的设计思路:尽可能地减少上下文切换次数和数据拷贝次数。

零拷贝的具体实现方式有:

  • mmap:将内核空间和用户空间的虚拟地址映射到同一物理地址;
  • sendfile:直接把内核缓冲区的数据拷贝到网卡缓冲区;
  • direct IO:在应用层与磁盘、网卡之间建立直接通道;
3.1 mmap 实现零拷贝

在介绍 mmap() 的作用机制之前,先介绍一个新概念:虚拟内存。虚拟内存是现代操作系统中普遍使用的内存结构,使用虚拟地址代替物理内存,有两点好处:一是多个虚拟地址可以指向同一个物理地址;二是虚拟内存空间远远大于物理内存空间。

在传统 IO 中,read() 调用会把内核缓冲区的数据拷贝到用户缓冲区,耗时又耗力。如果把内核空间和用户空间的虚拟地址映射到同一个物理地址,那么就不需要 CPU 来回复制了。

mmap() 正是利用了虚拟内存的这一特性,取代传统 IO 的 read() 操作,并将内核缓冲区和用户缓冲区地址映射到同一物理内存地址,省去一次 CPU 拷贝的过程,提升 IO 性能。具体过程如下:

  • 应用进程调用 mmap() 后,DMA 会把磁盘文件数据拷贝到内核缓冲区;
  • 应用进程跟操作系统内核共享这个缓冲区;

  • 应用进程再调用 write(),直接将内核缓冲区的数据拷贝到内核 socket 缓冲区;

  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

从调用过程可知,与传统 IO 相比,mmap() + write 只减少了 1 次 CPU 拷贝,仍然要发生 4 次上下文切换和 3 次数据拷贝。

3.2 sendfile() 实现零拷贝

senfile() 是 Linux 提供的,专门用于发送文件的系统调用函数。sendfile() 可以替代传统 IO 的 read()、write() 函数,这意味着将省去 2 次上下文切换。此外,数据拷贝路径也有所优化,具体的优化方案与 Linux 内核版本有关,因为在 2.4 版本之后,Linux 提供了 SG-DMA 技术,它将提供比 DMA 技术更进一步的优化策略。

在 2.4 版本之前,CPU 可以直接把内核缓冲区的数据拷贝到内核 socket 缓冲区,省去拷贝到用户缓冲区这一步,因此还存在 2 次上下文切换和 3 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • CPU 把内核缓冲区的数据拷贝到内核 socket 缓冲区;
  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

在 2.4 版本之后,引入了 SG_DMA 技术,如果相应的网卡支持该技术,那么就可以把内核缓冲区的数据直接拷贝到网卡缓冲区,也就是说还存在 2 次上下文切换和 2 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • 把内核缓冲区描述符和数据长度传到内核 socket 缓冲区;
  • SG-DMA 直接把内核缓冲区的数据拷贝到网卡缓冲区;

3.3 direct IO

直接 IO 是在用户缓冲区和磁盘、网卡之间建立直接通道的技术设计。直接 IO 在读写数据时,可以绕开内核,减少上下文切换和数据拷贝的次数,从而提高效率。

具体执行步骤:

  • DMA 把磁盘文件数据直接拷贝到用户缓冲区;
  • DMA 把用户缓冲区的数据直接拷贝到网卡缓冲区;

直接 IO 使用直接通道操作数据,由应用层完全管理数据,其优缺点也是很明显的。

优点:

  • 应用层与磁盘、网卡建立直接通道,减少了上下文切换和数据拷贝的次数,速度更快;
  • 数据直接缓存在应用层,应用可以更加灵活得操作数据;

缺点:

  • 在应用层引入直接 IO,需要应用层自主管理,给系统增添了额外的复杂度;
  • 若数据不在应用层缓冲区,那么将直接操作磁盘文件读写,将大大拖慢性能;

在 Java 开发中,BigDecimal 是处理高精度数值的必备工具。然而,当我们需要将其转换为字符串时,面对 toString()toPlainString()toEngineeringString() 三个方法,很多开发者往往不知如何选择。本文将深入源码层面,剖析三者的实现原理,揭示常见陷阱,并提供最佳实践指导。

阅读全文 »


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、分片化 Step

如果一个 Step 的任务量比较大,可以尝试将其拆分成多个子任务。子任务之间可并行处理且互不干扰,这将大大提升批处理效率。例如:Master 这个 Step 迁移 100000 条数据需要 100 s,如果将其拆分为 100 个 Slave 任务,那么时间可缩短至 1 s。

Step 分片原理,是一个 Master 处理器对应多个 Salve 处理器。Slave 处理器可以是远程服务,也可以是本地执行线程。主从服务间的消息不需要持久化,也不需要严格保证传递,因为 JobRepository 的元数据管理,是将每个 Salve 独立保存在 batch_step_execution 中的,这样便可以保证每个 Slave 任务只执行一次。

Step 分片化,需要了解两个组件:分片器(Partitioner)和分片处理(PartitionHandler)。

  • 分片器(Partitioner):为每个 Slave 服务配置上下文(StepExecutionContext);

  • 分片处理(PartitionHandler):定义 Slave 服务的数量以及 Slave 任务内容;

比如在一个数据迁移 Step 中,分片处理就是将 1 个主任务拆分成 100 个从任务,并定义从任务的执行内容;分片器就是依次为这 100 个从任务划定数据迁移的范围(select * from table where id between ? and ?)。

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class PartitionTransferStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "masterTransferStudentStep1")
private Step masterTransferStudentStep;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("partitionTransferStudentJob")
.incrementer(new RunIdIncrementer())
.flow(masterTransferStudentStep)
.end()
.build();
}
}
3.2 Step 配置

MasterTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.example.springbatchdemo.component.partitioner.TransferStudentPartitioner;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class MasterTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "transferStudentPartitionHandler1")
private PartitionHandler transferStudentPartitionHandler;

@Autowired
private TransferStudentPartitioner transferStudentPartitioner;

@Bean("masterTransferStudentStep1")
public Step masterTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("masterTransferStudentStep1.manager")
.partitioner("masterTransferStudentStep1", transferStudentPartitioner)
.partitionHandler(transferStudentPartitionHandler)
.build();
}
}

SlaveTransferStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.processor.SlaveStudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class SlaveTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "slaveTransferStudentItemReader")
private JdbcPagingItemReader<Student> slaveTransferStudentItemReader;

@Autowired
@Qualifier(value = "slaveTransferStudentItemWriter")
private JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter;

@Autowired
private SlaveStudentItemProcessor slaveStudentItemProcessor;


@Bean("slaveTransferStudentStep1")
public Step slaveTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("slaveTransferStudentStep1")
.transactionManager(transactionManager)
.<Student, Student>chunk(1000)
.reader(slaveTransferStudentItemReader)
.processor(slaveStudentItemProcessor)
.writer(slaveTransferStudentItemWriter)
.build();
}
}
3.3 Partitioner 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TransferStudentPartitioner implements Partitioner {

private static final Logger LOGGER = LoggerFactory.getLogger(TransferStudentPartitioner.class);

@Override
public Map<String, ExecutionContext> partition(int gridSize) {

Map<String, ExecutionContext> result = new HashMap<>(gridSize);

int range = 1000;
int fromId = 0;
int toId = range;

for (int i = 1; i <= gridSize; i++) {

ExecutionContext value = new ExecutionContext();

value.putInt("fromId", fromId);
value.putInt("toId", toId);

result.put("partition" + i, value);

fromId = toId;
toId += range;

LOGGER.info("partition{}; fromId: {}; toId: {}", i, fromId, toId);
}

return result;
}
}
3.4 Partition-Handler 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class TransferStudentPartitionHandler {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "slaveTransferStudentStep1")
private Step slaveTransferStudentStep;

@Bean("transferStudentPartitionHandler1")
public PartitionHandler transferStudentPartitionHandler1() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor);
retVal.setStep(slaveTransferStudentStep);
retVal.setGridSize(100);
return retVal;
}
}
3.5 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemReader")
@StepScope
public JdbcPagingItemReader<Student> slaveTransferStudentItemReader(@Value("#{stepExecutionContext[fromId]}") final Long fromId,
@Value("#{stepExecutionContext[toId]}") final Long toId) {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");
queryProvider.setWhereClause(String.format("where student_id > %s and student_id <= %s", fromId, toId));

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.6 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
@StepScope
public class SlaveStudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.7 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemWriter")
@StepScope
public JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 常规 Step

省略测试代码,具体请查看 demo。测试结果:

耗时:13s

4.2 分片化 Step

测试结果:

batch_step_execution 可以看出,共有 100 个子任务并行处理,每个子任务迁移 1000 条数据。

耗时:7s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、并行化 Step

一个 Job 可配置多个 StepStep 之间可能存在关联,需要有先有后;也可能没有关联,先执行哪一个都可以。那么,若将这些互不关联的 Step 进行并行化处理,将会有效提升批处理性能。

比如,现有一个批处理任务,包含 4 个 Step

  • step1:在学生姓名后面追加字符串 “1”;
  • step2:在学生姓名后面追加字符串 “2”;
  • step3:在学生住址后面追加字符串 “8”;
  • step4:迁移所有学生信息;

我们发现,修改学生姓名的任务与修改学生住址的任务,互不干扰,并不需要有先后之分。因此,我们可以将 step1step2step3 并行执行。串行 Step 与并行 Step 流程如下:

三、批处理配置

3.1 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchManageStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentSplitFlow1")
private Flow batchProcessStudentSplitFlow;

@Autowired
@Qualifier(value = "batchTransferStudentStep1")
private Step batchTransferStudentStep;

@Bean
public Job manageStudentJob() {
return jobBuilderFactory.get("manageStudentJob1")
.incrementer(new RunIdIncrementer())
// 姓名追加1、姓名追加2、地址追加8
.start(batchProcessStudentSplitFlow)
// 迁移学生信息; student_source -> student_target
.next(batchTransferStudentStep)
.end()
.build();
}
}
3.2 Fow 配置

batchProcessStudentSplitFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentSplitFlow {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "batchUpdateStudentNameOneAndTwoFlow")
private Flow batchUpdateStudentNameOneAndTwoFlow;

@Autowired
@Qualifier(value = "batchUpdateStudentAddressFlow1")
private Flow batchUpdateStudentAddressFlow;

@Bean("batchProcessStudentSplitFlow1")
public Flow batchProcessStudentSplitFlow1() {
return new FlowBuilder<SimpleFlow>("batchProcessStudentSplitFlow1")
.split(taskExecutor)
.add(batchUpdateStudentNameOneAndTwoFlow, batchUpdateStudentAddressFlow)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep1")
private Step batchUpdateStudentNameStep1;

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep2")
private Step batchUpdateStudentNameStep2;

@Bean("batchUpdateStudentNameOneAndTwoFlow")
public Flow updateStudentNameOneAndTwoFlow() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentNameOneAndTwoFlow")
.start(batchUpdateStudentNameStep1)
.next(batchUpdateStudentNameStep2)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentAddressStep1")
private Step batchUpdateStudentAddressStep;

@Bean("batchUpdateStudentAddressFlow1")
public Flow batchUpdateStudentAddressFlow1() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentAddressFlow1")
.start(batchUpdateStudentAddressStep)
.build();
}
}
3.3 Step 配置

BatchUpdateStudentNameStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.example.springbatchdemo.component.processor.AppendStudentNameOneProcessor;
import com.example.springbatchdemo.component.processor.AppendStudentNameTwoProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateName")
private JdbcBatchItemWriter<Student> studentItemUpdateName;

@Autowired
private AppendStudentNameOneProcessor appendStudentNameOneProcessor;

@Autowired
private AppendStudentNameTwoProcessor appendStudentNameTwoProcessor;

@Bean("batchUpdateStudentNameStep1")
public Step batchUpdateStudentNameStep1() {
return stepBuilderFactory.get("batchUpdateStudentNameStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 1
.processor(appendStudentNameOneProcessor)
.writer(studentItemUpdateName)
.build();
}

@Bean("batchUpdateStudentNameStep2")
public Step batchUpdateStudentNameStep2() {
return stepBuilderFactory.get("batchUpdateStudentNameStep2")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 2
.processor(appendStudentNameTwoProcessor)
.writer(studentItemUpdateName)
.build();
}
}

BatchUpdateStudentAddressStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.AppendStudentAddressProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateAddress")
private JdbcBatchItemWriter<Student> studentItemUpdateAddress;

@Autowired
private AppendStudentAddressProcessor appendStudentAddressProcessor;

@Bean("batchUpdateStudentAddressStep1")
public Step batchUpdateStudentAddressStep1() {
return stepBuilderFactory.get("batchUpdateStudentAddressStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 住址追加 8
.processor(appendStudentAddressProcessor)
.writer(studentItemUpdateAddress)
.build();
}
}

BatchProcessStudentStep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchTransferStudentStep1")
public Step batchTransferStudentStep1() {
return stepBuilderFactory.get("batchTransferStudentStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 迁移数据; student_source -> student_target
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.4 数据输入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
@StepScope
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.5 数据处理器

AppendStudentNameOneProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameOneProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameOneProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_1"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentNameTwoProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameTwoProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameTwoProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_2"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentAddressProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentAddressProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentAddressProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address.concat("_8"));

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

StudentItemProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.6 数据输出器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateName")
@StepScope
public JdbcBatchItemWriter<Student> studentItemUpdateName() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET name = :name WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateAddress")
public JdbcBatchItemWriter<Student> studentItemUpdateAddress() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET address = :address WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}
}

@StepScope:

从上面的 Step 配置可知,studentItemReader 被多个 Step 引用。默认情况下 studentItemReader 的生命周期是与 Job 保持一致,那么在多 Step 引用的情况下,就会抛出类似下面这种异常:

1
>Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first

使用注解 StepScope,让 studentItemReader 的生命周期与 Step 保持同步,保证每个 Step 拿到的 ItemReader 都是新的实例。同样,ItemWriterItemProcessor 存在多 Step 引用的,都要使用该注解。

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 串行 Step

串行 Step 批处理,只需要按照顺序配置 Step(省略代码示例)。测试结果:

耗时:91s

4.2 并行 Step

测试结果:

耗时:68s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过 SPI(Serial Peripheral Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见Spring文档

二、多线程 Step 配置

Spring Batch 执行一个 Step,会按照 chunk 配置的数量分批次提交。对于多线程 Step,由线程池去处理任务批次。因此,每个 chunk 都不用串行等待,这大大地提高了批处理性能。

配置多线程 Step 非常简单,可以通过 xml 或接口来配置。以接口配置为例:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
// 节流配置, 不要超过线程池的最大线程数量
.throttleLimit(20)
.build();
}

此外,在配置多线程 Step 时,我们需要考虑得更多:

  • 线程池:推荐使用 Spring 线程池 ThreadPoolTaskExecutor,兼容性好;
  • 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题;
  • 框架节流:Spring Batch 自带节流器,默认最多可处理 4 个小任务,因此需要重新配置;

三、批处理配置

通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:

3.1 数据读取器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.2 数据映射器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.example.springbatchdemo.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StudentRowMapper implements RowMapper<Student> {

@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}
3.3 数据处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.4 数据写入器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}
3.5 Step 配置-单线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.6 Step 配置-多线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.taskExecutor(taskExecutor)
.throttleLimit(30)
.build();
}
}
3.7 Job 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentStep1")
private Step batchProcessStudentStep1;

@Autowired
private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("transferStudentJob")
.incrementer(new RunIdIncrementer())
.listener(batchProcessStudentCompletionListener)
.flow(batchProcessStudentStep1)
.end()
.build();
}
}
3.8 MySQL 数据源配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// 数据源配置参数识别前缀, 根据具体配置来设定
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
// 使用 SpringBoot 默认的数据源 HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
3.9 线程池配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

四、批处理性能测试

4.1 单线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:313 秒

4.2 多线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:81 秒


性能提升超300%

五、总结

多线程 Stepchunk 任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。

示例代码:spring-batch-demo


一、Spring Batch 数据输出器

Spring Batch 的数据输出器,是通过接口 ItemWriter 来实现的。针对常用的数据输出场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemWriter:输出文本数据;
  • JdbcBatchItemWriter:输出数据到数据库;
  • StaxEventItemWriter:输出 XML 文件数据;
  • JsonFileItemWriter:输出 JSON 文件数据;
  • ClassifierCompositeItemWriter:输出多文本数据;

二、简单使用

实体类 Ticket.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}

文件 ticket.csv

1
2
3
4
5
合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
2.1 FlatFileItemWriter-文本数据输出

ticket.csv 中的信息,转换为 JSON 字符串,输出到文件 ticket_output.txt 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Job
*/
@Bean
public Job testFlatFileItemWriterJob() {
return jobBuilderFactory.get("testFlatFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemWriterStep")
public Step testFlatFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemWriterStep")
.transactionManager(transactionManager)
.reader(ticketFileItemReader)
.writer(ticketFileItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

输出文本数据,要求目标数据的格式为字符串,因此需要将 POJO 按照一定规则聚合成字符串。Spring Batch 已实现聚合器 LineAggregatorPassThroughLineAggregator(打印 POJO)、RecursiveCollectionLineAggregator(打印 POJO 列表)、DelimitedLineAggregator(分隔符拼接 POJO 字段值)、FormatterLineAggregator(格式化 POJO 字段值)。当然,我们也可以手动实现聚合器,例如示例代码中,将 POJO 转换为 JSON 格式。

启动应用,生成文件 ticket_output.txt

1
2
3
4
5
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00}
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00}
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
2.2 JdbcBatchItemWriter-数据库数据输出

将文件 student.cvs 中的信息(内容如下),导入到 MySQL 数据表 student 中:

1
2
3
1,张三,合肥
2,李四,蚌埠
3,王二,南京
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Job
*/
@Bean
public Job testDatabaseItemWriterJob() {
return jobBuilderFactory.get("testDatabaseItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemWriterStep")
public Step testDatabaseItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemWriterStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentFileItemReader)
.writer(studentItemWriter)
.build();
}

/**
* Reader
*/
@Bean("studentFileItemReader")
public FlatFileItemReader<Student> studentFileItemReader() {
return new FlatFileItemReaderBuilder<Student>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("student.csv"))
.delimited()
.names(new String[]{"studentId", "name", "address"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
setTargetType(Student.class);
}})
.build();
}

/**
* Writer
*/
@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,文件中的数据已导入表 student

2.3 StaxEventItemWriter-XML 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.xml 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Job
*/
@Bean
public Job testXmlItemWriterJob() {
return jobBuilderFactory.get("testXmlItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemWriterStep)
.build();
}

/**
* Step
*/
@Bean("testXmlItemWriterStep")
public Step testXmlItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketXmlItemWriter)
.build();
}


/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件 ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>蚌埠</arrivalStation><price>220.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>杭州</arrivalStation><price>75.20</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>昆山</arrivalStation><price>19.00</price></ticket></tickets>
2.4 JsonFileItemWriter-JSON文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.json 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Job
*/
@Bean
public Job testJsonItemWriterJob() {
return jobBuilderFactory.get("testJsonItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemWriterStep")
public Step testJsonItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketJsonItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketJsonItemWriter")
public JsonFileItemWriter<Ticket> ticketJsonItemWriter() {
return new JsonFileItemWriterBuilder<Ticket>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new FileSystemResource("ticket_output.json"))
.name("ticketJsonItemWriter")
.build();
}

启动应用,生成文件 ticket_output.json

1
2
3
4
5
6
7
[
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00},
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00},
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00},
{"departureStation":"上海","arrivalStation":"杭州","price":75.20},
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
]
2.5 ClassifierCompositeItemWriter-输出多文本数据

将文件 ticket.csv 中始发站为上海的车票信息输出到文本中,其余的输出到 XML 文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* Job
*/
@Bean
public Job testMultiFileItemWriterJob() {
return jobBuilderFactory.get("testMultiFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemWriterStep")
public Step testMultiFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketClassifierMultiFileItemWriter)
.stream(ticketFileItemWriter)
.stream(ticketXmlItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Classifier Writer
*/
@Bean("ticketClassifierMultiFileItemWriter")
public ClassifierCompositeItemWriter<Ticket> ticketClassifierMultiFileItemWriter() {
ClassifierCompositeItemWriter<Ticket> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier((Classifier<Ticket, ItemWriter<? super Ticket>>) ticket -> {
// 始发站是上海的, 输出到文本中, 否则输出到 XML 文件中
return "上海".equals(ticket.getDepartureStation()) ? ticketFileItemWriter() : ticketXmlItemWriter();
});
return writer;
}

/**
* 文本-Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

/**
* XML-Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件如下:

ticket_output.txt

1
2
3
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}

ticket_output.xml

1
<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket></tickets>

示例代码:spring-batch-demo