WebFlux踩坑实录
WebFlux踩坑实录
从WebMVC升级到WebFlux之后有一些东西都不能按照之前的思维来做
如何从MVC到Flux
去除
<dependency> |
添加
<dependency> |
转到Flux之后你需要知道的事情
-
响应式编程,需要一个订阅,你的流才能流动。
-
Flux响应式编程,可以理解为全走了异步线程池,所以你的代码需要返回一个Mono或者Flux。
-
什么是Mono,什么是Flux。Mono是返回0-1个元素的数据,Flux是会返回0-N个元素的数据。
-
TherdLocal不能使用,需要使用到Context上下文来进行传递参数。
- 可以使用
Mono.deferContextual(ctx -> Mono.just(ctx.get("")));
来获取到你存在上下文里面的值 - 可以使用
Mono.just(需要处理的内容).contextWrite(Context.of("key", val))
来设置上下文
- 可以使用
-
不存在拦截器,只有过滤器WebFliter
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain)
// ServerWebExchange包含请求和响应
// WebFilterChain是请求链
// 可以使用 exchange.getSession() 来获取session
// 可以使用 chain.filter(exchange) 继续执行 -
全局异常处理器和传统的MVC使用形式相同
-
block是将Mono
变为T的函数,但是在某些线程内不允许我们这样做,这样是阻塞式编程,你可以使用 Mono.fromCallable(() -> 需要阻塞的方法
.contextWrite().block()) // 可选,写入上下文,对应4
.subscribeOn(Schedulers.boundedElastic()).block();对于同步阻塞的调用,如果直接调用会阻塞主线程,而在响应式编程中,主线程数量往往等同于cpu核数,一旦阻塞,就会产生问题导致吞吐量极速下降。
为此,可以只用上面的方法,把一个同步阻塞的代码(比如一次http请求)封装成一个Mono, 并使用subscribeOn
方法用Schedulers.boundedElastic()
线程池来实现异步效果。
踩坑点
-
拦截器的实现
-
因为2.5,我们知道不存在拦截器,只有过滤器WebFliter,所以登陆鉴权的功能实现需要在这里进行
-
实例代码
-
import com.baiwang.moirai.utils.ValidatorUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpCookie; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.MultiValueMap; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import reactor.util.context.Context; @Configuration public class LoginFilter implements WebFilter { private final Log logger = LogFactory.getLog(this.getClass()); @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { return exchange.getSession().flatMap(webSession -> { ServerHttpRequest request = exchange.getRequest(); String authToken = null; MultiValueMap<String, HttpCookie> cookies = request.getCookies(); authToken = String.valueOf(cookies.get("TOKEN")); String token_session = (String) webSession.getAttribute("TOKEN"); // 测试免登陆 if ("TOKEN".equals(authToken)) { return chain.filter(exchange); } if (ValidatorUtil.isNull(authToken)) { return Mono.error(new Exception("TOKEN为空,请重新登陆!")); } else { webSession.getAttributes().put("userInfo", userInfo); webSession.getAttributes().put("TOKEN", authToken); return chain.filter(exchange) .contextWrite(Context.of("userInfo", userInfo)); } }); } } <!--code4-->
-
-
链路追踪(仅仅是Traceid的实现)
-
由于响应式编程的影响ThredLocal不能使用MDC也没有办法正常使用,Spring Sleuth也不再继续维护,所以我们需要换到micrometer
-
pom中添加依赖
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency> -
在Application启动类内添加
public class OutputAiApplication {
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation(); // 这段
SpringApplication.run(OutputAiApplication.class, args);
}
} -
配置文件中
management:
endpoint:
gateway:
enabled: true
endpoints:
web:
exposure:
include: health,gateway,prometheus
tracing:
enabled: true
-
-
OpenFeign的阻塞
当使用OpenFeign作为Grpc调用工具时,响应式编程可能会不允许你使用block方法,这是OpenFeign没有对响应式编程做兼容导致的,我们需要
public class BlockingLoadBalancerClientConfig {
LoadBalancerClientFactory loadBalancerClientFactory;
LoadBalancerProperties properties;
public LoadBalancerClient BlockingLoadBalancerClient() {
return new CustomBlockingLoadBalancerClient(loadBalancerClientFactory, properties);
}
}public class CustomBlockingLoadBalancerClient extends BlockingLoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
public CustomBlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory, LoadBalancerProperties properties) {
super(loadBalancerClientFactory);
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
CompletableFuture<Response<ServiceInstance>> f = CompletableFuture.supplyAsync(() -> {
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
return loadBalancerResponse;
});
Response<ServiceInstance> loadBalancerResponse = null;
try {
loadBalancerResponse = f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
}添加这两个配置就好
延展阅读
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 喵喵博客!