/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.plugin.httpclient;

import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.shenyu.common.enums.RetryEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.loadbalancer.cache.UpstreamCacheManager;
import org.apache.shenyu.loadbalancer.entity.Upstream;
import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
import org.apache.shenyu.plugin.api.utils.RequestUrlUtils;
import org.apache.shenyu.plugin.httpclient.AbstractHttpClientPlugin;
import org.apache.shenyu.plugin.httpclient.RetryStrategy;
import org.apache.shenyu.plugin.httpclient.exception.ShenyuTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class DefaultRetryStrategy<R>
implements RetryStrategy<R> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultRetryStrategy.class);
    private final AbstractHttpClientPlugin<R> httpClientPlugin;

    public DefaultRetryStrategy(AbstractHttpClientPlugin<R> httpClientPlugin) {
        this.httpClientPlugin = httpClientPlugin;
    }

    @Override
    public Mono<R> execute(Mono<R> clientResponse, ServerWebExchange exchange, Duration duration, int retryTimes) {
        String retryStrategy = (String)Optional.ofNullable(exchange.getAttribute("retryStrategy")).orElseGet(() -> "current");
        if (RetryEnum.CURRENT.getName().equals(retryStrategy)) {
            RetryBackoffSpec retryBackoffSpec = Retry.backoff((long)retryTimes, (Duration)Duration.ofMillis(20L)).maxBackoff(Duration.ofSeconds(20L)).transientErrors(true).jitter(0.5).filter(t -> t instanceof TimeoutException || t instanceof ConnectTimeoutException || t instanceof ReadTimeoutException || t instanceof IllegalStateException).onRetryExhaustedThrow((retryBackoffSpecErr, retrySignal) -> {
                throw new ShenyuTimeoutException("Request timeout, the maximum number of retry times has been exceeded");
            });
            return clientResponse.retryWhen((Retry)retryBackoffSpec).onErrorMap(ShenyuTimeoutException.class, th -> new ResponseStatusException((HttpStatusCode)HttpStatus.REQUEST_TIMEOUT, th.getMessage(), (Throwable)th)).onErrorMap(TimeoutException.class, th -> new ResponseStatusException((HttpStatusCode)HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), (Throwable)th));
        }
        HashSet<URI> exclude = new HashSet<URI>(Collections.singletonList(Objects.requireNonNull((URI)exchange.getAttribute("httpUri"))));
        return this.resend(clientResponse, exchange, duration, exclude, retryTimes).onErrorMap(ShenyuException.class, th -> new ResponseStatusException((HttpStatusCode)HttpStatus.SERVICE_UNAVAILABLE, "CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER", (Throwable)th)).onErrorMap(TimeoutException.class, th -> new ResponseStatusException((HttpStatusCode)HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), (Throwable)th));
    }

    private Mono<R> resend(Mono<R> clientResponse, ServerWebExchange exchange, Duration duration, Set<URI> exclude, int retryTimes) {
        Mono<R> result = clientResponse;
        for (int i = 0; i < retryTimes; ++i) {
            result = this.resend(result, exchange, duration, exclude);
        }
        return result;
    }

    private Mono<R> resend(Mono<R> response, ServerWebExchange exchange, Duration duration, Set<URI> exclude) {
        return response.onErrorResume(th -> {
            String selectorId = (String)exchange.getAttribute("divideSelectorId");
            String loadBalance = (String)exchange.getAttribute("loadBalance");
            List upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selectorId).stream().filter(data -> {
                String trimUri = data.getUrl().trim();
                for (URI needToExclude : exclude) {
                    if (!(needToExclude.getHost() + ":" + needToExclude.getPort()).equals(trimUri)) continue;
                    return false;
                }
                return true;
            }).collect(Collectors.toList());
            if (upstreamList.isEmpty()) {
                return Mono.error((Throwable)new ShenyuException("CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER"));
            }
            String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
            Upstream upstream = LoadBalancerFactory.selector(upstreamList, (String)loadBalance, (String)ip);
            if (Objects.isNull(upstream)) {
                return Mono.error((Throwable)new ShenyuException("CANNOT_FIND_HEALTHY_UPSTREAM_URL_AFTER_FAILOVER"));
            }
            URI newUri = RequestUrlUtils.buildRequestUri((ServerWebExchange)exchange, (String)upstream.buildDomain());
            exclude.add(newUri);
            return this.httpClientPlugin.doRequest(exchange, exchange.getRequest().getMethod().name(), newUri, (Flux<DataBuffer>)exchange.getRequest().getBody()).timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))).doOnError(e -> LOG.error(e.getMessage(), e));
        });
    }
}

