出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
version: "3.1"
services:
seata-server:
image: seataio/seata-server:latest
hostname: seata-server
ports:
- "7091:7091"
- "8091:8091"
environment:
- SEATA_PORT=8091
- STORE_MODE=file
http://localhost:7091/#/Overview
default username and password is admin/admin
订单服务调用库存服务和账户余额服务进行相应的扣减,并且最终生成订单
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seata</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-order</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-http</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
package com.et.seata.order.controller;
import com.et.seata.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@RestController
public class HelloWorldController {
@Autowired
private OrderService orderService;
@PostMapping("/create")
public Map<String, Object> createOrder(@RequestParam("userId") Long userId,
@RequestParam("productId") Long productId,
@RequestParam("price") Integer price) throws IOException {
Map<String, Object> map = new HashMap<>();
map.put("msg", "HelloWorld");
map.put("reuslt", orderService.createOrder(userId,productId,price));
return map;
}
}
package com.et.seata.order.service;
import com.alibaba.fastjson.JSONObject;
import com.et.seata.order.dao.OrderDao;
import com.et.seata.order.dto.OrderDO;
import io.seata.core.context.RootContext;
import io.seata.integration.http.DefaultHttpExecutor;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @author liuhaihua
* @version 1.0
* @ClassName OrderServiceImpl
* @Description todo
* @date 2024/08/08/ 13:53
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService{
@Autowired
OrderDao orderDao;
@Override
@GlobalTransactional // <1>
public Integer createOrder(Long userId, Long productId, Integer price) throws IOException {
Integer amount = 1; // 购买数量,暂时设置为 1。
log.info("[createOrder] 当前 XID: {}", RootContext.getXID());
// <2> 扣减库存
this.reduceStock(productId, amount);
// <3> 扣减余额
this.reduceBalance(userId, price);
// <4> 保存订单
log.info("[createOrder] 保存订单");
return this.saveOrder(userId,productId,price,amount);
}
private Integer saveOrder(Long userId, Long productId, Integer price,Integer amount){
// <4> 保存订单
OrderDO order = new OrderDO();
order.setUserId(userId);
order.setProductId(productId);
order.setPayAmount(amount * price);
orderDao.saveOrder(order);
log.info("[createOrder] 保存订单: {}", order.getId());
return order.getId();
}
private void reduceStock(Long productId, Integer amount) throws IOException {
// 参数拼接
JSONObject params = new JSONObject().fluentPut("productId", String.valueOf(productId))
.fluentPut("amount", String.valueOf(amount));
// 执行调用
HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/stock",
params, HttpResponse.class);
// 解析结果
Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
if (!success) {
throw new RuntimeException("扣除库存失败");
}
}
private void reduceBalance(Long userId, Integer price) throws IOException {
// 参数拼接
JSONObject params = new JSONObject().fluentPut("userId", String.valueOf(userId))
.fluentPut("price", String.valueOf(price));
// 执行调用
HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8083", "/balance",
params, HttpResponse.class);
// 解析结果
Boolean success = Boolean.valueOf(EntityUtils.toString(response.getEntity()));
if (!success) {
throw new RuntimeException("扣除余额失败");
}
}
}
server:
port: 8081 # 端口
spring:
application:
name: order-service
datasource:
url: jdbc:mysql://127.0.0.1:3306/seata_order?useSSL=false&useUnicode=true&characterEncoding=UTF-8
driver-class-name: com.mysql.jdbc.Driver
username: root
password: 123456
# Seata 配置项,对应 SeataProperties 类
seata:
application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
tx-service-group: ${spring.application.name}-group # Seata 事务组编号,用于 TC 集群名
# 服务配置项,对应 ServiceProperties 类
service:
# 虚拟组和分组的映射
vgroup-mapping:
order-service-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
package com.et.seata.product.controller;
import com.et.seata.product.dto.ProductReduceStockDTO;
import com.et.seata.product.service.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProductController {
@Autowired
ProductService productService;
@PostMapping("/stock")
public Boolean reduceStock(@RequestBody ProductReduceStockDTO productReduceStockDTO) {
log.info("[reduceStock] 收到减少库存请求, 商品:{}, 价格:{}", productReduceStockDTO.getProductId(),
productReduceStockDTO.getAmount());
try {
productService.reduceStock(productReduceStockDTO.getProductId(), productReduceStockDTO.getAmount());
// 正常扣除库存,返回 true
return true;
} catch (Exception e) {
// 失败扣除库存,返回 false
return false;
}
}
}
package com.et.seata.product.service;
import com.et.seata.product.dao.ProductDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
@Transactional // <1> 开启新事物
public void reduceStock(Long productId, Integer amount) throws Exception {
log.info("[reduceStock] 当前 XID: {}", RootContext.getXID());
// <2> 检查库存
checkStock(productId, amount);
log.info("[reduceStock] 开始扣减 {} 库存", productId);
// <3> 扣减库存
int updateCount = productDao.reduceStock(productId, amount);
// 扣除成功
if (updateCount == 0) {
log.warn("[reduceStock] 扣除 {} 库存失败", productId);
throw new Exception("库存不足");
}
// 扣除失败
log.info("[reduceStock] 扣除 {} 库存成功", productId);
}
private void checkStock(Long productId, Integer requiredAmount) throws Exception {
log.info("[checkStock] 检查 {} 库存", productId);
Integer stock = productDao.getStock(productId);
if (stock < requiredAmount) {
log.warn("[checkStock] {} 库存不足,当前库存: {}", productId, stock);
throw new Exception("库存不足");
}
}
}
package com.et.seata.product.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface ProductDao {
/**
* 获取库存
*
* @param productId 商品编号
* @return 库存
*/
@Select("SELECT stock FROM product WHERE id = #{productId}")
Integer getStock(@Param("productId") Long productId);
/**
* 扣减库存
*
* @param productId 商品编号
* @param amount 扣减数量
* @return 影响记录行数
*/
@Update("UPDATE product SET stock = stock - #{amount} WHERE id = #{productId} AND stock >= #{amount}")
int reduceStock(@Param("productId") Long productId, @Param("amount") Integer amount);
}
package com.et.seata.balance.controller;
import com.et.seata.balance.dto.AccountReduceBalanceDTO;
import com.et.seata.balance.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/balance")
public Boolean reduceBalance(@RequestBody AccountReduceBalanceDTO accountReduceBalanceDTO) {
log.info("[reduceBalance] 收到减少余额请求, 用户:{}, 金额:{}", accountReduceBalanceDTO.getUserId(),
accountReduceBalanceDTO.getPrice());
try {
accountService.reduceBalance(accountReduceBalanceDTO.getUserId(), accountReduceBalanceDTO.getPrice());
// 正常扣除余额,返回 true
return true;
} catch (Exception e) {
// 失败扣除余额,返回 false
return false;
}
}
}
package com.et.seata.balance.service;
import com.et.seata.balance.dao.AccountDao;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountDao accountDao;
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW) // <1> 开启新事物
public void reduceBalance(Long userId, Integer price) throws Exception {
log.info("[reduceBalance] 当前 XID: {}", RootContext.getXID());
// <2> 检查余额
checkBalance(userId, price);
log.info("[reduceBalance] 开始扣减用户 {} 余额", userId);
// <3> 扣除余额
int updateCount = accountDao.reduceBalance(price);
// 扣除成功
if (updateCount == 0) {
log.warn("[reduceBalance] 扣除用户 {} 余额失败", userId);
throw new Exception("余额不足");
}
log.info("[reduceBalance] 扣除用户 {} 余额成功", userId);
}
private void checkBalance(Long userId, Integer price) throws Exception {
log.info("[checkBalance] 检查用户 {} 余额", userId);
Integer balance = accountDao.getBalance(userId);
if (balance < price) {
log.warn("[checkBalance] 用户 {} 余额不足,当前余额:{}", userId, balance);
throw new Exception("余额不足");
}
}
}
package com.et.seata.balance.dao;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Mapper
@Repository
public interface AccountDao {
/**
* 获取账户余额
*
* @param userId 用户 ID
* @return 账户余额
*/
@Select("SELECT balance FROM account WHERE id = #{userId}")
Integer getBalance(@Param("userId") Long userId);
/**
* 扣减余额
*
* @param price 需要扣减的数目
* @return 影响记录行数
*/
@Update("UPDATE account SET balance = balance - #{price} WHERE id = 1 AND balance >= ${price}")
int reduceBalance(@Param("price") Integer price);
}
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=172.22.0.3:8091:27573281007513609,branchId=27573281007513610,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/seata_storage,applicationData=null
2024-08-08 22:00:59.467 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.22.0.3:8091:27573281007513609 27573281007513610 jdbc:mysql://127.0.0.1:3306/seata_storage
2024-08-08 22:00:59.503 INFO 35051 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.22.0.3:8091:27573281007513609 branch 27573281007513610, undo_log deleted with GlobalFinished
2024-08-08 22:00:59.511 INFO 35051 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked