转载

分布式事务之Spring事务与JMS事务(二)

分布式事务之Spring事务与JMS事务(二)

Spring事务

Spring事务机制主要包括声明式事务和编程式事务,声明式事务让我们从复杂的事务处理中得到解脱,编程式事务在实际开发中得不到广泛使用,仅供学习参考。

事务抽象

spring的事务管理提供了统一的API接口支持不同的资源, 提供声明式事务 管企且方便与Spring框架集成。spring的事务管理器使用抽象的设计方式实现,以下为spring中事务管理器的逻辑实现代码 (精简了一部分,突出核心逻辑)

## 事务状态
public interface TransactionStatus extends SavepointManager{
    boolean isNewTransaction();
    boolean hasSavepoint();
    void setRollbackOnly();
    boolean isRollbackOnly();
    boolean isCompleted();
}
## 事务定义
public interface TransactionDefinition{
    int getPropagationBehavior();
    int getIsolationLevel();
    String getName();
    int getTimeout();
    boolean isReadOnly();
}
## 事务管理器
public interface PlatformTransactionManager{
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    void commit(TransactionStatus status) throws TransactionException;
    void rollback(TransactionStatus status) throws TransactionException;
}
复制代码

事务的传播行为

Spring在TransactionDefinition接口中规定了 7 种类型的事务传播行为,它们规定了事务方法和事务方法发生嵌套调用时事务如何进行传播:

事务传播行为类型:

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

当使用PROPAGATION_NESTED时,底层的数据源必须基于JDBC 3.0,并且实现者需要支持保存点事务机制。

事务隔离级别

spring如果没有指定事务隔离级别的话,则spring的事务隔离级别跟数据库的隔离级别走,数据库是什么隔离级别,spring就是什么隔离级别。

Spring事务的隔离级别:

  1. ISOLATION_DEFAULT: 这是一个PlatfromTransactionManager默认的隔离级别,使用数据库默认的事务隔离级别. 另外四个与JDBC的隔离级别相对应
  2. ISOLATION_READ_UNCOMMITTED: 这是事务最低的隔离级别,它充许令外一个事务可以看到这个事务未提交的数据。 这种隔离级别会产生脏读,不可重复读和幻像读。
  3. ISOLATION_READ_COMMITTED: 保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据
  4. ISOLATION_REPEATABLE_READ: 这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。 它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。
  5. ISOLATION_SERIALIZABLE: 这是花费最高代价但是最可靠的事务隔离级别。事务被处理为顺序执行。 除了防止脏读,不可重复读外,还避免了幻像读。

接下来看一下代码方式与标签方式的事务实现:

## 代码方式实现
public OrderService{
    @Autowire 
    PlatformTransactionManager txManager;
    void buyTicket(BuyTicketDTO dto){
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = txManager.getTransaction(def);
        try{
            ## 执行业务代码
            txManager.commit(status);
        }catch(Exception e){
            txManager.rollback(status);
        }
    }
}

## Transaction标签实现方式
public OrderService{
    @Transactonal
    void buyTick(BuyTicketDTO dto){
        // get and begin transaction manager from context
        try{
            /**业务代码*/
            // commit transaction
        }catch(Exception e){
            // rollback transaction
        }
    }
}
复制代码
分布式事务之Spring事务与JMS事务(二)

策略接口 PlatformTransactionManager 事务管理器的常见实现有:

DataSourceTransactionManager、JpTransactionManager、JmsTransactionManager、JtaTransactionManager

JPA简介及事务实现

JPA是Java的一个规范(Java持久性API)。 它用于在Java对象和关系数据库之间保存数据。 JPA充当面向对象的领域模型和关系数据库系统之间的桥梁。 由于JPA只是一个规范,它本身不执行任何操作。 它需要一个实现。 因此,像Hibernate,TopLink和iBatis这样的ORM工具实现了JPA数据持久性规范。

关于JPA事务实例的代码:

domian实体对象

@Entity(name = "customer")
public class Customer {
    ## id 自增长
    @Id
    @GeneratedValue
    private Long id;
    ## 唯一索引
    @Column(name = "user_name", unique = true)
    private String username;
    private String password;
    private String role;

    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getRole() {
        return role;
    }
    public void setRole(String role) {
        this.role = role;
    }
}
复制代码

dao 接口

// 继成JpaRepository中的方法,其中已经包含基本的CRUD
public interface CustomerRepository extends JpaRepository<Customer, Long> {
    Customer findOneByUsername(String username);
}
复制代码

service 业务操作,以下2种事务实现的效果是一样的,意在告诉大家如何使用代码的方式实现与注解声明事务相同的效果。

@Service
public class CustomerServiceTxInAnnotation {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
    
    @Autowired
    private CustomerRepository customerRepository;
    ## 使用注解的方式声明事务存在
    @Transactional
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        customer.setUsername("Annotation:" + customer.getUsername());
        return customerRepository.save(customer);
    }
}
复制代码
@Service
public class CustomerServiceTxInCode {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);

    @Autowired
    private CustomerRepository customerRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;

    public Customer create(Customer customer) {
        LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        ## 使用代码的方式来声明事务存在
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        ## 当使用ISOLATION_SERIALIZABLE级别时,如果外部没事务存在,则本身创建事务,,所以submitError方法抛出异常可以回滚
        //def.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
        ## 当使用PROPAGATION_REQUIRED级别时,如果外部没事务存在,则本身也不存在事务,,所以submitError方法抛出异常依然可以保存数据成功 
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            customer.setUsername("Code:" + customer.getUsername());
            customerRepository.save(customer);
            submitError();
            transactionManager.commit(status);
            return customer;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void submitError(){
        throw new RuntimeException("some Data error ")
    }
}
复制代码

