Skip to content

Added support for streaming response modification #3477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter.factory.rewrite;

import java.util.function.BiFunction;

import reactor.core.publisher.Flux;

import org.springframework.web.server.ServerWebExchange;

/**
* This interface is BETA and may be subject to change in a future release.
*
* @param <T> the type of the first argument to the function
* @param <R> the type of element signaled by the {@link Flux}
*/
public interface FluxRewriteFunction<T, R> extends BiFunction<ServerWebExchange, Flux<T>, Flux<R>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,19 @@ public static class Config {

private String newContentType;

/**
* Deprecated in favour of {@link FluxRewriteFunction} &
* {@link MonoRewriteFunction} Use {@link MonoRewriteFunction} for modifying
* non-streaming response body Use {@link FluxRewriteFunction} for modifying
* streaming response body
*/
@Deprecated
private RewriteFunction rewriteFunction;

private FluxRewriteFunction fluxRewriteFunction;

private MonoRewriteFunction monoRewriteFunction;

public Class getInClass() {
return inClass;
}
Expand Down Expand Up @@ -138,15 +149,42 @@ public Config setNewContentType(String newContentType) {
return this;
}

@Deprecated
public RewriteFunction getRewriteFunction() {
return rewriteFunction;
}

public <T, R> MonoRewriteFunction<Mono<T>, Mono<R>> getMonoRewriteFunction() {
return monoRewriteFunction;
}

public <T, R> FluxRewriteFunction<Flux<T>, Flux<R>> getFluxRewriteFunction() {
return fluxRewriteFunction;
}

/**
* Deprecated in favour of {@link Config#setMonoRewriteFunction} &
* {@link Config#setFluxRewriteFunction} Use {@link Config#setMonoRewriteFunction}
* for modifying non-streaming response body Use
* {@link Config#setFluxRewriteFunction} for modifying streaming response body
*/
@Deprecated
public Config setRewriteFunction(RewriteFunction rewriteFunction) {
this.rewriteFunction = rewriteFunction;
return this;
}

public Config setMonoRewriteFunction(MonoRewriteFunction monoRewriteFunction) {
this.monoRewriteFunction = monoRewriteFunction;
return this;
}

public Config setFluxRewriteFunction(FluxRewriteFunction fluxRewriteFunction) {
this.fluxRewriteFunction = fluxRewriteFunction;
return this;
}

@Deprecated
public <T, R> Config setRewriteFunction(Class<T> inClass, Class<R> outClass,
RewriteFunction<T, R> rewriteFunction) {
setInClass(inClass);
Expand All @@ -155,6 +193,21 @@ public <T, R> Config setRewriteFunction(Class<T> inClass, Class<R> outClass,
return this;
}

public <T, R> Config setFluxRewriteFunction(Class<T> inClass, Class<R> outClass,
FluxRewriteFunction<T, R> fluxRewriteFunction) {
setInClass(inClass);
setOutClass(outClass);
setFluxRewriteFunction(fluxRewriteFunction);
return this;
}

public <T, R> Config setMonoRewriteFunction(Class<T> inClass, Class<R> outClass,
MonoRewriteFunction<T, R> monoRewriteFunction) {
setInClass(inClass);
setOutClass(outClass);
setMonoRewriteFunction(monoRewriteFunction);
return this;
}
}

public class ModifyResponseGatewayFilter implements GatewayFilter, Ordered {
Expand Down Expand Up @@ -204,51 +257,99 @@ public ModifiedServerHttpResponse(ServerWebExchange exchange, Config config) {
this.config = config;
}

@SuppressWarnings("unchecked")
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {

Class inClass = config.getInClass();
Class outClass = config.getOutClass();

String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
HttpHeaders httpHeaders = new HttpHeaders();
// explicitly add it in this way instead of
// 'httpHeaders.setContentType(originalResponseContentType)'
// this will prevent exception in case of using non-standard media
// types like "Content-Type: image"
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);

HttpHeaders httpHeaders = prepareHttpHeaders();
ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);

// TODO: flux or mono
Mono modifiedBody = extractBody(exchange, clientResponse, inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));
var modifiedBody = extractBody(exchange, clientResponse, inClass);
if (config.getRewriteFunction() != null) {
// TODO: to be removed with removal of rewriteFunction
modifiedBody = modifiedBody
.flatMap(originalBody -> config.getRewriteFunction()
.apply(exchange, originalBody))
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction()
.apply(exchange, null)));
}
if (config.getMonoRewriteFunction() != null) {
modifiedBody = config.getMonoRewriteFunction().apply(exchange,
modifiedBody);
}

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
outClass);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
}
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
}

if (StringUtils.hasText(config.newContentType)) {
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
}

// TODO: fail if isStreamingMediaType?
return getDelegate().writeWith(messageBody);
}));
}

if (StringUtils.hasText(config.newContentType)) {
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Mono<Void> writeAndFlushWith(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
final var httpHeaders = prepareHttpHeaders();
final var fluxRewriteConfig = config.getFluxRewriteFunction();
final var publisher = Flux.from(body).flatMapSequential(r -> r);
final var clientResponse = prepareClientResponse(publisher, httpHeaders);
var modifiedBody = clientResponse.bodyToFlux(config.inClass);
if (config.getRewriteFunction() != null) {
// TODO: to be removed with removal of rewriteFunction
modifiedBody = modifiedBody
.flatMap(originalBody -> config.getRewriteFunction()
.apply(exchange, originalBody))
.switchIfEmpty(Flux.defer(() -> (Flux) config.getRewriteFunction()
.apply(exchange, null)));
}
if (config.getFluxRewriteFunction() != null) {
modifiedBody = fluxRewriteConfig.apply(exchange, modifiedBody);
}
final var bodyInserter = BodyInserters.fromPublisher(modifiedBody,
config.outClass);
final var outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());

// TODO: fail if isStreamingMediaType?
return getDelegate().writeWith(messageBody);
}));
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
final var messageBody = outputMessage.getBody();
HttpHeaders headers = getDelegate().getHeaders();
if (StringUtils.hasText(config.newContentType)) {
headers.set(HttpHeaders.CONTENT_TYPE, config.newContentType);
}
return getDelegate()
.writeAndFlushWith(messageBody.map(Flux::just));
}));
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
private HttpHeaders prepareHttpHeaders() {
String originalResponseContentType = exchange
.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
HttpHeaders httpHeaders = new HttpHeaders();
// explicitly add it in this way instead of
// 'httpHeaders.setContentType(originalResponseContentType)'
// this will prevent exception in case of using non-standard media
// types like "Content-Type: image"
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
return httpHeaders;
}

private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter.factory.rewrite;

import java.util.function.BiFunction;

import reactor.core.publisher.Mono;

import org.springframework.web.server.ServerWebExchange;


/**
* This interface is BETA and may be subject to change in a future release.
*
* @param <T> the type of the first argument to the function
* @param <R> the type of element signaled by the {@link Mono}
*/
public interface MonoRewriteFunction<T, R> extends BiFunction<ServerWebExchange, Mono<T>, Mono<R>> {

}