WebFlux踩坑实录
WebFlux踩坑实录
从WebMVC升级到WebFlux之后有一些东西都不能按照之前的思维来做
如何从MVC到Flux
去除
| <dependency> | 
添加
| <dependency> | 
转到Flux之后你需要知道的事情
- 
响应式编程,需要一个订阅,你的流才能流动。 
- 
Flux响应式编程,可以理解为全走了异步线程池,所以你的代码需要返回一个Mono或者Flux。 
- 
什么是Mono,什么是Flux。Mono是返回0-1个元素的数据,Flux是会返回0-N个元素的数据。 
- 
TherdLocal不能使用,需要使用到Context上下文来进行传递参数。 - 可以使用Mono.deferContextual(ctx -> Mono.just(ctx.get("")));来获取到你存在上下文里面的值
- 可以使用Mono.just(需要处理的内容).contextWrite(Context.of("key", val))来设置上下文
 
- 可以使用
- 
不存在拦截器,只有过滤器WebFliter public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) 
 // ServerWebExchange包含请求和响应
 // WebFilterChain是请求链
 // 可以使用 exchange.getSession() 来获取session
 // 可以使用 chain.filter(exchange) 继续执行
- 
全局异常处理器和传统的MVC使用形式相同 
- 
block是将Mono 变为T的函数,但是在某些线程内不允许我们这样做,这样是阻塞式编程,你可以使用 Mono.fromCallable(() -> 需要阻塞的方法 
 .contextWrite().block()) // 可选,写入上下文,对应4
 .subscribeOn(Schedulers.boundedElastic()).block();对于同步阻塞的调用,如果直接调用会阻塞主线程,而在响应式编程中,主线程数量往往等同于cpu核数,一旦阻塞,就会产生问题导致吞吐量极速下降。 
 为此,可以只用上面的方法,把一个同步阻塞的代码(比如一次http请求)封装成一个Mono, 并使用subscribeOn方法用Schedulers.boundedElastic()线程池来实现异步效果。
踩坑点
- 
拦截器的实现 - 
因为2.5,我们知道不存在拦截器,只有过滤器WebFliter,所以登陆鉴权的功能实现需要在这里进行 
- 
实例代码 
- 
import com.baiwang.moirai.utils.ValidatorUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpCookie; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.MultiValueMap; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import reactor.util.context.Context; @Configuration public class LoginFilter implements WebFilter { private final Log logger = LogFactory.getLog(this.getClass()); @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { return exchange.getSession().flatMap(webSession -> { ServerHttpRequest request = exchange.getRequest(); String authToken = null; MultiValueMap<String, HttpCookie> cookies = request.getCookies(); authToken = String.valueOf(cookies.get("TOKEN")); String token_session = (String) webSession.getAttribute("TOKEN"); // 测试免登陆 if ("TOKEN".equals(authToken)) { return chain.filter(exchange); } if (ValidatorUtil.isNull(authToken)) { return Mono.error(new Exception("TOKEN为空,请重新登陆!")); } else { webSession.getAttributes().put("userInfo", userInfo); webSession.getAttributes().put("TOKEN", authToken); return chain.filter(exchange) .contextWrite(Context.of("userInfo", userInfo)); } }); } } <!--code4-->
 
- 
- 
链路追踪(仅仅是Traceid的实现) - 
由于响应式编程的影响ThredLocal不能使用MDC也没有办法正常使用,Spring Sleuth也不再继续维护,所以我们需要换到micrometer 
- 
pom中添加依赖 <dependency> 
 <groupId>io.micrometer</groupId>
 <artifactId>micrometer-tracing</artifactId>
 </dependency>
 <dependency>
 <groupId>io.micrometer</groupId>
 <artifactId>micrometer-observation</artifactId>
 </dependency>
 <dependency>
 <groupId>io.micrometer</groupId>
 <artifactId>micrometer-registry-prometheus</artifactId>
 </dependency>
 <dependency>
 <groupId>io.micrometer</groupId>
 <artifactId>micrometer-tracing-bridge-otel</artifactId>
 </dependency>
 <dependency>
 <groupId>io.micrometer</groupId>
 <artifactId>context-propagation</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-aop</artifactId>
 </dependency>
- 
在Application启动类内添加 
 public class OutputAiApplication {
 public static void main(String[] args) {
 Hooks.enableAutomaticContextPropagation(); // 这段
 SpringApplication.run(OutputAiApplication.class, args);
 }
 }
- 
配置文件中 management: 
 endpoint:
 gateway:
 enabled: true
 endpoints:
 web:
 exposure:
 include: health,gateway,prometheus
 tracing:
 enabled: true
 
- 
- 
OpenFeign的阻塞 当使用OpenFeign作为Grpc调用工具时,响应式编程可能会不允许你使用block方法,这是OpenFeign没有对响应式编程做兼容导致的,我们需要 
 public class BlockingLoadBalancerClientConfig {
 
 LoadBalancerClientFactory loadBalancerClientFactory;
 
 LoadBalancerProperties properties;
 
 public LoadBalancerClient BlockingLoadBalancerClient() {
 return new CustomBlockingLoadBalancerClient(loadBalancerClientFactory, properties);
 }
 }public class CustomBlockingLoadBalancerClient extends BlockingLoadBalancerClient { 
 private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
 public CustomBlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) {
 super(loadBalancerClientFactory);
 this.loadBalancerClientFactory = loadBalancerClientFactory;
 }
 
 public <T> ServiceInstance choose(String serviceId, Request<T> request) {
 ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
 if (loadBalancer == null) {
 return null;
 }
 CompletableFuture<Response<ServiceInstance>> f = CompletableFuture.supplyAsync(() -> {
 Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
 return loadBalancerResponse;
 });
 Response<ServiceInstance> loadBalancerResponse = null;
 try {
 loadBalancerResponse = f.get();
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch (Exception e) {
 e.printStackTrace();
 }
 if (loadBalancerResponse == null) {
 return null;
 }
 return loadBalancerResponse.getServer();
 }
 }添加这两个配置就好 
延展阅读
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 喵喵博客!