controller 控制层

@RestController
@RequestMapping("/api/customer")
public class CustomerResource {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);

    @Autowired
    private CustomerServiceTxInAnnotation customerService;
    @Autowired
    private CustomerServiceTxInCode customerServiceInCode;
    @Autowired
    private CustomerRepository customerRepository;

    @PostMapping("/annotation")
    public Customer createInAnnotation(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
        return customerService.create(customer);
    }
    @PostMapping("/code")
    public Customer createInCode(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
        return customerServiceInCode.create(customer);
    }
    @GetMapping("")
    public List<Customer> getAll() {
        return customerRepository.findAll();
    }
}
复制代码

接下来看一下程序的执行结果及JPA事务的管理过程:

分布式事务之Spring事务与JMS事务(二)

在整个事务管理过程中使用的是Spring事务控制,并且由相关ORM框架实现JPA规范

JMS事务原理

Spring JMS Session

  • 通过Session进行事务管理操作
  • Session 是一个thread-bound(线程范围内)
  • 事务上下文:一个线程一个Session

Spring JMS事务类型

  • Session管理的事务-原生事务
  • 外部管理的事务-JmsTransactionManager、JTA

Srping JMS事务机制过程

Session原生事务:

分布式事务之Spring事务与JMS事务(二)

JmsTransactionManager事务:

分布式事务之Spring事务与JMS事务(二)
##注解方式注入Spring Bean
@EnableJms
@Configuration
public class JmsConfig {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);

    @Bean
    public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
        LOG.debug("init jms template with converter.");
        JmsTemplate template = new JmsTemplate();
        ## JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。
        template.setConnectionFactory(connectionFactory); 
        return template;
    }

    ## 这个用于设置 @JmsListener使用的containerFactory
    @Bean
    public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                     PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setTransactionManager(transactionManager);
        factory.setCacheLevelName("CACHE_CONNECTION");
        factory.setReceiveTimeout(10000L);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}

复制代码
@Service
public class CustomerService {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);

    @Autowired
    JmsTemplate jmsTemplate;
    @Autowired
    private PlatformTransactionManager transactionManager;

    @PostConstruct
    public void init() {
        jmsTemplate.setReceiveTimeout(3000);
    }
    ## 原生事务
    @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
    public void handle(String msg) {
        LOG.debug("Get JMS message to from customer:{}", msg);
        String reply = "Replied - " + msg;
        jmsTemplate.convertAndSend("customer:msg:reply", reply);
        if (msg.contains("error")) {
            simulateError();
        }
    }
    ## JmsTransactionManager事务
    @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
    public void handle2(String msg) {
        LOG.debug("Get JMS message2 to from customer:{}", msg);
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            String reply = "Replied-2 - " + msg;
            jmsTemplate.convertAndSend("customer:msg:reply", reply);
            if (!msg.contains("error")) {
                transactionManager.commit(status);
            } else {
                transactionManager.rollback(status);
            }
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }

    private void simulateError() {
        throw new RuntimeException("some Data error.");
    }
}
复制代码

Spring 本地事务

紧密依赖于底层资源管理器(例如数据库连接 ),事务处理局限在当前事务资源内。此种事务处理方式不存在对应用服务器的依赖,因而部署灵活却无法支持多数据源的分布式事务。

  • Spring容器管理事务的生命周期
  • 通过Spring事务接口调用
  • 业务代码与具体事务的实现无关

在数据库连接中使用本地事务示例如下:

public void transferAccount() { 
       Connection conn = null; 
       Statement stmt = null; 
       try{ 
           conn = getDataSource().getConnection(); 
           ## 将自动提交设置为 false,
           ## 若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
           conn.setAutoCommit(false);
           stmt = conn.createStatement(); 
           ## 将 A 账户中的金额减少 500 
           stmt.execute("/
           update t_account set amount = amount - 500 where account_id = 'A'");
           ## 将 B 账户中的金额增加 500 
           stmt.execute("/
           update t_account set amount = amount + 500 where account_id = 'B'");
           ## 提交事务
           conn.commit();
           ## 事务提交:转账的两步操作同时成功
       } catch(SQLException sqle){            
           try{ 
               ## 发生异常,回滚在本事务中的操做
              conn.rollback();
               ## 事务回滚:转账的两步操作完全撤销
               stmt.close(); 
               conn.close(); 
           }catch(Exception ignore){ } 
           sqle.printStackTrace(); 
       } 
   }
复制代码

本地事务机制过程图:

分布式事务之Spring事务与JMS事务(二)

Spring 外部(全局)事务

  • 外部事务管理器提供事务管理
  • 通过Spring事务接口,调用外部管理器
  • 使用JNDI等方式获取外部事务管理器的实例
  • 外部事务管理器一般由应用服务器提供、如JBoss等

JNDI(Java Naming and Directory Interface,Java命名和目录接口)是SUN公司提供的一种标准的Java命名系统接口,JNDI提供统一的客户端API,通过不同的访问提供者接口JNDI服务供应接口(SPI)的实现,由管理者将JNDI API映射为特定的命名服务和目录系统,使得Java应用程序可以和这些命名服务和目录服务之间进行交互。

原文  https://juejin.im/post/5b839a5f6fb9a019d20e1dda
正文到此结束
Loading...