WebFlux踩坑实录

从WebMVC升级到WebFlux之后有一些东西都不能按照之前的思维来做

如何从MVC到Flux

去除

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

添加

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

转到Flux之后你需要知道的事情

  1. 响应式编程,需要一个订阅,你的流才能流动。

  2. Flux响应式编程,可以理解为全走了异步线程池,所以你的代码需要返回一个Mono或者Flux。

  3. 什么是Mono,什么是Flux。Mono是返回0-1个元素的数据,Flux是会返回0-N个元素的数据。

  4. TherdLocal不能使用,需要使用到Context上下文来进行传递参数。

    • 可以使用Mono.deferContextual(ctx -> Mono.just(ctx.get("")));来获取到你存在上下文里面的值
    • 可以使用Mono.just(需要处理的内容).contextWrite(Context.of("key", val))来设置上下文
  5. 不存在拦截器,只有过滤器WebFliter

    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain)
    // ServerWebExchange包含请求和响应
    // WebFilterChain是请求链
    // 可以使用 exchange.getSession() 来获取session
    // 可以使用 chain.filter(exchange) 继续执行
  6. 全局异常处理器和传统的MVC使用形式相同

  7. block是将Mono变为T的函数,但是在某些线程内不允许我们这样做,这样是阻塞式编程,你可以使用

    Mono.fromCallable(() -> 需要阻塞的方法
    .contextWrite().block()) // 可选,写入上下文,对应4
    .subscribeOn(Schedulers.boundedElastic()).block();

    对于同步阻塞的调用,如果直接调用会阻塞主线程,而在响应式编程中,主线程数量往往等同于cpu核数,一旦阻塞,就会产生问题导致吞吐量极速下降。
    为此,可以只用上面的方法,把一个同步阻塞的代码(比如一次http请求)封装成一个Mono, 并使用subscribeOn方法用Schedulers.boundedElastic()线程池来实现异步效果。

踩坑点

  1. 拦截器的实现

    • 因为2.5,我们知道不存在拦截器,只有过滤器WebFliter,所以登陆鉴权的功能实现需要在这里进行

    • 实例代码

    • import com.baiwang.moirai.utils.ValidatorUtil;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.http.HttpCookie;
      import org.springframework.http.server.reactive.ServerHttpRequest;
      import org.springframework.util.MultiValueMap;
      import org.springframework.web.server.ServerWebExchange;
      import org.springframework.web.server.WebFilter;
      import org.springframework.web.server.WebFilterChain;
      import reactor.core.publisher.Mono;
      import reactor.util.context.Context;
      
      @Configuration
      public class LoginFilter implements WebFilter {
      	private final Log logger = LogFactory.getLog(this.getClass());
      
      	@Override
      	public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
      		return exchange.getSession().flatMap(webSession -> {
      			ServerHttpRequest request = exchange.getRequest();
      			String authToken = null;
      
      			MultiValueMap<String, HttpCookie> cookies = request.getCookies();
      			authToken = String.valueOf(cookies.get("TOKEN"));
      
      			String token_session = (String) webSession.getAttribute("TOKEN");
      
      			// 测试免登陆
      			if ("TOKEN".equals(authToken)) {
      				return chain.filter(exchange);
      			}
      
      			if (ValidatorUtil.isNull(authToken)) {
      				return Mono.error(new Exception("TOKEN为空,请重新登陆!"));
      			} else {
      				webSession.getAttributes().put("userInfo", userInfo);
      				webSession.getAttributes().put("TOKEN", authToken);
      				return chain.filter(exchange)
      						.contextWrite(Context.of("userInfo", userInfo));
      			}
      		});
      	}
      }
      <!--code4-->
      
      
  2. 链路追踪(仅仅是Traceid的实现)

    • 由于响应式编程的影响ThredLocal不能使用MDC也没有办法正常使用,Spring Sleuth也不再继续维护,所以我们需要换到micrometer

    • pom中添加依赖

      <dependency>
      <groupId>io.micrometer</groupId>
      <artifactId>micrometer-tracing</artifactId>
      </dependency>
      <dependency>
      <groupId>io.micrometer</groupId>
      <artifactId>micrometer-observation</artifactId>
      </dependency>
      <dependency>
      <groupId>io.micrometer</groupId>
      <artifactId>micrometer-registry-prometheus</artifactId>
      </dependency>
      <dependency>
      <groupId>io.micrometer</groupId>
      <artifactId>micrometer-tracing-bridge-otel</artifactId>
      </dependency>
      <dependency>
      <groupId>io.micrometer</groupId>
      <artifactId>context-propagation</artifactId>
      </dependency>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
      </dependency>
    • 在Application启动类内添加

      @SpringBootApplication
      @EnableFeignClients
      public class OutputAiApplication {
      public static void main(String[] args) {
      Hooks.enableAutomaticContextPropagation(); // 这段
      SpringApplication.run(OutputAiApplication.class, args);
      }
      }
    • 配置文件中

      management:
      endpoint:
      gateway:
      enabled: true
      endpoints:
      web:
      exposure:
      include: health,gateway,prometheus
      tracing:
      enabled: true
  3. OpenFeign的阻塞

    当使用OpenFeign作为Grpc调用工具时,响应式编程可能会不允许你使用block方法,这是OpenFeign没有对响应式编程做兼容导致的,我们需要

    @Configuration
    public class BlockingLoadBalancerClientConfig {

    @Autowired
    LoadBalancerClientFactory loadBalancerClientFactory;

    @Autowired
    LoadBalancerProperties properties;

    @Bean
    public LoadBalancerClient BlockingLoadBalancerClient() {
    return new CustomBlockingLoadBalancerClient(loadBalancerClientFactory, properties);
    }
    }
    public class CustomBlockingLoadBalancerClient extends BlockingLoadBalancerClient {
    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;

    public CustomBlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) {
    super(loadBalancerClientFactory);
    this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @Override
    public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    if (loadBalancer == null) {
    return null;
    }
    CompletableFuture<Response<ServiceInstance>> f = CompletableFuture.supplyAsync(() -> {
    Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    return loadBalancerResponse;
    });
    Response<ServiceInstance> loadBalancerResponse = null;


    try {
    loadBalancerResponse = f.get();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (Exception e) {
    e.printStackTrace();
    }
    if (loadBalancerResponse == null) {
    return null;
    }
    return loadBalancerResponse.getServer();
    }
    }

    添加这两个配置就好

延展阅读

  1. 【响应式编程】 - 深度理解线程池新模型Schedulers包
  2. spring cloud gateway 整合micrometer,替代sleuth