feat:redis 原子方案关键问题优化
This commit is contained in:
@@ -401,7 +401,422 @@ public class InventoryChangeMessage implements Serializable {
|
||||
|
||||
## 数据一致性保证
|
||||
|
||||
### Redis 与 MySQL 数据同步
|
||||
### 问题分析
|
||||
|
||||
**问题 1:消息乱序导致库存混乱**
|
||||
|
||||
消息队列无法保证严格的时序性,可能出现:
|
||||
```
|
||||
时间线:
|
||||
T1: 锁定库存 -10 (发送消息 M1)
|
||||
T2: 释放库存 +10 (发送消息 M2)
|
||||
|
||||
消费顺序可能是:
|
||||
M2 先到达 → +10 (错误:此时还未扣减)
|
||||
M1 后到达 → -10
|
||||
```
|
||||
|
||||
**问题 2:库存查询数据源不一致**
|
||||
|
||||
- Redis 是实时数据(最准确)
|
||||
- MySQL 是异步同步数据(有延迟)
|
||||
- 查询时应该以哪个为准?
|
||||
|
||||
### 解决方案 1:消息时序保证
|
||||
|
||||
#### 方案 A:使用顺序消息(推荐)
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class OrderedInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 发送顺序消息(按 inventoryCode 分区)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 执行 Lua 脚本扣减库存
|
||||
Long result = redisTemplate.execute(...);
|
||||
|
||||
if (result < 0) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 发送顺序消息(使用 inventoryCode 作为 sharding key)
|
||||
InventoryChangeMessage message = buildMessage(dto);
|
||||
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode() // sharding key,保证同一库存的消息顺序
|
||||
);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 方案 B:消息携带版本号
|
||||
|
||||
```java
|
||||
@Data
|
||||
public class InventoryChangeMessage implements Serializable {
|
||||
|
||||
private String inventoryCode;
|
||||
private String operationType;
|
||||
private Integer quantity;
|
||||
private String orderNo;
|
||||
private Long timestamp;
|
||||
|
||||
// 新增:版本号,用于检测乱序
|
||||
private Long version;
|
||||
|
||||
// 新增:操作序列号
|
||||
private Long sequenceNo;
|
||||
}
|
||||
|
||||
@Service
|
||||
public class VersionedInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
private final AtomicLong sequenceGenerator = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* 锁定库存(带版本号)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
String versionKey = key + ":version";
|
||||
|
||||
// 获取当前版本号
|
||||
Long currentVersion = redisTemplate.opsForValue().increment(versionKey);
|
||||
|
||||
// 执行库存扣减
|
||||
Long result = deductInventoryWithLua(key, dto.getQuantity());
|
||||
|
||||
if (result < 0) {
|
||||
// 回滚版本号
|
||||
redisTemplate.opsForValue().decrement(versionKey);
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 发送消息(携带版本号)
|
||||
InventoryChangeMessage message = new InventoryChangeMessage();
|
||||
message.setInventoryCode(dto.getInventoryCode());
|
||||
message.setOperationType("LOCK");
|
||||
message.setQuantity(dto.getQuantity());
|
||||
message.setOrderNo(dto.getOrderNo());
|
||||
message.setVersion(currentVersion);
|
||||
message.setSequenceNo(sequenceGenerator.incrementAndGet());
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
rocketMQTemplate.asyncSend("inventory-change-topic", message, null);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 方案 C:消费者端排序(最可靠)
|
||||
|
||||
```java
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "inventory-change-topic",
|
||||
consumerGroup = "inventory-sync-consumer",
|
||||
consumeMode = ConsumeMode.ORDERLY // 顺序消费
|
||||
)
|
||||
public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryChangeMessage> {
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
// 每个库存维护一个消息队列
|
||||
private final Map<String, PriorityBlockingQueue<InventoryChangeMessage>> messageQueues
|
||||
= new ConcurrentHashMap<>();
|
||||
|
||||
// 每个库存维护已处理的最大版本号
|
||||
private final Map<String, Long> processedVersions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void onMessage(InventoryChangeMessage message) {
|
||||
String inventoryCode = message.getInventoryCode();
|
||||
|
||||
// 获取该库存的消息队列
|
||||
PriorityBlockingQueue<InventoryChangeMessage> queue = messageQueues
|
||||
.computeIfAbsent(inventoryCode, k -> new PriorityBlockingQueue<>(
|
||||
100,
|
||||
Comparator.comparing(InventoryChangeMessage::getVersion)
|
||||
));
|
||||
|
||||
// 加入队列
|
||||
queue.offer(message);
|
||||
|
||||
// 处理队列中的消息
|
||||
processMessageQueue(inventoryCode, queue);
|
||||
}
|
||||
|
||||
private void processMessageQueue(String inventoryCode,
|
||||
PriorityBlockingQueue<InventoryChangeMessage> queue) {
|
||||
Long lastProcessedVersion = processedVersions.getOrDefault(inventoryCode, 0L);
|
||||
|
||||
while (!queue.isEmpty()) {
|
||||
InventoryChangeMessage message = queue.peek();
|
||||
|
||||
// 检查版本号是否连续
|
||||
if (message.getVersion() != lastProcessedVersion + 1) {
|
||||
// 版本号不连续,等待前面的消息
|
||||
LogUtil.warn("消息版本号不连续,等待: inventoryCode={}, expected={}, actual={}",
|
||||
inventoryCode, lastProcessedVersion + 1, message.getVersion());
|
||||
break;
|
||||
}
|
||||
|
||||
// 版本号连续,处理消息
|
||||
queue.poll();
|
||||
|
||||
try {
|
||||
processSingleMessage(message);
|
||||
|
||||
// 更新已处理版本号
|
||||
processedVersions.put(inventoryCode, message.getVersion());
|
||||
lastProcessedVersion = message.getVersion();
|
||||
|
||||
} catch (Exception e) {
|
||||
LogUtil.error("消息处理失败: {}", message, e);
|
||||
// 重新加入队列
|
||||
queue.offer(message);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processSingleMessage(InventoryChangeMessage message) {
|
||||
// 查询库存
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
message.getInventoryCode()
|
||||
);
|
||||
|
||||
if (inventory == null) {
|
||||
LogUtil.error("库存不存在: {}", message.getInventoryCode());
|
||||
return;
|
||||
}
|
||||
|
||||
// 根据操作类型更新数据库
|
||||
switch (message.getOperationType()) {
|
||||
case "LOCK":
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() - message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
case "DEDUCT":
|
||||
inventory.setSoldStock(
|
||||
inventory.getSoldStock() + message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
case "RELEASE":
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() + message.getQuantity()
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
inventoryRepository.save(inventory);
|
||||
|
||||
LogUtil.info("库存同步成功: inventoryCode={}, version={}, operation={}",
|
||||
message.getInventoryCode(), message.getVersion(), message.getOperationType());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 解决方案 2:库存查询数据源
|
||||
|
||||
#### 统一查询接口(以 Redis 为准)
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class UnifiedInventoryQueryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis 查询)
|
||||
*/
|
||||
public InventoryVO getInventory(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
// 优先从 Redis 查询
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (redisStock != null) {
|
||||
// Redis 中有数据,直接返回
|
||||
InventoryVO vo = new InventoryVO();
|
||||
vo.setInventoryCode(inventoryCode);
|
||||
vo.setRemainingStock(redisStock);
|
||||
vo.setDataSource("REDIS");
|
||||
return vo;
|
||||
}
|
||||
|
||||
// Redis 中没有,从数据库加载
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
|
||||
if (inventory == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, inventory.getRemainingStock(), 1, TimeUnit.HOURS);
|
||||
|
||||
InventoryVO vo = new InventoryVO();
|
||||
vo.setInventoryCode(inventoryCode);
|
||||
vo.setRemainingStock(inventory.getRemainingStock());
|
||||
vo.setDataSource("MYSQL");
|
||||
|
||||
return vo;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询库存
|
||||
*/
|
||||
public Map<String, Integer> batchGetInventory(List<String> inventoryCodes) {
|
||||
Map<String, Integer> result = new HashMap<>();
|
||||
|
||||
// 批量从 Redis 查询
|
||||
List<String> keys = inventoryCodes.stream()
|
||||
.map(code -> INVENTORY_KEY_PREFIX + code)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
|
||||
|
||||
// 收集 Redis 中不存在的库存编码
|
||||
List<String> missingCodes = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < inventoryCodes.size(); i++) {
|
||||
String code = inventoryCodes.get(i);
|
||||
Object value = values.get(i);
|
||||
|
||||
if (value != null) {
|
||||
result.put(code, (Integer) value);
|
||||
} else {
|
||||
missingCodes.add(code);
|
||||
}
|
||||
}
|
||||
|
||||
// 从数据库加载缺失的库存
|
||||
if (!missingCodes.isEmpty()) {
|
||||
List<InventoryDO> inventories = inventoryRepository
|
||||
.findByInventoryCodeIn(missingCodes);
|
||||
|
||||
for (InventoryDO inventory : inventories) {
|
||||
result.put(inventory.getInventoryCode(), inventory.getRemainingStock());
|
||||
|
||||
// 加载到 Redis
|
||||
String key = INVENTORY_KEY_PREFIX + inventory.getInventoryCode();
|
||||
redisTemplate.opsForValue().set(
|
||||
key,
|
||||
inventory.getRemainingStock(),
|
||||
1,
|
||||
TimeUnit.HOURS
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 双写一致性保证
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class DualWriteInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 锁定库存(双写模式)
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Result<Boolean> lockInventoryWithDualWrite(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 1. Redis 扣减(原子操作)
|
||||
Long redisResult = redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.evalSha(
|
||||
deductScriptSha,
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(dto.getQuantity()).getBytes(),
|
||||
"0".getBytes()
|
||||
)
|
||||
);
|
||||
|
||||
if (redisResult == null || redisResult < 0) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
try {
|
||||
// 2. 数据库扣减(同步,保证强一致性)
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
if (inventory == null) {
|
||||
// 回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
return Result.fail("INVENTORY_NOT_FOUND", "库存不存在");
|
||||
}
|
||||
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() - dto.getQuantity()
|
||||
);
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
|
||||
boolean success = inventoryRepository.updateWithVersion(inventory);
|
||||
|
||||
if (!success) {
|
||||
// 数据库更新失败,回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
return Result.fail("INVENTORY_LOCK_FAILED", "库存锁定失败");
|
||||
}
|
||||
|
||||
// 3. 记录日志
|
||||
saveInventoryLog(dto);
|
||||
|
||||
return Result.success(true);
|
||||
|
||||
} catch (Exception e) {
|
||||
// 异常时回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Redis 与 MySQL 数据对账
|
||||
|
||||
```java
|
||||
@Service
|
||||
@@ -439,13 +854,17 @@ public class InventoryConsistencyService {
|
||||
inventory.getInventoryCode(), redisStock,
|
||||
inventory.getRemainingStock());
|
||||
|
||||
// 以 MySQL 为准,修正 Redis
|
||||
redisTemplate.opsForValue().set(
|
||||
key,
|
||||
inventory.getRemainingStock(),
|
||||
1,
|
||||
TimeUnit.HOURS
|
||||
);
|
||||
// 以 Redis 为准,修正 MySQL(因为 Redis 是实时数据)
|
||||
inventory.setRemainingStock(redisStock);
|
||||
inventoryRepository.save(inventory);
|
||||
|
||||
// 发送告警
|
||||
alertService.sendAlert(String.format(
|
||||
"库存数据不一致已修复: %s, Redis=%d, MySQL=%d",
|
||||
inventory.getInventoryCode(),
|
||||
redisStock,
|
||||
inventory.getRemainingStock()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -454,6 +873,121 @@ public class InventoryConsistencyService {
|
||||
}
|
||||
```
|
||||
|
||||
## 方案对比:异步 vs 同步
|
||||
|
||||
### 异步方案(原方案)
|
||||
|
||||
**优点**
|
||||
- 性能极高(QPS 50000+)
|
||||
- 响应时间短(2-5ms)
|
||||
- 削峰填谷
|
||||
|
||||
**缺点**
|
||||
- 消息可能乱序
|
||||
- 数据最终一致(有延迟)
|
||||
- 实现复杂
|
||||
|
||||
### 同步方案(双写)
|
||||
|
||||
**优点**
|
||||
- 强一致性
|
||||
- 无消息乱序问题
|
||||
- 实现相对简单
|
||||
|
||||
**缺点**
|
||||
- 性能较低(QPS ~5000)
|
||||
- 响应时间较长(10-20ms)
|
||||
- 数据库压力大
|
||||
|
||||
### 推荐方案:混合模式
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class HybridInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 锁定库存(混合模式)
|
||||
* - Redis 扣减(实时)
|
||||
* - 异步同步 MySQL(最终一致)
|
||||
* - 查询优先 Redis(保证准确性)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
String versionKey = key + ":version";
|
||||
|
||||
// 1. 获取版本号(保证消息顺序)
|
||||
Long version = redisTemplate.opsForValue().increment(versionKey);
|
||||
|
||||
// 2. Redis 原子扣减
|
||||
Long result = redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.evalSha(
|
||||
deductScriptSha,
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(dto.getQuantity()).getBytes(),
|
||||
"0".getBytes()
|
||||
)
|
||||
);
|
||||
|
||||
if (result == null || result < 0) {
|
||||
// 回滚版本号
|
||||
redisTemplate.opsForValue().decrement(versionKey);
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 3. 发送顺序消息(携带版本号)
|
||||
InventoryChangeMessage message = new InventoryChangeMessage();
|
||||
message.setInventoryCode(dto.getInventoryCode());
|
||||
message.setOperationType("LOCK");
|
||||
message.setQuantity(dto.getQuantity());
|
||||
message.setOrderNo(dto.getOrderNo());
|
||||
message.setVersion(version); // 关键:版本号
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
// 使用 inventoryCode 作为 sharding key,保证顺序
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis 查询)
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (stock == null) {
|
||||
// Redis 中没有,从数据库加载
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
if (inventory != null) {
|
||||
stock = inventory.getRemainingStock();
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, stock, 1, TimeUnit.HOURS);
|
||||
}
|
||||
}
|
||||
|
||||
return stock;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 性能优化
|
||||
|
||||
### 连接池配置
|
||||
|
||||
692
docs/高并发库存方案/Redis方案深度分析.md
Normal file
692
docs/高并发库存方案/Redis方案深度分析.md
Normal file
@@ -0,0 +1,692 @@
|
||||
# Redis 方案深度分析
|
||||
|
||||
## 核心问题
|
||||
|
||||
### 问题 1:消息乱序导致库存混乱
|
||||
|
||||
#### 问题描述
|
||||
|
||||
在异步消息同步模式下,消息的发送顺序和消费顺序可能不一致:
|
||||
|
||||
```
|
||||
时间线:
|
||||
T1: 用户下单,锁定库存 -10 → 发送消息 M1
|
||||
T2: 用户取消,释放库存 +10 → 发送消息 M2
|
||||
|
||||
理想消费顺序:
|
||||
M1 先消费 → MySQL 库存 100 - 10 = 90
|
||||
M2 后消费 → MySQL 库存 90 + 10 = 100
|
||||
|
||||
实际可能的消费顺序:
|
||||
M2 先消费 → MySQL 库存 100 + 10 = 110 ❌ 错误!
|
||||
M1 后消费 → MySQL 库存 110 - 10 = 100
|
||||
|
||||
结果:虽然最终库存正确,但中间状态错误,可能导致超卖
|
||||
```
|
||||
|
||||
#### 根本原因
|
||||
|
||||
- 网络延迟不同
|
||||
- 消息队列分区策略
|
||||
- 消费者并发处理
|
||||
- 消息重试机制
|
||||
|
||||
### 问题 2:库存查询数据源不一致
|
||||
|
||||
#### 问题描述
|
||||
|
||||
系统中存在两个数据源:
|
||||
|
||||
```
|
||||
Redis(实时数据)
|
||||
- 库存:95
|
||||
- 更新时间:T1
|
||||
|
||||
MySQL(异步同步)
|
||||
- 库存:100
|
||||
- 更新时间:T0(延迟 500ms)
|
||||
|
||||
用户查询库存时,应该返回哪个值?
|
||||
```
|
||||
|
||||
#### 影响范围
|
||||
|
||||
- 库存查询接口
|
||||
- 订单创建前的库存检查
|
||||
- 库存监控和报表
|
||||
- 库存预警
|
||||
|
||||
## 解决方案对比
|
||||
|
||||
### 方案 A:顺序消息 + Redis 查询(推荐)
|
||||
|
||||
#### 架构设计
|
||||
|
||||
```
|
||||
┌─────────┐ ┌─────────┐ ┌──────────┐ ┌─────────┐
|
||||
│ 业务请求 │ ───> │ Redis │ ───> │ 顺序消息 │ ───> │ MySQL │
|
||||
└─────────┘ └─────────┘ └──────────┘ └─────────┘
|
||||
原子操作 保证顺序 异步同步
|
||||
|
||||
所有查询 ───> Redis(实时数据)
|
||||
```
|
||||
|
||||
#### 核心实现
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class OrderedInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private static final String INVENTORY_KEY_PREFIX = "inventory:";
|
||||
private static final String VERSION_KEY_PREFIX = "inventory:version:";
|
||||
|
||||
/**
|
||||
* 锁定库存
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String inventoryKey = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
String versionKey = VERSION_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 1. 生成版本号(保证消息顺序)
|
||||
Long version = redisTemplate.opsForValue().increment(versionKey);
|
||||
|
||||
// 2. Redis 原子扣减
|
||||
Long remainingStock = deductInventoryWithLua(inventoryKey, dto.getQuantity());
|
||||
|
||||
if (remainingStock < 0) {
|
||||
// 扣减失败,回滚版本号
|
||||
redisTemplate.opsForValue().decrement(versionKey);
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 3. 发送顺序消息
|
||||
InventoryChangeMessage message = new InventoryChangeMessage();
|
||||
message.setInventoryCode(dto.getInventoryCode());
|
||||
message.setOperationType("LOCK");
|
||||
message.setQuantity(dto.getQuantity());
|
||||
message.setOrderNo(dto.getOrderNo());
|
||||
message.setVersion(version); // 携带版本号
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
// 使用 inventoryCode 作为 sharding key,保证同一库存的消息顺序
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode() // sharding key
|
||||
);
|
||||
|
||||
LogUtil.info("库存锁定成功: inventoryCode={}, version={}, remaining={}",
|
||||
dto.getInventoryCode(), version, remainingStock);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放库存
|
||||
*/
|
||||
public Result<Boolean> releaseInventory(InventoryOperationDTO dto) {
|
||||
String inventoryKey = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
String versionKey = VERSION_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 1. 生成版本号
|
||||
Long version = redisTemplate.opsForValue().increment(versionKey);
|
||||
|
||||
// 2. Redis 增加库存
|
||||
Long newStock = redisTemplate.opsForValue()
|
||||
.increment(inventoryKey, dto.getQuantity());
|
||||
|
||||
// 3. 发送顺序消息
|
||||
InventoryChangeMessage message = new InventoryChangeMessage();
|
||||
message.setInventoryCode(dto.getInventoryCode());
|
||||
message.setOperationType("RELEASE");
|
||||
message.setQuantity(dto.getQuantity());
|
||||
message.setOrderNo(dto.getOrderNo());
|
||||
message.setVersion(version);
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis 查询)
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (stock == null) {
|
||||
// Redis 中没有,从数据库加载
|
||||
stock = loadFromDatabase(inventoryCode);
|
||||
}
|
||||
|
||||
return stock;
|
||||
}
|
||||
|
||||
private Integer loadFromDatabase(String inventoryCode) {
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
|
||||
if (inventory == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
Integer stock = inventory.getRemainingStock();
|
||||
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, stock, 1, TimeUnit.HOURS);
|
||||
|
||||
return stock;
|
||||
}
|
||||
|
||||
private Long deductInventoryWithLua(String key, Integer quantity) {
|
||||
// Lua 脚本保证原子性
|
||||
String script =
|
||||
"local stock = redis.call('GET', KEYS[1]) " +
|
||||
"if not stock then return -1 end " +
|
||||
"stock = tonumber(stock) " +
|
||||
"if stock < tonumber(ARGV[1]) then return -2 end " +
|
||||
"redis.call('DECRBY', KEYS[1], ARGV[1]) " +
|
||||
"return stock - tonumber(ARGV[1])";
|
||||
|
||||
return redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.eval(
|
||||
script.getBytes(),
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(quantity).getBytes()
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 消费者实现(顺序消费 + 版本号校验)
|
||||
|
||||
```java
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "inventory-change-topic",
|
||||
consumerGroup = "inventory-sync-consumer",
|
||||
consumeMode = ConsumeMode.ORDERLY // 关键:顺序消费
|
||||
)
|
||||
public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryChangeMessage> {
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private InventoryLogRepository inventoryLogRepository;
|
||||
|
||||
// 记录每个库存已处理的最大版本号
|
||||
private final Map<String, Long> processedVersions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void onMessage(InventoryChangeMessage message) {
|
||||
String inventoryCode = message.getInventoryCode();
|
||||
Long messageVersion = message.getVersion();
|
||||
|
||||
// 获取已处理的最大版本号
|
||||
Long lastVersion = processedVersions.getOrDefault(inventoryCode, 0L);
|
||||
|
||||
// 版本号校验
|
||||
if (messageVersion <= lastVersion) {
|
||||
LogUtil.warn("重复消息或乱序消息,跳过: inventoryCode={}, messageVersion={}, lastVersion={}",
|
||||
inventoryCode, messageVersion, lastVersion);
|
||||
return;
|
||||
}
|
||||
|
||||
// 版本号不连续,告警(理论上不应该发生)
|
||||
if (messageVersion != lastVersion + 1) {
|
||||
LogUtil.error("消息版本号不连续: inventoryCode={}, expected={}, actual={}",
|
||||
inventoryCode, lastVersion + 1, messageVersion);
|
||||
|
||||
alertService.sendAlert(String.format(
|
||||
"库存消息版本号不连续: %s, expected=%d, actual=%d",
|
||||
inventoryCode, lastVersion + 1, messageVersion
|
||||
));
|
||||
}
|
||||
|
||||
try {
|
||||
// 处理消息
|
||||
processMessage(message);
|
||||
|
||||
// 更新已处理版本号
|
||||
processedVersions.put(inventoryCode, messageVersion);
|
||||
|
||||
LogUtil.info("库存同步成功: inventoryCode={}, version={}, operation={}",
|
||||
inventoryCode, messageVersion, message.getOperationType());
|
||||
|
||||
} catch (Exception e) {
|
||||
LogUtil.error("库存同步失败: {}", message, e);
|
||||
throw new RuntimeException("库存同步失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processMessage(InventoryChangeMessage message) {
|
||||
// 查询库存
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
message.getInventoryCode()
|
||||
);
|
||||
|
||||
if (inventory == null) {
|
||||
LogUtil.error("库存不存在: {}", message.getInventoryCode());
|
||||
return;
|
||||
}
|
||||
|
||||
Integer beforeQty = inventory.getRemainingStock();
|
||||
Integer beforeSold = inventory.getSoldStock();
|
||||
|
||||
// 根据操作类型更新
|
||||
switch (message.getOperationType()) {
|
||||
case "LOCK":
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() - message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
case "DEDUCT":
|
||||
inventory.setSoldStock(
|
||||
inventory.getSoldStock() + message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
case "RELEASE":
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() + message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
LogUtil.error("未知操作类型: {}", message.getOperationType());
|
||||
return;
|
||||
}
|
||||
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
inventoryRepository.save(inventory);
|
||||
|
||||
// 记录日志
|
||||
saveInventoryLog(message, beforeQty, beforeSold);
|
||||
}
|
||||
|
||||
private void saveInventoryLog(InventoryChangeMessage message,
|
||||
Integer beforeQty, Integer beforeSold) {
|
||||
InventoryLogDO log = new InventoryLogDO();
|
||||
log.setInventoryCode(message.getInventoryCode());
|
||||
log.setOperationType(message.getOperationType());
|
||||
log.setQuantity(message.getQuantity());
|
||||
log.setBeforeQty(beforeQty);
|
||||
log.setAfterQty(beforeQty - message.getQuantity());
|
||||
log.setOrderNo(message.getOrderNo());
|
||||
log.setVersion(message.getVersion());
|
||||
log.setCreateTime(LocalDateTime.now());
|
||||
|
||||
inventoryLogRepository.save(log);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 优点
|
||||
|
||||
- 保证消息顺序(RocketMQ 顺序消息)
|
||||
- 版本号双重保障
|
||||
- 查询统一从 Redis(数据准确)
|
||||
- 性能优秀(QPS 50000+)
|
||||
|
||||
#### 缺点
|
||||
|
||||
- 顺序消息性能略低于并发消息
|
||||
- 需要维护版本号
|
||||
- 实现复杂度中等
|
||||
|
||||
### 方案 B:双写模式(强一致性)
|
||||
|
||||
#### 架构设计
|
||||
|
||||
```
|
||||
┌─────────┐ ┌─────────┐ ┌─────────┐
|
||||
│ 业务请求 │ ───> │ Redis │ ───> │ MySQL │
|
||||
└─────────┘ └─────────┘ └─────────┘
|
||||
原子操作 同步写入
|
||||
|
||||
查询优先 Redis,Redis 不存在则查 MySQL
|
||||
```
|
||||
|
||||
#### 核心实现
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class DualWriteInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
/**
|
||||
* 锁定库存(双写模式)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 1. Redis 扣减
|
||||
Long redisResult = deductInventoryWithLua(key, dto.getQuantity());
|
||||
|
||||
if (redisResult < 0) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
try {
|
||||
// 2. 数据库扣减(事务)
|
||||
Boolean dbResult = transactionTemplate.execute(status -> {
|
||||
try {
|
||||
InventoryDO inventory = inventoryRepository
|
||||
.findByInventoryCode(dto.getInventoryCode());
|
||||
|
||||
if (inventory == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 使用乐观锁更新
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() - dto.getQuantity()
|
||||
);
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
|
||||
return inventoryRepository.updateWithVersion(inventory);
|
||||
|
||||
} catch (Exception e) {
|
||||
status.setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
|
||||
if (Boolean.FALSE.equals(dbResult)) {
|
||||
// 数据库更新失败,回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
return Result.fail("INVENTORY_LOCK_FAILED", "库存锁定失败");
|
||||
}
|
||||
|
||||
// 3. 记录日志
|
||||
saveInventoryLog(dto);
|
||||
|
||||
return Result.success(true);
|
||||
|
||||
} catch (Exception e) {
|
||||
// 异常时回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
LogUtil.error("库存锁定失败,已回滚 Redis", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询库存(优先 Redis)
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
// 优先从 Redis 查询
|
||||
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (stock != null) {
|
||||
return stock;
|
||||
}
|
||||
|
||||
// Redis 中没有,从数据库加载
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
|
||||
if (inventory == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
stock = inventory.getRemainingStock();
|
||||
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, stock, 1, TimeUnit.HOURS);
|
||||
|
||||
return stock;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 优点
|
||||
|
||||
- 强一致性(Redis 和 MySQL 同步)
|
||||
- 无消息乱序问题
|
||||
- 实现相对简单
|
||||
- 查询统一从 Redis
|
||||
|
||||
#### 缺点
|
||||
|
||||
- 性能较低(QPS ~5000)
|
||||
- 响应时间较长(10-20ms)
|
||||
- 数据库压力大
|
||||
- 需要处理回滚
|
||||
|
||||
### 方案 C:最终一致性 + 定时对账
|
||||
|
||||
#### 架构设计
|
||||
|
||||
```
|
||||
┌─────────┐ ┌─────────┐ ┌──────────┐ ┌─────────┐
|
||||
│ 业务请求 │ ───> │ Redis │ ───> │ 异步消息 │ ───> │ MySQL │
|
||||
└─────────┘ └─────────┘ └──────────┘ └─────────┘
|
||||
原子操作 最终一致 异步同步
|
||||
|
||||
┌──────────┐
|
||||
│ 定时对账 │
|
||||
└──────────┘
|
||||
每小时执行
|
||||
```
|
||||
|
||||
#### 核心实现
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class EventualConsistencyInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
/**
|
||||
* 锁定库存(异步模式)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// Redis 扣减
|
||||
Long result = deductInventoryWithLua(key, dto.getQuantity());
|
||||
|
||||
if (result < 0) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 发送异步消息
|
||||
InventoryChangeMessage message = buildMessage(dto);
|
||||
rocketMQTemplate.asyncSend("inventory-change-topic", message, null);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时对账(每小时)
|
||||
*/
|
||||
@Scheduled(cron = "0 0 * * * ?")
|
||||
public void reconcileInventory() {
|
||||
List<InventoryDO> inventories = inventoryRepository.findAll();
|
||||
|
||||
for (InventoryDO inventory : inventories) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventory.getInventoryCode();
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (redisStock == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 检查是否一致
|
||||
if (!redisStock.equals(inventory.getRemainingStock())) {
|
||||
LogUtil.warn("库存不一致,以 Redis 为准修正: inventoryCode={}, Redis={}, MySQL={}",
|
||||
inventory.getInventoryCode(), redisStock, inventory.getRemainingStock());
|
||||
|
||||
// 以 Redis 为准,修正 MySQL
|
||||
inventory.setRemainingStock(redisStock);
|
||||
inventoryRepository.save(inventory);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis)
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
return (Integer) redisTemplate.opsForValue().get(key);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 优点
|
||||
|
||||
- 性能最高(QPS 50000+)
|
||||
- 实现简单
|
||||
- 查询统一从 Redis
|
||||
- 定时对账保证最终一致
|
||||
|
||||
#### 缺点
|
||||
|
||||
- 可能出现短暂不一致
|
||||
- 依赖定时任务
|
||||
- 对账间隔内可能有误差
|
||||
|
||||
## 推荐方案
|
||||
|
||||
### 根据业务场景选择
|
||||
|
||||
**高并发秒杀场景**
|
||||
- 推荐:方案 A(顺序消息 + Redis 查询)
|
||||
- 理由:性能高,数据准确,消息有序
|
||||
|
||||
**普通电商场景**
|
||||
- 推荐:方案 C(最终一致性 + 定时对账)
|
||||
- 理由:实现简单,性能优秀,对账保底
|
||||
|
||||
**金融级场景**
|
||||
- 推荐:方案 B(双写模式)
|
||||
- 理由:强一致性,无数据延迟
|
||||
|
||||
## 最佳实践
|
||||
|
||||
### 1. 统一查询入口
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class InventoryQueryService {
|
||||
|
||||
/**
|
||||
* 所有库存查询统一走这个方法
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
// 优先 Redis
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (stock != null) {
|
||||
return stock;
|
||||
}
|
||||
|
||||
// Redis 不存在,从数据库加载并缓存
|
||||
return loadAndCacheFromDB(inventoryCode);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2. 监控告警
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class InventoryConsistencyMonitor {
|
||||
|
||||
@Scheduled(fixedRate = 60000)
|
||||
public void monitorConsistency() {
|
||||
// 检查 Redis 和 MySQL 的差异
|
||||
List<InventoryDO> inventories = inventoryRepository.findAll();
|
||||
|
||||
for (InventoryDO inventory : inventories) {
|
||||
Integer redisStock = getRedisStock(inventory.getInventoryCode());
|
||||
Integer mysqlStock = inventory.getRemainingStock();
|
||||
|
||||
if (redisStock != null && !redisStock.equals(mysqlStock)) {
|
||||
int diff = Math.abs(redisStock - mysqlStock);
|
||||
|
||||
// 差异超过阈值告警
|
||||
if (diff > 10) {
|
||||
alertService.sendAlert(String.format(
|
||||
"库存差异过大: %s, Redis=%d, MySQL=%d, diff=%d",
|
||||
inventory.getInventoryCode(), redisStock, mysqlStock, diff
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. 降级策略
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class InventoryServiceWithFallback {
|
||||
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
try {
|
||||
// 优先使用 Redis 方案
|
||||
return redisInventoryService.lockInventory(dto);
|
||||
|
||||
} catch (RedisConnectionException e) {
|
||||
LogUtil.error("Redis 故障,降级到数据库", e);
|
||||
|
||||
// 降级到数据库乐观锁
|
||||
return databaseInventoryService.lockInventory(dto);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 总结
|
||||
|
||||
针对你提出的两个问题:
|
||||
|
||||
**问题 1:消息乱序**
|
||||
- 解决方案:使用 RocketMQ 顺序消息 + 版本号校验
|
||||
- 关键点:sharding key 保证同一库存的消息顺序
|
||||
|
||||
**问题 2:查询数据源**
|
||||
- 解决方案:统一从 Redis 查询,Redis 是实时数据源
|
||||
- 关键点:MySQL 仅作为持久化存储,不用于实时查询
|
||||
|
||||
推荐使用方案 A(顺序消息 + Redis 查询),在性能和一致性之间取得最佳平衡。
|
||||
Reference in New Issue
Block a user