Reactor Part 10 Customize a production WebClient

1 Why use WebClient

When first trying Spring WebFlux, many people wrap an existing HTTP call in Mono.fromFuture() or Mono.fromSupplier() to obtain a Mono. Neither approach is recommended for remote calls: the wrapper makes the call look reactive, but if the underlying client is blocking, a thread is still parked for the whole duration of the request.

1.1 Mono.fromFuture() VS WebClient

There are the following differences between the Mono.fromFuture() method and using WebClient to call third-party interfaces:

  • Asynchronous vs. non-blocking

Mono.fromFuture() adapts a java.util.concurrent.CompletableFuture into a reactive Mono. The adapter itself does not block the subscribing thread, but it cannot make the underlying work non-blocking: if the future was produced by running a blocking HTTP client on a thread pool, one pool thread is occupied for every in-flight request. WebClient is asynchronous and non-blocking end to end. No thread waits for the response; the response is processed in an event-driven way when data arrives.

  • Scalability and flexibility

WebClient allows more flexible configuration and processing, such as setting timeouts, request headers, and retry mechanisms. It also works with other Spring WebFlux features, for example consuming Server-Sent Events as a stream. Mono.fromFuture() only converts a single CompletableFuture into a Mono and offers none of these extension points.

  • Error handling

WebClient provides a richer error handling mechanism: onStatus() maps specific HTTP status codes to exceptions, and Reactor operators such as onErrorResume() and onErrorReturn() handle them downstream. Retry and fallback strategies can be composed in the same way. A Mono created by Mono.fromFuture() only carries the result or the failure of the future. It knows nothing about HTTP status codes, so any status-specific handling has to be written by hand.

  • Blocking risk

Mono.fromFuture() does not remove blocking, it only moves it. When the future wraps a blocking call, the thread that runs that call is blocked until the result arrives or the call times out. If that thread belongs to a small shared pool, or worse to the event loop, throughput collapses under load. When Mono.fromFuture() cannot be avoided, make sure the blocking work runs on a dedicated background pool and never on the application's event-loop threads.
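To make the threading concrete, here is a minimal JDK-only sketch (no Reactor involved) of what typically sits behind a CompletableFuture that is later handed to Mono.fromFuture(). The class name, the pool name, and the simulated blocking call are assumptions made for illustration. The caller gets the future back immediately, but one pool thread stays parked for as long as the blocking work takes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: shows which thread is blocked when a
// blocking call is wrapped in a CompletableFuture.
final class FutureBlockingDemo {

    // Simulates a blocking HTTP call and reports the thread that ran it.
    static String blockingCall() {
        try {
            Thread.sleep(200); // stands in for a blocking socket read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Runs the blocking call on a dedicated pool and returns the name
    // of the thread that was occupied by it.
    static String runOnPool() {
        ExecutorService pool =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "blocking-pool-1"));
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(FutureBlockingDemo::blockingCall, pool);
            // The caller is not blocked by supplyAsync; join() is used here
            // only so that the demo can return the result.
            return future.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:  " + Thread.currentThread().getName());
        System.out.println("blocked thread: " + runOnPool());
    }
}
```

With many concurrent requests, the pool needs one thread per in-flight call, which is exactly the cost that a non-blocking client such as WebClient avoids.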

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() and Mono.fromFuture() both turn an existing operation into a reactive Mono, but they differ in what they wrap and when the work runs:

Mono.fromSupplier() takes a Supplier. It is lazy: the Supplier is invoked each time the Mono is subscribed to, on the subscribing thread, and its return value becomes the value of the Mono. It suits pure computation and other non-blocking work.

Mono.fromFuture() takes a java.util.concurrent.CompletableFuture. The future is usually already running on some other thread when it is handed over; the Mono simply completes when the future does.

Therefore, the main difference between Mono.fromSupplier() and Mono.fromFuture() is:

Mono.fromSupplier() runs the computation itself, synchronously, at subscription time. It does not block as long as the Supplier does not block.
Mono.fromFuture() runs nothing itself. It does not block the subscribing thread, but whichever thread executes the future is blocked if the wrapped operation is blocking.

Note that if the operation inside the Supplier is blocking, Mono.fromSupplier() blocks the subscribing thread as well. Such a Supplier should be moved to a scheduler intended for blocking work, for example with subscribeOn(Schedulers.boundedElastic()).

In short, use Mono.fromSupplier() for a purely computational operation, and use Mono.fromFuture() for an operation that already returns a CompletableFuture. For HTTP calls, prefer WebClient.
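The lazy versus eager distinction can be seen with the two JDK types alone. The following is a small sketch, not Reactor code; the class and method names are made up for illustration. A Supplier does nothing until it is invoked, which mirrors how Mono.fromSupplier() defers work until subscription, while CompletableFuture.supplyAsync() schedules its work as soon as it is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical demo class comparing a lazy Supplier with an eager future.
final class LazyVersusEagerDemo {

    // A Supplier that is defined but never invoked does no work at all.
    static int runsOfUnusedSupplier() {
        AtomicInteger runs = new AtomicInteger();
        Supplier<Integer> supplier = runs::incrementAndGet; // only a recipe
        return runs.get();
    }

    // Every get() runs the Supplier again, on the calling thread.
    static int runsAfterTwoGets() {
        AtomicInteger runs = new AtomicInteger();
        Supplier<Integer> supplier = runs::incrementAndGet;
        supplier.get();
        supplier.get();
        return runs.get();
    }

    // supplyAsync schedules the work immediately on another thread.
    static int runsOfEagerFuture() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(runs::incrementAndGet);
        future.join(); // wait for completion so the count is deterministic
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unused supplier runs: " + runsOfUnusedSupplier());
        System.out.println("supplier runs after two get() calls: " + runsAfterTwoGets());
        System.out.println("eager future runs: " + runsOfEagerFuture());
    }
}
```

The practical consequence is that a future passed to Mono.fromFuture() has normally started before anyone subscribes, whereas the work in Mono.fromSupplier() is repeated for each subscriber.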

2 Customize your own WebClient

2.1 Initialize WebClient

WebClient follows the builder pattern. With WebClient.Builder you can assemble a client tailored to your service, for example to apply uniform timeouts to every call, customize the underlying HTTP client, propagate trace (call-chain) information, log responses, and monitor latency.

WebClient.Builder supports the following methods:

interface Builder {

/**
* Configure the base url of the request, such as: baseUrl = "https://abc.go.com/v1"; conflicts with uriBuilderFactory, if there is uriBuilderFactory, ignore baseUrl
*/
Builder baseUrl(String baseUrl);

/**
* Default variable for URI requests. Also conflicts with uriBuilderFactory, defaultUriVariables is ignored if there is uriBuilderFactory
*/
Builder defaultUriVariables(Map<String, ?> defaultUriVariables);

/**
* Provide a pre-configured UriBuilderFactory instance
*/
Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);

/**
* Default header
*/
Builder defaultHeader(String header, String... values);

/**
* Default cookies
*/
Builder defaultCookie(String cookie, String... values);

/**
* Provide a consumer to customize each request
*/
Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);

/**
* Add a filter, you can add multiple
*/
Builder filter(ExchangeFilterFunction filter);

/**
* Configure the ClientHttpConnector to use. This is useful for options to plug in or customize underlying HTTP client libraries such as SSL.
*/
Builder clientConnector(ClientHttpConnector connector);

/**
* Configure the codecs for the {@code WebClient} in the
* {@link #exchangeStrategies(ExchangeStrategies) underlying}
* {@code ExchangeStrategies}.
* @param configurer the configurer to apply
* @since 5.1.13
*/
Builder codecs(Consumer<ClientCodecConfigurer> configurer);



/**
* Provide an ExchangeFunction pre-configured with ClientHttpConnector and ExchangeStrategies.
* This is an alternative to clientConnector and effectively overrides it.
*/
Builder exchangeFunction(ExchangeFunction exchangeFunction);

/**
* Build the {@link WebClient} instance.
*/
WebClient build();
        
  // Other methods
}
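As a configuration sketch of how a few of these builder methods combine, the snippet below sets a base URL, a default header, and one logging filter. The base URL is the placeholder from the comment above, the class name is hypothetical, and printing to standard output stands in for a real logger.

```java
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Hypothetical holder class for a configured WebClient.
final class WebClientBuilderSketch {

    // Builds a client with a base URL, a default Accept header, and a request-logging filter.
    static WebClient build() {
        // The filter sees every outgoing request before it is sent.
        ExchangeFilterFunction logRequest = ExchangeFilterFunction.ofRequestProcessor(request -> {
            System.out.println(request.method() + " " + request.url());
            return Mono.just(request);
        });

        return WebClient.builder()
                .baseUrl("https://abc.go.com/v1")
                .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .filter(logRequest)
                .build();
    }
}
```

Because the builder result is immutable and thread-safe, one instance built this way can usually be shared across the whole application.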

2.2 Log printing and monitoring

  • Log the request parameters, URL, and response
  • Serialize parameters and responses to JSON before logging
  • Log both successful responses and exceptions
  • Monitor success count, error count, total count, and response time

.doOnSuccess(response -> {
    log.info("get.success, url={}, response={}", url, response);
})
.doOnError(error -> {
    log.error("get.error, url={}", url, error);
    // Record the error metric
})
.doFinally(signal -> {
    // Record the total count and the elapsed time
})
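The monitoring hooks above need somewhere to send their numbers. The following is a minimal in-memory sketch of such a holder; CallMetrics and its method names are hypothetical, and a real service would forward the values to its monitoring system instead of keeping them in fields.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory metrics holder for the doOnSuccess / doOnError / doFinally hooks.
final class CallMetrics {
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong error = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    // Intended to be called from doOnSuccess.
    void recordSuccess() {
        success.incrementAndGet();
    }

    // Intended to be called from doOnError.
    void recordError() {
        error.incrementAndGet();
    }

    // Intended to be called from doFinally with a System.nanoTime() value taken before the request.
    void recordElapsed(long startNanos) {
        totalNanos.addAndGet(System.nanoTime() - startNanos);
    }

    long successCount() {
        return success.get();
    }

    long errorCount() {
        return error.get();
    }

    long totalCount() {
        return success.get() + error.get();
    }

    // Average latency in milliseconds over all recorded calls, or 0 if there were none.
    long averageMillis() {
        long total = totalCount();
        return total == 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(totalNanos.get() / total);
    }
}
```

One design point to keep in mind: a Mono is lazy, so a start time captured when the method is called measures from assembly, not from subscription. If the caller may subscribe later, capturing the start time in doOnSubscribe is likely to give a more accurate latency.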

2.3 Return processing

retrieve() declares how to extract the response, for example the status, headers, and body of a ResponseEntity.

.bodyToMono(clazz) converts the response body into an instance of clazz, and the caller chooses that type. If the body cannot be converted directly, read it as a String first and then convert it with your own utility class.

2.3.1 get

public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
    long start = System.currentTimeMillis();
    return webClient.get()
            .uri(url)
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response -> {
                log.info("get.success, url={}, response={}", url, response);
            })
            .doOnError(error -> {
                log.error("get.error, url={}", url, error);
            })
            // Fall back to the caller-supplied default on any error
            .onErrorReturn(defaultClass)
            .doFinally(signal -> {
                // Monitor or log the elapsed time
                log.info("get.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
            })
            // Hand the result to the custom scheduler so callbacks leave the event loop
            .publishOn(customScheduler);
}

2.3.2 get param request

public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
    long start = System.currentTimeMillis();
    // Append the query parameters to the base URL
    URI uri = UriComponentsBuilder.fromUriString(url)
            .queryParams(param)
            .build()
            .toUri();

    return webClient.get()
            .uri(uri)
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response -> {
                log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
            })
            .doOnError(error -> {
                log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(signal -> {
                // Monitor or log the elapsed time
                log.info("get.param.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
            })
            .publishOn(customScheduler);
}

2.3.3 post json request

public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
    final long start = System.currentTimeMillis();
    return webClient.post()
            .uri(url)
            .contentType(MediaType.APPLICATION_JSON)
            .cookies(cookies -> cookies.setAll(parameter.getCookies()))
            .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
            .headers(headers -> headers.setAll(parameter.getHeaders()))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response -> {
                log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
            })
            .doOnError(error -> {
                log.error("post.json.error, url={}, param={}", url, parameter.getJsonBody(), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(signal -> {
                // Monitor or log the elapsed time
                log.info("post.json.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
            })
            .publishOn(customScheduler);
}

2.3.4 post form Data request

public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
    final long start = System.currentTimeMillis();
    return webClient.post()
            .uri(url)
            .contentType(MediaType.APPLICATION_FORM_URLENCODED)
            .cookies(cookies -> cookies.setAll(parameter.getCookies()))
            .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
            .headers(headers -> headers.setAll(parameter.getMapHeaders()))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response -> {
                log.info("post.formData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
            })
            .doOnError(error -> {
                log.error("post.formData.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(signal -> {
                // Monitor or log the elapsed time
                log.info("post.formData.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
            })
            .publishOn(customScheduler);
}
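For readers unfamiliar with the application/x-www-form-urlencoded format, the following JDK-only sketch shows roughly what a form body looks like on the wire. It is an illustration under assumptions, not the code Spring runs: the class name and the sample field names are made up, and a plain Map of lists stands in for MultiValueMap.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that encodes form fields as name=value pairs joined by '&'.
final class FormBodySketch {

    // Encodes each name and value with URL encoding; repeated names produce repeated pairs.
    static String encode(Map<String, List<String>> form) {
        StringJoiner body = new StringJoiner("&");
        form.forEach((name, values) -> {
            for (String value : values) {
                body.add(URLEncoder.encode(name, StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8));
            }
        });
        return body.toString();
    }

    // Sample data: one single-valued field and one multi-valued field.
    static Map<String, List<String>> sample() {
        Map<String, List<String>> form = new LinkedHashMap<>();
        form.put("name", List.of("Li Lei"));
        form.put("tag", List.of("a&b", "c"));
        return form;
    }

    public static void main(String[] args) {
        System.out.println(encode(sample())); // prints name=Li+Lei&tag=a%26b&tag=c
    }
}
```

Spaces become '+' and reserved characters such as '&' are percent-encoded, which is why values containing separators survive the round trip.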

2.4 Exception handling

2.4.1 Fallback value on exception

onErrorReturn emits the supplied default value instead of propagating the error when an exception occurs.
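A minimal sketch of this fallback behavior, assuming Reactor is on the classpath. The class name, the simulated remote call, and the string values are invented for illustration; block() is used only so that the demo can return a plain value and would not appear in reactive production code.

```java
import reactor.core.publisher.Mono;

// Hypothetical demo of onErrorReturn as a fallback.
final class FallbackSketch {

    // Simulates a remote call that either succeeds or fails.
    static Mono<String> remoteCall(boolean fail) {
        return fail
                ? Mono.<String>error(new IllegalStateException("upstream unavailable"))
                : Mono.just("real data");
    }

    // Applies the fallback; the error never reaches the caller.
    static String fetch(boolean fail) {
        return remoteCall(fail)
                .onErrorReturn("default data")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(fetch(false)); // prints real data
        System.out.println(fetch(true));  // prints default data
    }
}
```

Note that a blanket onErrorReturn also hides programming errors. Reactor offers an overload that takes an exception class as its first argument, which restricts the fallback to failures of that type.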

2.4.2 Exception handling

Convert an error status code into an exception:

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

Log and monitor the exception:

.doOnError(error -> {
    // log and monitor
})
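A generic RuntimeException with the status code embedded in its message is hard to act on downstream. One possible refinement, sketched here with a hypothetical exception class, is to carry the numeric status so that later operators can tell client errors from server errors, for example to retry only on the latter.

```java
// Hypothetical exception type that keeps the HTTP status code as data.
final class UpstreamStatusException extends RuntimeException {
    private final int statusCode;

    UpstreamStatusException(int statusCode) {
        super("Request failed with status code: " + statusCode);
        this.statusCode = statusCode;
    }

    int statusCode() {
        return statusCode;
    }

    // 4xx: the request itself is wrong, so retrying is usually pointless.
    boolean isClientError() {
        return statusCode >= 400 && statusCode < 500;
    }

    // 5xx: the upstream failed, so a retry may succeed.
    boolean isServerError() {
        return statusCode >= 500 && statusCode < 600;
    }
}
```

Inside onStatus, such an exception could be created from response.statusCode().value() in place of the RuntimeException shown above.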

3 Complete WebClient

package com.geniu.reactor.webclient;

import com.geniu.utils.JsonUtil;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;

import java.net.URI;
import java.time.Duration;
import java.util.function.Function;

/**
 * @Author: prepared
 * @Date: 2023/8/15 11:05
 */
@Slf4j
public class CustomerWebClient {

    public static final CustomerWebClient instance = new CustomerWebClient();

    /**
     * Scheduler with 100 worker threads. Results are handed to it via publishOn,
     * so downstream callbacks do not run on the Netty event-loop threads.
     */
    private final Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);

    private final WebClient webClient;

    private CustomerWebClient() {

        // WARNING: InsecureTrustManagerFactory trusts every certificate without verification.
        // Use it for testing only; configure a real trust store in production.
        final SslContextBuilder sslBuilder = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE);

        final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder)
                .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();

        // Size the event-loop pool from the number of CPU cores
        final int cpuCores = Runtime.getRuntime().availableProcessors();
        final int selectorCount = Math.max(cpuCores / 2, 4);
        final int workerCount = Math.max(cpuCores * 2, 8);
        final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);

        // TCP-level options: disable Nagle's algorithm, 3 s connect timeout, 10 s socket timeout.
        // Note: Netty typically honors SO_TIMEOUT only on blocking (OIO) transports.
        final Function<TcpClient, TcpClient> tcpMapper = tcp -> tcp
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_TIMEOUT, 10000)
                .secure(ssl)
                .runOn(pool);

        // Connection pool: up to 100,000 connections, wait at most 6 s for a free one
        ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider
                .builder("HttpClientOfSWC")
                .maxConnections(100_000)
                .pendingAcquireTimeout(Duration.ofSeconds(6));
        final ConnectionProvider connectionProvider = httpClientOfSWC.build();

        // tcpConfiguration is deprecated since Reactor Netty 1.0
        final HttpClient hc = HttpClient.create(connectionProvider)
                .tcpConfiguration(tcpMapper);

        // HTTP-level options: enable response compression
        final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc
                .compress(true);

        final WebClient.Builder wcb = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));
        // .filter(new TraceRequestFilter()); Trace tracking can be added through a filter

        this.webClient = wcb.build();
    }

    public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        return webClient.get()
                .uri(url)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                // Turn 4xx and 5xx responses into an exception
                .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
                .bodyToMono(clazz)
                .doOnSuccess(response -> {
                    log.info("get.success, url={}, response={}", url, response);
                })
                .doOnError(error -> {
                    log.error("get.error, url={}", url, error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(signal -> {
                    log.info("get.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
                })
                .publishOn(customScheduler);
    }

    public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        // Append the query parameters to the base URL
        URI uri = UriComponentsBuilder.fromUriString(url)
                .queryParams(param)
                .build()
                .toUri();

        return webClient.get()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response -> {
                    log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
                })
                .doOnError(error -> {
                    log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(signal -> {
                    log.info("get.param.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
                })
                .publishOn(customScheduler);
    }



    public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_JSON)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
                .headers(headers -> headers.setAll(parameter.getHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response -> {
                    log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
                })
                .doOnError(error -> {
                    log.error("post.json.error, url={}, param={}", url, parameter.getJsonBody(), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(signal -> {
                    log.info("post.json.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
                })
                .publishOn(customScheduler);
    }


    public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
                .headers(headers -> headers.setAll(parameter.getMapHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response -> {
                    log.info("post.formData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
                })
                .doOnError(error -> {
                    log.error("post.formData.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(signal -> {
                    log.info("post.formData.cost, url={}, costMs={}", url, System.currentTimeMillis() - start);
                })
                .publishOn(customScheduler);
    }

}
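The event-loop sizing in the constructor is easier to judge with concrete numbers. The sketch below isolates the two formulas from the class above into a hypothetical helper and evaluates them for a few core counts; only the helper and its name are new, the arithmetic is the same.

```java
// Hypothetical helper that isolates the event-loop sizing formulas used above.
final class LoopSizing {

    // Half the cores, but never fewer than 4 selector threads.
    static int selectorCount(int cpuCores) {
        return Math.max(cpuCores / 2, 4);
    }

    // Twice the cores, but never fewer than 8 worker threads.
    static int workerCount(int cpuCores) {
        return Math.max(cpuCores * 2, 8);
    }

    public static void main(String[] args) {
        for (int cores : new int[] {2, 8, 16}) {
            System.out.println(cores + " cores -> selectors=" + selectorCount(cores)
                    + ", workers=" + workerCount(cores));
        }
        // prints:
        // 2 cores -> selectors=4, workers=8
        // 8 cores -> selectors=4, workers=16
        // 16 cores -> selectors=8, workers=32
    }
}
```

On small machines the lower bounds dominate, so the pool only starts to scale with the hardware once the host has more than 8 cores for selectors and more than 4 cores for workers.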