关于springboot响应式编程整合webFlux的问题
在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式,另一种是基于SpringMVC注解方式,今天通过本文给大家介绍springboot响应式编程整合webFlux的问题,感兴趣的朋友一起看看吧
在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况,因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。
在servlet3.0标准之后,为了解决此类问题,所以提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他用户使用,这样的操作机制将极大的提升程序的并发性能。
对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。
而在spring中实现响应式编程,那么则需要使用到spring webFlux,该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式,另一种是基于SpringMVC注解方式。
Maven引入
1 2 3 4 | <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> |
整合处理器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | package com.example.oldguy.myWebFlux.handler; import com.example.oldguy.myVo.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @Component @Slf4j public class MessageHandler { public Mono<Message> echoHandler(Message message){ log.info( "【{}】业务层接收处理数据:{}" ,Thread.currentThread().getName()); message.setTitle( "【】" +Thread.currentThread().getName()+ "】" +message.getTitle()); message.setContent( "【】" +Thread.currentThread().getName()+ "】" +message.getContent()); return Mono.create(item->item.success(message)); //实现数据响应 } } |
整合控制器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | package com.example.oldguy.myController; import com.example.oldguy.myVo.Message; import com.example.oldguy.myWebFlux.handler.MessageHandler; import com.example.oldguy.mytask.MyThreadTask; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.bind.WebDataBinder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.InitBinder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import java.beans.PropertyEditorSupport; import java.time.Instant; import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 异步线程的处理机制 */ @RestController @RequestMapping ( "/message/*" ) @Slf4j @Api (tags = "异步处理" ) public class AsyncController { @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; private MyThreadTask task; private MessageHandler messageHandler; /** * 日期转换 * @param * @return */ private static final DateTimeFormatter LOCAL_DATE_FORMAT = DateTimeFormatter.ofPattern( "yyyy-MM-dd" ); @InitBinder public void initBinder(WebDataBinder binder){ binder.registerCustomEditor(Date. class , new PropertyEditorSupport(){ @Override public void setAsText(String text) throws IllegalArgumentException { LocalDate localDate = LocalDate.parse(text,LOCAL_DATE_FORMAT); Instant instant = localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant(); super .setValue(Date.from(instant)); } }); } @GetMapping ( "runnable" ) @ApiOperation ( "异常处理Runnable" ) public Object message(String message) { log.info( "外部线程:{}" , Thread.currentThread().getName()); HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); DeferredResult<String> result = new DeferredResult<>(6000L); //设置异步响应 this .threadPoolTaskExecutor.execute( new Runnable() { //线程核心任务 @SneakyThrows public void run() { log.info( "内部线程:{}" ,Thread.currentThread().getName()); TimeUnit.SECONDS.sleep( 7 ); result.setResult( "[echo]" +message); //执行最终的响应 result.onCompletion( new Runnable() { //完成处理线程 log.info( "完成线程:{}" ,Thread.currentThread().getName()); //日志输出 result.onTimeout( new Runnable() { log.info( "超时线程:{}" ,Thread.currentThread().getName()); result.setResult( "【请求超时】" +request.getRequestURI()); //超时路径 return result; @GetMapping ( "task" ) @ApiOperation ( "task异步任务开启" ) public Object messageTask(String message){ log.info( "外部线程{}" ,Thread.currentThread().getName()); this .task.startTaskHander(); return "【echo】" +message; @GetMapping ( "webflux" ) @ApiOperation ( "整合webflux" ) public Object echo(Message message){ log.info( "接收用户信息,用户方发送的参数为message={}" ,message); return this .messageHandler.echoHandler(message); } |
页面响应:
控制台响应:
2021-11-30 15:04:06.946 INFO 22884 --- [nio-1999-exec-1] c.e.oldguy.myController.AsyncController : 接收用户信息,用户方发送的参数为message=Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
2021-11-30 15:04:06.947 INFO 22884 --- [nio-1999-exec-1] c.e.o.myWebFlux.handler.MessageHandler : 【http-nio-1999-exec-1】业务层接收处理数据:Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
webFlux响应map和List
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | //webFlux响应集合 public Flux<Message> list(Message message){ List<Message> messageList = new ArrayList<>(); for ( int i= 0 ;i< 10 ;i++){ Message m = new Message(); m.setTitle(i+ "--" +message.getTitle()); m.setContent(i+ "--" +message.getContent()); m.setPubdate(message.getPubdate()); messageList.add(m); } return Flux.fromIterable(messageList); } public Flux<Map.Entry<String,Message>> map(Message message){ Map<String,Message> map = new HashMap<>(); for ( int i= 0 ;i< 10 ;i++){ Message m = new Message(); m.setTitle(i+ "--" +message.getTitle()); m.setContent(i+ "--" +message.getContent()); m.setPubdate(message.getPubdate()); map.put( "pansd-" +i,m); } // Set<Map.Entry<String, Message>> entries = map.entrySet(); return Flux.fromIterable(map.entrySet()); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 | @GetMapping ( "webfluxList" ) @ApiOperation ( "整合webfluxList" ) public Object echoList(Message message){ log.info( "接收用户信息,用户方发送的参数为message={}" ,message); return this .messageHandler.list(message); } @GetMapping ( "webfluxMap" ) @ApiOperation ( "整合webfluxMap" ) public Object echoMap(Message message){ log.info( "接收用户信息,用户方发送的参数为message={}" ,message); return this .messageHandler.map(message); } |
到此这篇关于springboot响应式编程整合webFlux的文章就介绍到这了
原文链接:https://blog.csdn.net/pshdhx/article/details/121636277