高并发库存方案
This commit is contained in:
@@ -401,91 +401,435 @@ public class InventoryChangeMessage implements Serializable {
|
||||
|
||||
## 数据一致性保证
|
||||
|
||||
### 问题分析
|
||||
### 核心问题
|
||||
|
||||
**问题 1:消息乱序导致库存混乱**
|
||||
**问题 1:Redis 与 MySQL 的数据一致性**
|
||||
|
||||
消息队列无法保证严格的时序性,可能出现:
|
||||
Redis 作为实时库存,MySQL 作为持久化存储,两者如何保持一致?
|
||||
|
||||
**问题 2:消息乱序问题**
|
||||
|
||||
即使使用 RocketMQ 顺序消息,也无法完全避免乱序:
|
||||
```
|
||||
时间线:
|
||||
T1: 锁定库存 -10 (发送消息 M1)
|
||||
T2: 释放库存 +10 (发送消息 M2)
|
||||
T1: 锁定库存 -10 (version=1, 发送消息 M1)
|
||||
T2: 释放库存 +10 (version=2, 发送消息 M2)
|
||||
|
||||
消费顺序可能是:
|
||||
M2 先到达 → +10 (错误:此时还未扣减)
|
||||
M1 后到达 → -10
|
||||
可能的情况:
|
||||
1. M2 因网络原因先发送成功
|
||||
2. M1 后发送成功
|
||||
3. 即使在同一队列,消费顺序也是 M2 → M1
|
||||
4. 结果:先 +10,再 -10,数据错误!
|
||||
```
|
||||
|
||||
**问题 2:库存查询数据源不一致**
|
||||
**问题 3:查询数据源选择**
|
||||
|
||||
- Redis 是实时数据(最准确)
|
||||
- MySQL 是异步同步数据(有延迟)
|
||||
- 查询时应该以哪个为准?
|
||||
|
||||
### 解决方案 1:消息时序保证
|
||||
### 推荐方案:Redis 主 + MySQL 定时备份
|
||||
|
||||
#### 方案 A:使用顺序消息(推荐)
|
||||
**核心思路**
|
||||
- Redis 作为唯一实时数据源(所有读写操作)
|
||||
- MySQL 仅作为备份和对账(定时全量同步)
|
||||
- 操作日志单独记录(用于审计和问题排查)
|
||||
|
||||
**优势**
|
||||
- 性能最高:不依赖 MQ 和 MySQL 实时同步
|
||||
- 逻辑最简单:没有消息乱序问题
|
||||
- 可靠性足够:Redis 持久化 + 定时备份
|
||||
|
||||
#### 实现代码
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class OrderedInventoryService {
|
||||
public class SimplifiedInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryLogRepository inventoryLogRepository;
|
||||
|
||||
private static final String INVENTORY_KEY_PREFIX = "inventory:";
|
||||
private String deductScriptSha;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
String script = loadLuaScript("inventory_deduct.lua");
|
||||
deductScriptSha = redisTemplate.execute(
|
||||
(RedisCallback<String>) connection ->
|
||||
connection.scriptLoad(script.getBytes())
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送顺序消息(按 inventoryCode 分区)
|
||||
* 锁定库存(只操作 Redis)
|
||||
*/
|
||||
public Result<Boolean> lockInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 执行 Lua 脚本扣减库存
|
||||
Long result = redisTemplate.execute(...);
|
||||
// 执行 Lua 脚本原子扣减
|
||||
Long result = redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.evalSha(
|
||||
deductScriptSha,
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(dto.getQuantity()).getBytes(),
|
||||
"0".getBytes()
|
||||
)
|
||||
);
|
||||
|
||||
if (result < 0) {
|
||||
if (result == null || result == -1) {
|
||||
return Result.fail("INVENTORY_NOT_FOUND", "库存不存在");
|
||||
}
|
||||
|
||||
if (result == -2) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 发送顺序消息(使用 inventoryCode 作为 sharding key)
|
||||
InventoryChangeMessage message = buildMessage(dto);
|
||||
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode() // sharding key,保证同一库存的消息顺序
|
||||
);
|
||||
// 异步记录操作日志(不影响主流程性能)
|
||||
CompletableFuture.runAsync(() -> {
|
||||
saveInventoryLog(dto, "LOCK", result);
|
||||
});
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放库存
|
||||
*/
|
||||
public Result<Boolean> releaseInventory(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 增加库存
|
||||
Long newStock = redisTemplate.opsForValue()
|
||||
.increment(key, dto.getQuantity());
|
||||
|
||||
// 异步记录日志
|
||||
CompletableFuture.runAsync(() -> {
|
||||
saveInventoryLog(dto, "RELEASE", newStock);
|
||||
});
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis 查询)
|
||||
*/
|
||||
public Integer getInventoryStock(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
Integer stock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (stock == null) {
|
||||
// Redis 中没有,从数据库加载
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
if (inventory != null) {
|
||||
stock = inventory.getRemainingStock();
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, stock, 1, TimeUnit.HOURS);
|
||||
}
|
||||
}
|
||||
|
||||
return stock;
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录操作日志
|
||||
*/
|
||||
private void saveInventoryLog(InventoryOperationDTO dto, String operationType, Long afterStock) {
|
||||
try {
|
||||
InventoryLogDO log = new InventoryLogDO();
|
||||
log.setInventoryCode(dto.getInventoryCode());
|
||||
log.setOperationType(operationType);
|
||||
log.setQuantity(dto.getQuantity());
|
||||
log.setAfterQty(afterStock.intValue());
|
||||
log.setOrderNo(dto.getOrderNo());
|
||||
log.setCreateTime(LocalDateTime.now());
|
||||
|
||||
inventoryLogRepository.save(log);
|
||||
} catch (Exception e) {
|
||||
LogUtil.error("记录库存日志失败: {}", dto, e);
|
||||
// 日志失败不影响主流程
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 方案 B:消息携带版本号
|
||||
#### 定时同步到 MySQL(备份)
|
||||
|
||||
```java
|
||||
@Data
|
||||
public class InventoryChangeMessage implements Serializable {
|
||||
@Service
|
||||
public class InventorySyncService {
|
||||
|
||||
private String inventoryCode;
|
||||
private String operationType;
|
||||
private Integer quantity;
|
||||
private String orderNo;
|
||||
private Long timestamp;
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
// 新增:版本号,用于检测乱序
|
||||
private Long version;
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
// 新增:操作序列号
|
||||
private Long sequenceNo;
|
||||
private static final String INVENTORY_KEY_PREFIX = "inventory:";
|
||||
|
||||
/**
|
||||
* 定时全量同步到 MySQL(每 10 分钟)
|
||||
*/
|
||||
@Scheduled(cron = "0 */10 * * * ?")
|
||||
public void syncToMySQL() {
|
||||
LogUtil.info("开始同步库存到 MySQL...");
|
||||
|
||||
try {
|
||||
// 获取所有库存 key
|
||||
Set<String> keys = redisTemplate.keys(INVENTORY_KEY_PREFIX + "*");
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
LogUtil.info("没有需要同步的库存数据");
|
||||
return;
|
||||
}
|
||||
|
||||
int successCount = 0;
|
||||
int failCount = 0;
|
||||
|
||||
// 批量同步
|
||||
for (String key : keys) {
|
||||
try {
|
||||
String inventoryCode = key.replace(INVENTORY_KEY_PREFIX, "");
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (redisStock == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 更新 MySQL
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
if (inventory != null) {
|
||||
inventory.setRemainingStock(redisStock);
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
inventoryRepository.save(inventory);
|
||||
successCount++;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
failCount++;
|
||||
LogUtil.error("同步库存失败: key={}", key, e);
|
||||
}
|
||||
}
|
||||
|
||||
LogUtil.info("库存同步完成,成功: {}, 失败: {}", successCount, failCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
LogUtil.error("库存同步异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量同步(性能优化版)
|
||||
*/
|
||||
@Scheduled(cron = "0 */10 * * * ?")
|
||||
public void batchSyncToMySQL() {
|
||||
LogUtil.info("开始批量同步库存到 MySQL...");
|
||||
|
||||
try {
|
||||
Set<String> keys = redisTemplate.keys(INVENTORY_KEY_PREFIX + "*");
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 批量获取 Redis 数据
|
||||
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
|
||||
|
||||
// 构建批量更新数据
|
||||
List<InventoryDO> updateList = new ArrayList<>();
|
||||
|
||||
int index = 0;
|
||||
for (String key : keys) {
|
||||
String inventoryCode = key.replace(INVENTORY_KEY_PREFIX, "");
|
||||
Integer stock = (Integer) values.get(index++);
|
||||
|
||||
if (stock != null) {
|
||||
InventoryDO inventory = new InventoryDO();
|
||||
inventory.setInventoryCode(inventoryCode);
|
||||
inventory.setRemainingStock(stock);
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
updateList.add(inventory);
|
||||
}
|
||||
}
|
||||
|
||||
// 批量更新数据库
|
||||
if (!updateList.isEmpty()) {
|
||||
inventoryRepository.batchUpdate(updateList);
|
||||
LogUtil.info("批量同步完成,共 {} 条", updateList.size());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LogUtil.error("批量同步异常", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 数据对账机制
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class InventoryReconciliationService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private AlertService alertService;
|
||||
|
||||
private static final String INVENTORY_KEY_PREFIX = "inventory:";
|
||||
|
||||
/**
|
||||
* 定时对账(每小时执行)
|
||||
*/
|
||||
@Scheduled(cron = "0 0 * * * ?")
|
||||
public void reconcileInventory() {
|
||||
LogUtil.info("开始库存对账...");
|
||||
|
||||
List<InventoryDO> inventories = inventoryRepository.findAll();
|
||||
int inconsistentCount = 0;
|
||||
List<String> inconsistentList = new ArrayList<>();
|
||||
|
||||
for (InventoryDO inventory : inventories) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventory.getInventoryCode();
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (redisStock == null) {
|
||||
// Redis 中没有数据,可能是过期了,从 MySQL 加载
|
||||
redisTemplate.opsForValue().set(key, inventory.getRemainingStock(),
|
||||
1, TimeUnit.HOURS);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 检查是否一致
|
||||
if (!redisStock.equals(inventory.getRemainingStock())) {
|
||||
inconsistentCount++;
|
||||
|
||||
String detail = String.format(
|
||||
"库存不一致: code=%s, Redis=%d, MySQL=%d, diff=%d",
|
||||
inventory.getInventoryCode(),
|
||||
redisStock,
|
||||
inventory.getRemainingStock(),
|
||||
redisStock - inventory.getRemainingStock()
|
||||
);
|
||||
|
||||
LogUtil.warn(detail);
|
||||
inconsistentList.add(detail);
|
||||
|
||||
// 以 Redis 为准,修正 MySQL(因为 Redis 是实时数据)
|
||||
inventory.setRemainingStock(redisStock);
|
||||
inventory.setUpdateTime(LocalDateTime.now());
|
||||
inventoryRepository.save(inventory);
|
||||
}
|
||||
}
|
||||
|
||||
if (inconsistentCount > 0) {
|
||||
// 发送告警
|
||||
alertService.sendAlert(
|
||||
"库存对账发现不一致",
|
||||
String.format("共发现 %d 条不一致数据,已自动修正\n%s",
|
||||
inconsistentCount,
|
||||
String.join("\n", inconsistentList))
|
||||
);
|
||||
}
|
||||
|
||||
LogUtil.info("库存对账完成,发现 {} 条不一致数据", inconsistentCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动对账(用于问题排查)
|
||||
*/
|
||||
public ReconciliationResult manualReconcile(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
|
||||
if (inventory == null) {
|
||||
return ReconciliationResult.notFound();
|
||||
}
|
||||
|
||||
ReconciliationResult result = new ReconciliationResult();
|
||||
result.setInventoryCode(inventoryCode);
|
||||
result.setRedisStock(redisStock);
|
||||
result.setMysqlStock(inventory.getRemainingStock());
|
||||
result.setConsistent(Objects.equals(redisStock, inventory.getRemainingStock()));
|
||||
|
||||
if (!result.isConsistent()) {
|
||||
result.setDifference(redisStock - inventory.getRemainingStock());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
class ReconciliationResult {
|
||||
private String inventoryCode;
|
||||
private Integer redisStock;
|
||||
private Integer mysqlStock;
|
||||
private boolean consistent;
|
||||
private Integer difference;
|
||||
|
||||
public static ReconciliationResult notFound() {
|
||||
ReconciliationResult result = new ReconciliationResult();
|
||||
result.setConsistent(false);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 备选方案:MQ 异步同步(复杂场景)
|
||||
|
||||
如果业务要求 MySQL 必须实时同步(如需要关联查询、复杂报表等),可以使用 MQ 方案,但需要解决消息乱序问题。
|
||||
|
||||
#### 核心问题
|
||||
|
||||
RocketMQ 顺序消息只能保证同一队列内的消费顺序,但无法保证发送顺序 = 业务发生顺序。
|
||||
|
||||
**示例:**
|
||||
```
|
||||
T1: 锁定库存 -10 (version=1) → 发送 M1
|
||||
T2: 释放库存 +10 (version=2) → 发送 M2
|
||||
|
||||
可能情况:
|
||||
- M2 因网络原因先发送成功
|
||||
- M1 后发送成功
|
||||
- 消费顺序:M2 → M1(错误!)
|
||||
```
|
||||
|
||||
#### 解决方案:消费者端版本号排序
|
||||
|
||||
**核心思路:**
|
||||
1. 每次操作时 Redis 生成递增版本号
|
||||
2. 消息携带版本号
|
||||
3. 消费者维护优先级队列,按版本号排序
|
||||
4. 只处理版本号连续的消息,不连续则等待
|
||||
|
||||
**实现代码:**
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class VersionedInventoryService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
private final AtomicLong sequenceGenerator = new AtomicLong(0);
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private static final String INVENTORY_KEY_PREFIX = "inventory:";
|
||||
private String deductScriptSha;
|
||||
|
||||
/**
|
||||
* 锁定库存(带版本号)
|
||||
@@ -494,61 +838,84 @@ public class VersionedInventoryService {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
String versionKey = key + ":version";
|
||||
|
||||
// 获取当前版本号
|
||||
Long currentVersion = redisTemplate.opsForValue().increment(versionKey);
|
||||
// 1. 获取版本号(保证顺序)
|
||||
Long version = redisTemplate.opsForValue().increment(versionKey);
|
||||
|
||||
// 执行库存扣减
|
||||
Long result = deductInventoryWithLua(key, dto.getQuantity());
|
||||
// 2. Redis 原子扣减
|
||||
Long result = redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.evalSha(
|
||||
deductScriptSha,
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(dto.getQuantity()).getBytes(),
|
||||
"0".getBytes()
|
||||
)
|
||||
);
|
||||
|
||||
if (result < 0) {
|
||||
if (result == null || result < 0) {
|
||||
// 回滚版本号
|
||||
redisTemplate.opsForValue().decrement(versionKey);
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
// 发送消息(携带版本号)
|
||||
// 3. 发送消息(携带版本号)
|
||||
InventoryChangeMessage message = new InventoryChangeMessage();
|
||||
message.setInventoryCode(dto.getInventoryCode());
|
||||
message.setOperationType("LOCK");
|
||||
message.setQuantity(dto.getQuantity());
|
||||
message.setOrderNo(dto.getOrderNo());
|
||||
message.setVersion(currentVersion);
|
||||
message.setSequenceNo(sequenceGenerator.incrementAndGet());
|
||||
message.setVersion(version); // 关键:版本号
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
rocketMQTemplate.asyncSend("inventory-change-topic", message, null);
|
||||
// 使用 inventoryCode 作为 sharding key
|
||||
rocketMQTemplate.syncSendOrderly(
|
||||
"inventory-change-topic",
|
||||
message,
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 方案 C:消费者端排序(最可靠)
|
||||
@Data
|
||||
class InventoryChangeMessage implements Serializable {
|
||||
private String inventoryCode;
|
||||
private String operationType;
|
||||
private Integer quantity;
|
||||
private String orderNo;
|
||||
private Long version; // 版本号
|
||||
private Long timestamp;
|
||||
}
|
||||
|
||||
```java
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "inventory-change-topic",
|
||||
consumerGroup = "inventory-sync-consumer",
|
||||
consumeMode = ConsumeMode.ORDERLY // 顺序消费
|
||||
consumeMode = ConsumeMode.ORDERLY
|
||||
)
|
||||
public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryChangeMessage> {
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
// 每个库存维护一个消息队列
|
||||
@Autowired
|
||||
private AlertService alertService;
|
||||
|
||||
// 每个库存维护一个优先级队列
|
||||
private final Map<String, PriorityBlockingQueue<InventoryChangeMessage>> messageQueues
|
||||
= new ConcurrentHashMap<>();
|
||||
|
||||
// 每个库存维护已处理的最大版本号
|
||||
// 每个库存已处理的最大版本号
|
||||
private final Map<String, Long> processedVersions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void onMessage(InventoryChangeMessage message) {
|
||||
String inventoryCode = message.getInventoryCode();
|
||||
|
||||
// 获取该库存的消息队列
|
||||
// 获取该库存的消息队列(按版本号排序)
|
||||
PriorityBlockingQueue<InventoryChangeMessage> queue = messageQueues
|
||||
.computeIfAbsent(inventoryCode, k -> new PriorityBlockingQueue<>(
|
||||
100,
|
||||
@@ -571,10 +938,24 @@ public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryC
|
||||
|
||||
// 检查版本号是否连续
|
||||
if (message.getVersion() != lastProcessedVersion + 1) {
|
||||
// 版本号不连续,等待前面的消息
|
||||
LogUtil.warn("消息版本号不连续,等待: inventoryCode={}, expected={}, actual={}",
|
||||
inventoryCode, lastProcessedVersion + 1, message.getVersion());
|
||||
break;
|
||||
// 版本号不连续,检查是否超时
|
||||
long waitTime = System.currentTimeMillis() - message.getTimestamp();
|
||||
if (waitTime > 30000) { // 等待超过 30 秒
|
||||
LogUtil.error("消息等待超时,可能前序消息丢失: inventoryCode={}, expected={}, actual={}",
|
||||
inventoryCode, lastProcessedVersion + 1, message.getVersion());
|
||||
|
||||
// 告警
|
||||
alertService.sendAlert("库存消息乱序: " + inventoryCode);
|
||||
|
||||
// 跳过缺失的版本(风险操作,需人工介入)
|
||||
lastProcessedVersion = message.getVersion() - 1;
|
||||
processedVersions.put(inventoryCode, lastProcessedVersion);
|
||||
} else {
|
||||
// 继续等待
|
||||
LogUtil.warn("消息版本号不连续,等待: inventoryCode={}, expected={}, actual={}",
|
||||
inventoryCode, lastProcessedVersion + 1, message.getVersion());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 版本号连续,处理消息
|
||||
@@ -597,7 +978,6 @@ public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryC
|
||||
}
|
||||
|
||||
private void processSingleMessage(InventoryChangeMessage message) {
|
||||
// 查询库存
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
message.getInventoryCode()
|
||||
);
|
||||
@@ -615,12 +995,6 @@ public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryC
|
||||
);
|
||||
break;
|
||||
|
||||
case "DEDUCT":
|
||||
inventory.setSoldStock(
|
||||
inventory.getSoldStock() + message.getQuantity()
|
||||
);
|
||||
break;
|
||||
|
||||
case "RELEASE":
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() + message.getQuantity()
|
||||
@@ -637,13 +1011,93 @@ public class OrderedInventorySyncConsumer implements RocketMQListener<InventoryC
|
||||
}
|
||||
```
|
||||
|
||||
### 解决方案 2:库存查询数据源
|
||||
**方案缺陷:**
|
||||
1. 消息丢失会导致永久等待(需要超时机制)
|
||||
2. 内存占用较高(每个库存维护队列)
|
||||
3. 实现复杂度高
|
||||
|
||||
#### 统一查询接口(以 Redis 为准)
|
||||
**适用场景:**
|
||||
- 必须要求 MySQL 实时同步
|
||||
- 需要复杂的关联查询
|
||||
- 对数据一致性要求极高
|
||||
|
||||
## 方案对比
|
||||
|
||||
### 推荐方案 vs MQ 方案
|
||||
|
||||
| 对比项 | 推荐方案(Redis主+定时备份) | MQ方案(实时同步) |
|
||||
|--------|---------------------------|-------------------|
|
||||
| 性能 | 极高(QPS 50000+) | 较高(QPS ~30000) |
|
||||
| 响应时间 | 2-5ms | 5-10ms |
|
||||
| 实现复杂度 | 简单 | 复杂 |
|
||||
| 数据一致性 | 最终一致(10分钟) | 准实时(秒级) |
|
||||
| 消息乱序风险 | 无 | 有(需版本号排序) |
|
||||
| 内存占用 | 低 | 较高(消息队列) |
|
||||
| 运维成本 | 低 | 高 |
|
||||
| 适用场景 | 大部分场景 | 强一致性要求场景 |
|
||||
|
||||
### 选择建议
|
||||
|
||||
**使用推荐方案(Redis主+定时备份)的场景:**
|
||||
- 高并发秒杀、抢购
|
||||
- 对性能要求极高
|
||||
- 可接受最终一致性(10分钟内)
|
||||
- 查询主要基于库存编码
|
||||
|
||||
**使用MQ方案(实时同步)的场景:**
|
||||
- 需要复杂的关联查询(如订单+库存+商品)
|
||||
- 需要实时报表统计
|
||||
- 对数据一致性要求极高
|
||||
- 有专业的运维团队
|
||||
|
||||
## 性能优化
|
||||
|
||||
### Redis 连接池配置
|
||||
|
||||
```yaml
|
||||
spring:
|
||||
redis:
|
||||
host: localhost
|
||||
port: 6379
|
||||
database: 0
|
||||
lettuce:
|
||||
pool:
|
||||
max-active: 200 # 最大连接数
|
||||
max-idle: 50 # 最大空闲连接
|
||||
min-idle: 10 # 最小空闲连接
|
||||
max-wait: 3000ms # 最大等待时间
|
||||
timeout: 3000ms
|
||||
```
|
||||
|
||||
### Pipeline 批量操作
|
||||
|
||||
```java
|
||||
public void batchLoadInventory(List<String> inventoryCodes) {
|
||||
redisTemplate.executePipelined(new RedisCallback<Object>() {
|
||||
@Override
|
||||
public Object doInRedis(RedisConnection connection) {
|
||||
for (String code : inventoryCodes) {
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(code);
|
||||
if (inventory != null) {
|
||||
String key = INVENTORY_KEY_PREFIX + code;
|
||||
connection.set(
|
||||
key.getBytes(),
|
||||
String.valueOf(inventory.getRemainingStock()).getBytes()
|
||||
);
|
||||
connection.expire(key.getBytes(), 3600);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
### 库存预热
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class UnifiedInventoryQueryService {
|
||||
public class InventoryWarmUpService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
@@ -652,141 +1106,129 @@ public class UnifiedInventoryQueryService {
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
/**
|
||||
* 查询库存(统一从 Redis 查询)
|
||||
* 系统启动时预热热门库存
|
||||
*/
|
||||
public InventoryVO getInventory(String inventoryCode) {
|
||||
String key = INVENTORY_KEY_PREFIX + inventoryCode;
|
||||
@PostConstruct
|
||||
public void warmUpHotInventory() {
|
||||
LogUtil.info("开始预热热门库存...");
|
||||
|
||||
// 优先从 Redis 查询
|
||||
Integer redisStock = (Integer) redisTemplate.opsForValue().get(key);
|
||||
// 查询最近 7 天有交易的库存
|
||||
List<String> hotInventoryCodes = inventoryRepository
|
||||
.findHotInventoryCodes(7);
|
||||
|
||||
if (redisStock != null) {
|
||||
// Redis 中有数据,直接返回
|
||||
InventoryVO vo = new InventoryVO();
|
||||
vo.setInventoryCode(inventoryCode);
|
||||
vo.setRemainingStock(redisStock);
|
||||
vo.setDataSource("REDIS");
|
||||
return vo;
|
||||
}
|
||||
|
||||
// Redis 中没有,从数据库加载
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(inventoryCode);
|
||||
|
||||
if (inventory == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 加载到 Redis
|
||||
redisTemplate.opsForValue().set(key, inventory.getRemainingStock(), 1, TimeUnit.HOURS);
|
||||
|
||||
InventoryVO vo = new InventoryVO();
|
||||
vo.setInventoryCode(inventoryCode);
|
||||
vo.setRemainingStock(inventory.getRemainingStock());
|
||||
vo.setDataSource("MYSQL");
|
||||
|
||||
return vo;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询库存
|
||||
*/
|
||||
public Map<String, Integer> batchGetInventory(List<String> inventoryCodes) {
|
||||
Map<String, Integer> result = new HashMap<>();
|
||||
|
||||
// 批量从 Redis 查询
|
||||
List<String> keys = inventoryCodes.stream()
|
||||
.map(code -> INVENTORY_KEY_PREFIX + code)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
|
||||
|
||||
// 收集 Redis 中不存在的库存编码
|
||||
List<String> missingCodes = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < inventoryCodes.size(); i++) {
|
||||
String code = inventoryCodes.get(i);
|
||||
Object value = values.get(i);
|
||||
|
||||
if (value != null) {
|
||||
result.put(code, (Integer) value);
|
||||
} else {
|
||||
missingCodes.add(code);
|
||||
}
|
||||
}
|
||||
|
||||
// 从数据库加载缺失的库存
|
||||
if (!missingCodes.isEmpty()) {
|
||||
List<InventoryDO> inventories = inventoryRepository
|
||||
.findByInventoryCodeIn(missingCodes);
|
||||
|
||||
for (InventoryDO inventory : inventories) {
|
||||
result.put(inventory.getInventoryCode(), inventory.getRemainingStock());
|
||||
|
||||
// 加载到 Redis
|
||||
String key = INVENTORY_KEY_PREFIX + inventory.getInventoryCode();
|
||||
for (String code : hotInventoryCodes) {
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(code);
|
||||
if (inventory != null) {
|
||||
String key = INVENTORY_KEY_PREFIX + code;
|
||||
redisTemplate.opsForValue().set(
|
||||
key,
|
||||
inventory.getRemainingStock(),
|
||||
inventory.getRemainingStock(),
|
||||
1,
|
||||
TimeUnit.HOURS
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
LogUtil.info("热门库存预热完成,共 {} 条", hotInventoryCodes.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时预热(每小时执行)
|
||||
*/
|
||||
@Scheduled(cron = "0 0 * * * ?")
|
||||
public void scheduledWarmUp() {
|
||||
warmUpHotInventory();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 双写一致性保证
|
||||
## 性能指标
|
||||
|
||||
| 指标 | 推荐方案 | MQ方案 |
|
||||
|------|---------|--------|
|
||||
| QPS | ~50000 | ~30000 |
|
||||
| 响应时间 | 2-5ms | 5-10ms |
|
||||
| 成功率 | 99.9% | 99.5% |
|
||||
| Redis 内存 | ~1KB/库存 | ~1KB/库存 |
|
||||
| MySQL 延迟 | 10分钟 | 秒级 |
|
||||
|
||||
## 故障处理
|
||||
|
||||
### Redis 故障降级
|
||||
|
||||
```java
|
||||
@Service
|
||||
public class DualWriteInventoryService {
|
||||
public class InventoryFallbackService {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
private RedisInventoryService redisInventoryService;
|
||||
|
||||
@Autowired
|
||||
private InventoryRepository inventoryRepository;
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
public Result<Boolean> lockInventoryWithFallback(InventoryOperationDTO dto) {
|
||||
try {
|
||||
// 优先使用 Redis
|
||||
return redisInventoryService.lockInventory(dto);
|
||||
|
||||
} catch (RedisConnectionException e) {
|
||||
LogUtil.error("Redis 连接失败,降级到数据库", e);
|
||||
|
||||
// 降级到数据库乐观锁
|
||||
return lockInventoryFromDB(dto);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 锁定库存(双写模式)
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Result<Boolean> lockInventoryWithDualWrite(InventoryOperationDTO dto) {
|
||||
String key = INVENTORY_KEY_PREFIX + dto.getInventoryCode();
|
||||
|
||||
// 1. Redis 扣减(原子操作)
|
||||
Long redisResult = redisTemplate.execute(
|
||||
(RedisCallback<Long>) connection ->
|
||||
connection.evalSha(
|
||||
deductScriptSha,
|
||||
ReturnType.INTEGER,
|
||||
1,
|
||||
key.getBytes(),
|
||||
String.valueOf(dto.getQuantity()).getBytes(),
|
||||
"0".getBytes()
|
||||
)
|
||||
private Result<Boolean> lockInventoryFromDB(InventoryOperationDTO dto) {
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
if (redisResult == null || redisResult < 0) {
|
||||
if (inventory == null) {
|
||||
return Result.fail("INVENTORY_NOT_FOUND", "库存不存在");
|
||||
}
|
||||
|
||||
if (inventory.getRemainingStock() < dto.getQuantity()) {
|
||||
return Result.fail("INSUFFICIENT_INVENTORY", "库存不足");
|
||||
}
|
||||
|
||||
try {
|
||||
// 2. 数据库扣减(同步,保证强一致性)
|
||||
InventoryDO inventory = inventoryRepository.findByInventoryCode(
|
||||
dto.getInventoryCode()
|
||||
);
|
||||
|
||||
if (inventory == null) {
|
||||
// 回滚 Redis
|
||||
redisTemplate.opsForValue().increment(key, dto.getQuantity());
|
||||
return Result.fail("INVENTORY_NOT_FOUND", "库存不存在");
|
||||
inventory.setRemainingStock(
|
||||
inventory.getRemainingStock() - dto.getQuantity()
|
||||
);
|
||||
|
||||
boolean success = inventoryRepository.updateWithVersion(inventory);
|
||||
|
||||
if (!success) {
|
||||
return Result.fail("INVENTORY_LOCK_FAILED", "库存锁定失败");
|
||||
}
|
||||
|
||||
return Result.success(true);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 总结
|
||||
|
||||
Redis 原子操作方案是高并发场景的最佳选择,性能提升 50 倍以上。
|
||||
|
||||
**推荐方案(Redis主+定时备份):**
|
||||
- 使用 Lua 脚本保证原子性
|
||||
- Redis 作为唯一实时数据源
|
||||
- MySQL 定时备份(每 10 分钟)
|
||||
- 定时对账机制保证最终一致性
|
||||
- 适用于 90% 的高并发场景
|
||||
|
||||
**MQ方案(实时同步):**
|
||||
- 需要消费者端版本号排序
|
||||
- 实现复杂,运维成本高
|
||||
- 仅在必须实时同步 MySQL 时使用
|
||||
|
||||
**关键要点:**
|
||||
1. Lua 脚本保证 check-then-act 原子性
|
||||
2. 所有查询统一走 Redis
|
||||
3. MySQL 仅作备份和对账
|
||||
4. 定时对账以 Redis 为准修正 MySQL
|
||||
5. Redis 故障时降级到数据库乐观锁
|
||||
}
|
||||
|
||||
inventory.setRemainingStock(
|
||||
|
||||
Reference in New Issue
Block a user