Reading and modifying the request body of SpringCloud Gateway gateway

Reading and modifying the request body of SpringCloud Gateway

Getway needs to operate the body multiple times and needs to cache the body.

Cache body dynamically retrieved multiple times

Create a new top-level filter to cache the body

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

/**
 * @author:zhoumo
 * @descriptions:
 */
@Component
@Slf4j
public class RequestParamGlobalFilter implements GlobalFilter, Ordered {<!-- -->

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {<!-- -->

        /**
         * save request path and serviceId into gateway context
         */
        ServerHttpRequest request = exchange.getRequest();
        HttpHeaders headers = request.getHeaders();

        // handle parameters
        MediaType contentType = headers.getContentType();
        long contentLength = headers.getContentLength();
        if (contentLength > 0) {<!-- -->
                  return readBody(exchange, chain);
        }

        return chain.filter(exchange);
    }


    /**
     * default HttpMessageReader
     */
    private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
    /**
     * ReadJsonBody
     *
     * @param exchange
     * @param chain
     * @return
     */
    private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain) {<!-- -->
        /**
         * join the body
         */
        return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {<!-- -->
            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer);
            Flux<DataBuffer> cachedFlux = Flux.defer(() -> {<!-- -->
                DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
                DataBufferUtils.retain(buffer);
                return Mono.just(buffer);
            });
            /**
             * repackage ServerHttpRequest
             */
            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {<!-- -->
                @Override
                public Flux<DataBuffer> getBody() {<!-- -->
                    return cachedFlux;
                }
            };
            /**
             * mutate exchage with new ServerHttpRequest
             */
            ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
            /**
             * read body string with default messageReaders
             */
            return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(String.class)
                    .doOnNext(objectValue -> {<!-- -->
                        log.debug("[GatewayContext]Read JsonBody:{}", objectValue);
                    }).then(chain.filter(mutatedExchange));
        });
    }
    @Override
    public int getOrder() {<!-- -->
        return HIGHEST_PRECEDENCE;
    }
}

Get the body at the child node level

AtomicReference<String> requestBody = new AtomicReference<>("");
                RecorderServerHttpRequestDecorator requestDecorator = new RecorderServerHttpRequestDecorator(request);
                Flux<DataBuffer> body = requestDecorator.getBody();
                body.subscribe(buffer -> {<!-- -->
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
                    requestBody.set(charBuffer.toString());
                });

                String body= requestBody.get();

Override the get body method

 public class RecorderServerHttpRequestDecorator extends ServerHttpRequestDecorator {<!-- -->
        private final List<DataBuffer> dataBuffers = new ArrayList<>();
        public RecorderServerHttpRequestDecorator(ServerHttpRequest delegate) {<!-- -->
            super(delegate);
            super.getBody().map(dataBuffer -> {<!-- -->
                dataBuffers.add(dataBuffer);
                return dataBuffer;
            }).subscribe();
        }

        @Override
        public Flux<DataBuffer> getBody() {<!-- -->
            return copy();
        }

        private Flux<DataBuffer> copy() {<!-- -->
            return Flux.fromIterable(dataBuffers)
                    .map(buf -> buf.factory().wrap(buf.asByteBuffer()));
        }
    }

Modify and repackage the body

 String str="" + encodedDecryptedParam;
                DataBuffer bodyDataBuffer = stringBuffer(str);
                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
                MediaType contentType = request.getHeaders().getContentType();
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                        exchange.getRequest()) {<!-- -->
                    @Override
                    public HttpHeaders getHeaders() {<!-- -->
                        HttpHeaders httpHeaders = new HttpHeaders();
                        int length = str.getBytes().length;
                        httpHeaders.putAll(super.getHeaders());
                        httpHeaders.remove(HttpHeaders.CONTENT_TYPE);
                        httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                        httpHeaders.setContentLength(length);
                        httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType.toString());
                        //Set CONTENT_TYPE
                        return httpHeaders;
                    }
                    @Override
                    public Flux<DataBuffer> getBody() {<!-- -->
                        return bodyFlux;
                    }
                };
                return chain.filter(exchange.mutate().request(mutatedRequest).build());
 protected DataBuffer stringBuffer(String value) {<!-- -->
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }

You must add public HttpHeaders getHeaders() to re-encapsulate the header, otherwise the interface layer will be stuck and the request will be infinite.