阅读 83

SpringCloud系列教程(八)之整合seata分布式事务

什么是seata?

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

关于更多的原理可以参考官方文档,这里就不赘述了:Seata 是什么

下载安装seata-server

下载

下载地址:Releases · seata/seata (github.com)

window下载zip,linux/mac下载tar.gz

注意:如何安装nacos请看这:Nacos 快速入门,建议查看前面的文章以做到丝滑入戏。

安装

解压之后修改配置文件registry.conf(这里主要是配置nacos作为配置中心):

config {   type = "nacos" ## 这里修改为nacos,并且修改下面对应的配置   nacos {     serverAddr = "127.0.0.1:8848"     namespace = "public"     group = "SEATA_GROUP"     username = "nacos"     password = "nacos"   } } 复制代码

然后继续修改注册中心(nacos作为注册中心):

registry {     type = "nacos"   nacos {     application = "seata-server"     serverAddr = "127.0.0.1:8848"     group = "DEFAULT_GROUP" #这里的group需要与业务服务在一个group内     namespace = "public"     cluster = "default"     username = "nacos"     password = "nacos"   } } 复制代码

本文是以nacos作为注册中心和配置中心的,如果需要其它的方式可以查看官方文档。

上传配置

在上传之前先下载对应的配置文件模板

  1. 首先下载config.txt文件:seata/script/config-center at develop · seata/seata (github.com) ,将其放入到seata的解压目录下;见图例1

  2. 然后在目录下找到对应的配置中心的目录下的shell脚本,这里使用的是nacos-config.sh seata/script/config-center/nacos at develop · seata/seata (github.com),将其放入到${SEATA_DIR}/script/config-center/nacos/nacos-config.sh 注意:${SEATA_DIR} 是seata的根目录;见图例2

  3. 到seata的解压根目录下运行 sh script/config-center/nacos/nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos 注意:nacos-config.sh是第2步下载的脚本文件,见图例3

图例1:

图例1

图例2:

图例2

图例3:

图例3

图例4:

图例4

在nacos控制台(localhost:8848/nacos,账密:nacos/nacos):

可以看到配置已经成功上传

具体的配置请查看:seata/config.txt at develop · seata/seata (github.com)

配置参数官方对照表:Seata 参数配置

注意修改对应数据库的配置,可以直接在nacos中修改配置。

有一个比较关键的点,也是很容易出错的点,有一个配置参数单独拿出来讲一下,

service.vgroupMapping.<你的服务名称>-group=default 这个分组需要seata-server和client都保持一致,当然,seata是可以存在多个事务分组的,比如我们订单业务涉及到库存等等逻辑,那么将这些在同一个事务的服务加入到同一个事务分组内,就如默认的配置文件中设置为:service.vgroupMapping.my_test_tx_group=default 那么我们需要在服务的application.yml中配置该组:

seata:   tx-service-group: my_test_tx_group 复制代码

启动seata-server:

windows:打开控制台:./seata-server.bat -h 127.0.0.1

linux/macos: sh [seata-server.sh](http://seata-server.sh) -h 127.0.0.1

成功启动:

成功启动

nacos注册中心:

nacos注册中心

多种部署方式:

  • 使用 Docker 部署 Seata Server

  • 使用 Kubernetes 部署 Seata Server

  • 使用 Helm 部署 Seata Server

  • Seata 高可用部署

新增seata的数据表:

对应的sql文件请查看:seata/script/server/db at 1.4.0 · seata/seata (github.com)

下载之后在数据库中新建库:seata,然后将建库脚本导入。

工程改造

1.复制工程

将工程:spring-cloud-nacos-consumer 复制一份,改为:order-server

将工程:spring-cloud-nacos-provider 复制一份,改为:stock-server

注意:复制之后需要修改部分pom配置才可以(改为对应的名称):

<artifactId>order-server</artifactId>     <version>0.0.1-SNAPSHOT</version>     <name>order-server</name>     <description>Demo project for Spring Boot</description> 复制代码

并且在父pom中加入:

<modules>     ...     <module>stock-server</module>     <module>order-server</module> </modules> 复制代码

然后重新导入依赖即可

如果还存在异常,请将.imi文件删除

2.增加依赖

在两个工程模块的pom中增加依赖:

<!-- seata -->         <dependency>             <groupId>io.seata</groupId>             <artifactId>seata-spring-boot-starter</artifactId>             <version>1.4.2</version>         </dependency>         <dependency>             <groupId>com.alibaba.cloud</groupId>             <artifactId>spring-cloud-starter-alibaba-seata</artifactId>             <exclusions>                 <exclusion>                     <groupId>io.seata</groupId>                     <artifactId>seata-spring-boot-starter</artifactId>                 </exclusion>             </exclusions>         </dependency> 复制代码

3.修改工程的application.yml配置

order-server

logging:   level:     io:       seata: debug seata:   tx-service-group: my_test_tx_group 复制代码

stock-server

logging:   level:     io:       seata: debug seata:   tx-service-group: my_test_tx_group 复制代码

4.数据库初始化

-- 创建 order库、业务表、undo_log表 create database seata_order; use seata_order; DROP TABLE IF EXISTS `order_tbl`; CREATE TABLE `order_tbl` (   `id` int(11) NOT NULL AUTO_INCREMENT,   `user_id` varchar(255) DEFAULT NULL,   `commodity_code` varchar(255) DEFAULT NULL,   `count` int(11) DEFAULT 0,   `money` int(11) DEFAULT 0,   PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `undo_log` (   `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,   `branch_id`     BIGINT(20)   NOT NULL,   `xid`           VARCHAR(100) NOT NULL,   `context`       VARCHAR(128) NOT NULL,   `rollback_info` LONGBLOB     NOT NULL,   `log_status`    INT(11)      NOT NULL,   `log_created`   DATETIME     NOT NULL,   `log_modified`  DATETIME     NOT NULL,   `ext`           VARCHAR(100) DEFAULT NULL,   PRIMARY KEY (`id`),   UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDB   AUTO_INCREMENT = 1   DEFAULT CHARSET = utf8; -- 创建 stock库、业务表、undo_log表 create database seata_stock; use seata_stock; DROP TABLE IF EXISTS `stock_tbl`; CREATE TABLE `stock_tbl` (   `id` int(11) NOT NULL AUTO_INCREMENT,   `commodity_code` varchar(255) DEFAULT NULL,   `count` int(11) DEFAULT 0,   PRIMARY KEY (`id`),   UNIQUE KEY (`commodity_code`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `undo_log` (   `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,   `branch_id`     BIGINT(20)   NOT NULL,   `xid`           VARCHAR(100) NOT NULL,   `context`       VARCHAR(128) NOT NULL,   `rollback_info` LONGBLOB     NOT NULL,   `log_status`    INT(11)      NOT NULL,   `log_created`   DATETIME     NOT NULL,   `log_modified`  DATETIME     NOT NULL,   `ext`           VARCHAR(100) DEFAULT NULL,   PRIMARY KEY (`id`),   UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDB   AUTO_INCREMENT = 1   DEFAULT CHARSET = utf8; -- 初始化库存模拟数据 INSERT INTO seata_stock.stock_tbl (id, commodity_code, count) VALUES (1, 'product-1', 9999999); INSERT INTO seata_stock.stock_tbl (id, commodity_code, count) VALUES (2, 'product-2', 0); 复制代码

5.引入mybatis plus

父pom引入依赖:

<!-- mybatis plus --> <dependency>     <groupId>com.baomidou</groupId>     <artifactId>mybatis-plus-boot-starter</artifactId>     <version>3.4.3.4</version> </dependency> <!-- 提供mysql驱动 --> <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId>     <version>8.0.16</version> </dependency> 复制代码

两个工程分别引入依赖:

<!-- mybatis plus --> <dependency>     <groupId>com.baomidou</groupId>     <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId> </dependency> 复制代码

6.order-server业务修改

新增部分类

@Data @TableName("order_tbl") @Accessors(chain = true) @Builder public class Order implements Serializable {    private static final longserialVersionUID= 1L;     @JsonFormat(shape = JsonFormat.Shape.STRING)     @TableId(value="id" ,type = IdType.AUTO) /**  */ @TableField("id")     private Integer id; /**  */ @TableField("user_id")     private String userId; /**  */ @TableField("commodity_code")     private String commodityCode; /**  */ @TableField("count")     private Integer count; /**  */ @TableField("money")     private BigDecimal money;     @Tolerate     public Order(){} } 复制代码

@Mapper public interface OrderTblMapper extends BaseMapper<Order> { } 复制代码

mapperxml:

<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.winterchen.nacos.mapper.OrderTblMapper"> </mapper> 复制代码

service:

public interface OrderTblService extends IService<Order> {     void placeOrder(String userId, String commodityCode, Integer count); } 复制代码

@Service public class OrderTblServiceImpl extends ServiceImpl<OrderTblMapper, Order> implements OrderTblService {     @Autowired     private StockFeignClient stockFeignClient;     /**      * 下单:创建订单、减库存,涉及到两个服务      *      * @param userId      * @param commodityCode      * @param count      */     @GlobalTransactional     @Transactional(rollbackFor = Exception.class)     @Override     public void placeOrder(String userId, String commodityCode, Integer count) {         BigDecimal orderMoney = new BigDecimal(count).multiply(new BigDecimal(5));         Order order = new Order().setUserId(userId).setCommodityCode(commodityCode).setCount(count).setMoney(orderMoney);         baseMapper.insert(order);         stockFeignClient.deduct(commodityCode, count);     } } 复制代码

StockFeignClient

@FeignClient(name = "stock-server") public interface StockFeignClient {     @PostMapping("/api/stock/deduct")     Boolean deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count); } 复制代码

@Api(tags="订单API") @RestController @RequestMapping("/api/order") public class OrderTblController {     @Autowired     private OrderTblService orderTblService;     /**      * 下单:插入订单表、扣减库存,模拟回滚      *      * @return      */     @PostMapping("/placeOrder/commit")     public Boolean placeOrderCommit() {         orderTblService.placeOrder("1", "product-1", 1);         return true;     }     /**      * 下单:插入订单表、扣减库存,模拟回滚      *      * @return      */     @PostMapping("/placeOrder/rollback")     public Boolean placeOrderRollback() {         // product-2 扣库存时模拟了一个业务异常,         orderTblService.placeOrder("1", "product-2", 1);         return true;     }     @PostMapping("/placeOrder")     public Boolean placeOrder(String userId, String commodityCode, Integer count) {         orderTblService.placeOrder(userId, commodityCode, count);         return true;     } } 复制代码

7.stock-server业务修改

entity:

@Data @TableName("stock_tbl") @Accessors(chain = true) @Builder public class Stock implements Serializable { private static final long serialVersionUID = 1L;     @JsonFormat(shape = JsonFormat.Shape.STRING)     @TableId(value="id" ,type = IdType.AUTO)     /**  */     @TableField("id")     private Integer id;     /**  */     @TableField("commodity_code")     private String commodityCode;     /**  */     @TableField("count")     private Integer count;     @Tolerate     public Stock(){} } 复制代码

mapper:

@Mapper public interface StockTblMapper extends BaseMapper<Stock> { } 复制代码

mapperxml:

<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.winterchen.nacos.mapper.StockTblMapper"> </mapper> 复制代码

service:

public interface StockTblService extends IService<Stock> {     void deduct(String commodityCode, int count); } 复制代码

@Service public class StockTblServiceImpl extends ServiceImpl<StockTblMapper, Stock> implements StockTblService {     @Transactional(rollbackFor = Exception.class)     @Override     public void deduct(String commodityCode, int count) {         if (commodityCode.equals("product-2")) {             throw new RuntimeException("异常:模拟业务异常:stock branch exception");         }         QueryWrapper<Stock> wrapper = new QueryWrapper<>();         wrapper.setEntity(new Stock().setCommodityCode(commodityCode));         Stock stock = baseMapper.selectOne(wrapper);         stock.setCount(stock.getCount() - count);         baseMapper.updateById(stock);     } } 复制代码

controller:

@Api(tags="库存API") @RestController @RequestMapping("/api/stock") public class StockTblController {     @Autowired     private StockTblService stockTblService;     /**      * 减库存      *      * @param commodityCode 商品代码      * @param count         数量      * @return      */     @PostMapping(path = "/deduct")     public Boolean deduct(String commodityCode, Integer count) {         stockTblService.deduct(commodityCode, count);         return true;     } } 复制代码

8. 修改gateway:

修改GatewayConfiguration

initCustomizedApis 新增对order-serverstock-server的初始化

ApiDefinition api3 = new ApiDefinition("order")                 .setPredicateItems(new HashSet<ApiPredicateItem>() {{                     add(new ApiPathPredicateItem().setPattern("/order/**")                             .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));                 }});         ApiDefinition api4 = new ApiDefinition("stock")                 .setPredicateItems(new HashSet<ApiPredicateItem>() {{                     add(new ApiPathPredicateItem().setPattern("/stock/**")                             .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));                 }});   definitions.add(api3);   definitions.add(api4); 复制代码

initGatewayRules 新增规则

rules.add(new GatewayFlowRule("order")                 .setCount(10)                 .setIntervalSec(1)         );         rules.add(new GatewayFlowRule("order")                 .setCount(2)                 .setIntervalSec(2)                 .setBurst(2)                 .setParamItem(new GatewayParamFlowItem()                         .setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_CLIENT_IP)                 )         ); rules.add(new GatewayFlowRule("stock")                 .setCount(10)                 .setIntervalSec(1)                 .setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)                 .setMaxQueueingTimeoutMs(600)                 .setParamItem(new GatewayParamFlowItem()                         .setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HEADER)                         .setFieldName("X-Sentinel-Flag")                 )         );         rules.add(new GatewayFlowRule("stock")                 .setCount(1)                 .setIntervalSec(1)                 .setParamItem(new GatewayParamFlowItem()                         .setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)                         .setFieldName("pa")                 )         );         rules.add(new GatewayFlowRule("stock")                 .setCount(2)                 .setIntervalSec(30)                 .setParamItem(new GatewayParamFlowItem()                         .setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)                         .setFieldName("type")                         .setPattern("warn")                         .setMatchStrategy(SentinelGatewayConstants.PARAM_MATCH_STRATEGY_CONTAINS)                 )         );         rules.add(new GatewayFlowRule("stock")                 .setResourceMode(SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME)                 .setCount(5)                 .setIntervalSec(1)                 .setParamItem(new GatewayParamFlowItem()                         .setParseStrategy(SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM)                         .setFieldName("pn")                 )         ); 复制代码

