Reactive programming is a programming paradigm oriented around data streams and the propagation of change. Put simply: values that change are propagated through data streams.
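To make "propagating changing values through a data stream" concrete, here is a minimal sketch using only the JDK's java.util.concurrent.Flow API (Java 9+). It is a stand-in for illustration, not Reactor and not what WebFlux uses internally; the class and method names (ReactiveSketch, CollectingSubscriber, propagate) are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveSketch {

    /** Hypothetical subscriber that records every value pushed to it. */
    static class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item (back-pressure)
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);      // react to the new value
            subscription.request(1); // then ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Pushes the values through a publisher and returns what the subscriber saw. */
    static List<Integer> propagate(List<Integer> values) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            values.forEach(publisher::submit); // each submit is a "change" on the stream
        } // close() signals onComplete once buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(propagate(List.of(1, 2, 3)));
    }
}
```

The subscriber never polls for data: the publisher pushes each value as it is submitted, and the subscriber only controls how many items it is willing to receive. Reactor's Flux and Mono follow the same publisher/subscriber contract with a much richer operator set.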
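The WebFlux module is named spring-webflux; the "Flux" in the name comes from the Flux class in Reactor. Spring WebFlux provides a new non-blocking, functional reactive web framework for building asynchronous, non-blocking, event-driven services that scale very well.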
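The spring-webflux module supports reactive HTTP and WebSocket clients, as well as REST, HTML, and WebSocket style interactions. Generally speaking, Spring MVC is used for synchronous processing and Spring WebFlux for asynchronous processing.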
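Spring Boot WebFlux offers two programming models: an annotation-based one similar to Spring MVC, and a functional one (router functions and handlers). Both are built on Reactor.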
Reactive relational database connectivity support
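I previously covered a reactive API implementation for a non-relational database (see the earlier post); today we look at the implementation for a relational database.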
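While trying out Spring Boot 2.3.0.RC1, I found that the long-awaited reactive MySQL support is finally starting to show up. Today we integrate Spring Boot with R2DBC and MySQL, mainly through Spring Data R2DBC, which applies the familiar Spring abstractions and repository support to R2DBC. It makes it easier to build Spring-powered applications that use relational data access technologies in a reactive application stack.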
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.gitee.occo</groupId>
        <artifactId>occo-parent</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <artifactId>occo-sso</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <configuration>
                    <tagNameFormat>v@{project.version}</tagNameFormat>
                    <autoVersionSubmodules>true</autoVersionSubmodules>
                    <indentSize>4</indentSize>
                    <useEditMode>true</useEditMode>
                    <localCheckout>true</localCheckout>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The occo-parent parent in the POM can be replaced with:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RC1</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- If the jars cannot be downloaded, add the following: -->
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </pluginRepository>
</pluginRepositories>
server:
  port: 8000
spring:
  application:
    name: occo-sso
  r2dbc:
    url: r2dbc:mysql://localhost:3306/occo
    username: root
    password: pwd2020
logging:
  level:
    org.springframework.data.r2dbc: DEBUG
repository:
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;

import com.gitee.occo.sso.user.entity.User;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    @Query("SELECT * FROM user WHERE state = :state")
    Flux<User> findByState(Integer state);

    @Query("SELECT * FROM user WHERE username = :username")
    Mono<User> findByUsername(String username);

    @Query("SELECT * FROM user WHERE username LIKE :username")
    Flux<User> findByUsernameLike(String username);
}
service:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.gitee.occo.sso.user.entity.User;
import com.gitee.occo.sso.user.repository.UserRepository;
import com.gitee.occo.sso.user.service.UserService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    // Each method delegates directly to the reactive repository.
    @Override
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    @Override
    public Mono<User> findByUsername(String username) {
        return userRepository.findByUsername(username);
    }

    @Override
    public Flux<User> findByUsernameLike(String username) {
        return userRepository.findByUsernameLike(username);
    }
}
controller:
import java.time.Duration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.gitee.occo.sso.user.entity.User;
import com.gitee.occo.sso.user.service.UserService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    UserService userService;

    @GetMapping("/findById/{id}")
    public Mono<User> getUserById(@PathVariable("id") Long id) {
        return userService.findById(id);
    }

    @GetMapping("/findByUsername")
    public Mono<User> findByUsername(String username) {
        return userService.findByUsername(username);
    }

    @GetMapping(value = "/findByUsernameLike", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> getByUsername(String username) {
        return userService.findByUsernameLike(username + "%").delayElements(Duration.ofSeconds(2));
    }
}
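PS: produces = MediaType.APPLICATION_STREAM_JSON_VALUE must be set. Without it, the server waits for the whole Flux to complete and sends everything back in a single response body.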
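One way to watch the streaming behaviour is to read the response line by line from a plain Java client. The sketch below uses the JDK 11 java.net.http.HttpClient. The base URL http://localhost:8000 is an assumption derived from the port and controller path in this post, and the class name StreamingClientSketch and the helper buildUri are made up for this example. With application/stream+json each element arrives as its own line, so with the 2-second delayElements a new line should be printed roughly every 2 seconds.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

public class StreamingClientSketch {

    // Assumption: the application runs locally on port 8000, as configured in this post.
    static final String BASE_URL = "http://localhost:8000";

    /** Builds the request URI for the streaming endpoint (hypothetical helper). */
    static URI buildUri(String usernamePrefix) {
        String encoded = URLEncoder.encode(usernamePrefix, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/user/findByUsernameLike?username=" + encoded);
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri("a"))
                .header("Accept", "application/stream+json")
                .GET()
                .build();
        try {
            // ofLines() exposes the body as a lazy stream of lines,
            // so each JSON object can be handled as soon as it arrives.
            HttpResponse<Stream<String>> response =
                    client.send(request, HttpResponse.BodyHandlers.ofLines());
            try (Stream<String> lines = response.body()) {
                lines.filter(line -> !line.isBlank())
                     .forEach(line -> System.out.println(System.currentTimeMillis() + " " + line));
            }
        } catch (IOException e) {
            // For example, the server is not running.
            System.err.println("Request failed: " + e.getMessage());
        }
    }
}
```

The printed timestamps make the difference visible: with the streaming media type they are spread out over time, whereas without it all data shows up at once after the Flux completes.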
Code: link