appilcation.yml新增对order-serverstock-server的路由配置:

 spring:   cloud:     nacos:       discovery:         server-addr: 127.0.0.1:8848     gateway:       discovery:         locator:           enabled: false           lowerCaseServiceId: true       routes:         - id: provider           uri: lb://winter-nacos-provider           predicates:             - Path=/provider/**           filters:             - StripPrefix=1 #StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view         - id: consumer           uri: lb://winter-nacos-consumer           predicates:             - Path=/consumer/**           filters:             - StripPrefix=1         - id: auth           uri: lb://auth           predicates:             - Path=/auth/**           filters:             - StripPrefix=1         - id: order-server   -------新增order-server的路由规则           uri: lb://order-server           predicates:             - Path=/order/**           filters:             - StripPrefix=1         - id: stock-server  --------- 新增stock-server的路由规则           uri: lb://stock-server           predicates:             - Path=/stock/**           filters:             - StripPrefix=1 复制代码

测试:

首先启动对应的服务:

  • spring-cloud-gateway

  • spring-cloud-auth

  • order-server

  • stock-server

然后打开swagger进行测试: consumer服务

测试提交:

订单创建成功:

库存扣减成功:

测试回滚:

此时数据库里面都正常回滚。

总结

以上就是本教程的全部内容了,seata是一款非常好用的分布式事务框架,为开发人员提供了比较简单的API,seata默认使用的是AT模式的事务,当然,可以结合自身的业务选择比较合适的分布式事务模式,具体的配置可以参考官方文档。


作者:WinterChenS
链接:https://juejin.cn/post/7028398573817430024


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