/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.vertx;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.ExternalEngine;
import io.confluent.parallelconsumer.internal.UserFunctions;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

public class VertxParallelEoSStreamProcessor<K, V>
extends ExternalEngine<K, V>
implements VertxParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(VertxParallelEoSStreamProcessor.class);
    private static final String VERTX_TYPE = "vert.x-type";
    private final Vertx vertx;
    private final WebClient webClient;
    private Optional<Runnable> onVertxCompleteHook = Optional.empty();

    public VertxParallelEoSStreamProcessor(ParallelConsumerOptions options) {
        this(Vertx.vertx(), null, options);
    }

    public VertxParallelEoSStreamProcessor(Vertx vertx, WebClient webClient, ParallelConsumerOptions options) {
        super(options);
        int cores = Runtime.getRuntime().availableProcessors();
        VertxOptions vertxOptions = new VertxOptions().setWorkerPoolSize(cores);
        int maxConcurrency = options.getMaxConcurrency();
        WebClientOptions webClientOptions = new WebClientOptions().setMaxPoolSize(maxConcurrency).setHttp2MaxPoolSize(maxConcurrency);
        if (vertx == null) {
            vertx = Vertx.vertx((VertxOptions)vertxOptions);
        }
        this.vertx = vertx;
        if (webClient == null) {
            webClient = WebClient.create((Vertx)vertx, (WebClientOptions)webClientOptions);
        }
        this.webClient = webClient;
    }

    protected ThreadPoolExecutor setupWorkerPool(int poolSize) {
        return super.setupWorkerPool(1);
    }

    @Override
    public void vertxHttpReqInfo(Function<PollContext<K, V>, RequestInfo> requestInfoFunction, Consumer<Future<HttpResponse<Buffer>>> onSend, Consumer<AsyncResult<HttpResponse<Buffer>>> onWebRequestComplete) {
        this.vertxHttpRequest((webClient, rec) -> {
            RequestInfo reqInf = (RequestInfo)UserFunctions.carefullyRun((Function)requestInfoFunction, (Object)rec);
            HttpRequest req = webClient.get(reqInf.getPort(), reqInf.getHost(), reqInf.getContextPath());
            Map<String, String> params = reqInf.getParams();
            for (Map.Entry<String, String> entry : params.entrySet()) {
                req = req.addQueryParam(entry.getKey(), entry.getValue());
            }
            return req;
        }, onSend, onWebRequestComplete);
    }

    @Override
    public void vertxHttpRequest(BiFunction<WebClient, PollContext<K, V>, HttpRequest<Buffer>> webClientRequestFunction, Consumer<Future<HttpResponse<Buffer>>> onSend, Consumer<AsyncResult<HttpResponse<Buffer>>> onWebRequestComplete) {
        this.vertxHttpWebClient((webClient, record) -> {
            HttpRequest call = (HttpRequest)UserFunctions.carefullyRun((BiFunction)webClientRequestFunction, (Object)webClient, (Object)record);
            Future send = call.send();
            send.onComplete(ar -> onWebRequestComplete.accept((AsyncResult<HttpResponse<Buffer>>)ar));
            return send;
        }, onSend);
    }

    @Override
    public void vertxHttpWebClient(BiFunction<WebClient, PollContext<K, V>, Future<HttpResponse<Buffer>>> webClientRequestFunction, Consumer<Future<HttpResponse<Buffer>>> onWebRequestSentCallback) {
        Function<PollContextInternal, List> userFuncWrapper = context -> {
            log.trace("Consumed a record ({}), executing void function...", context);
            Future futureWebResponse = (Future)UserFunctions.carefullyRun((BiFunction)webClientRequestFunction, (Object)this.webClient, (Object)context.getPollContext());
            onWebRequestSentCallback.accept(futureWebResponse);
            this.addVertxHooks((PollContextInternal<K, V>)context, (Future<?>)futureWebResponse);
            return UniLists.of((Object)futureWebResponse);
        };
        Consumer<Future> noOp = ignore -> {};
        super.supervisorLoop(userFuncWrapper, noOp);
    }

    private void addVertxHooks(PollContextInternal<K, V> context, Future<?> send) {
        context.streamWorkContainers().forEach(wc -> {
            wc.setWorkType(VERTX_TYPE);
            send.onSuccess(h -> {
                log.debug("Vert.x Vertical success");
                wc.onUserFunctionSuccess();
                this.addToMailbox((WorkContainer)wc);
            });
            send.onFailure(h -> {
                log.error("Vert.x Vertical fail: {}", (Object)h.getMessage());
                wc.onUserFunctionFailure(h);
                this.addToMailbox((WorkContainer)wc);
            });
            send.onComplete(ar -> {
                log.trace("Running plugin hook");
                this.onVertxCompleteHook.ifPresent(Runnable::run);
            });
        });
    }

    @Override
    public void vertxFuture(Function<PollContext<K, V>, Future<?>> result) {
        Function<PollContextInternal, List> userFuncWrapper = context -> {
            log.trace("Consumed a record ({}), executing void function...", context);
            Future send = (Future)UserFunctions.carefullyRun((Function)result, (Object)context.getPollContext());
            this.addVertxHooks((PollContextInternal<K, V>)context, (Future<?>)send);
            return UniLists.of((Object)send);
        };
        Consumer<Future> noOp = ignore -> {};
        super.supervisorLoop(userFuncWrapper, noOp);
    }

    @Override
    public void batchVertxFuture(Function<PollContext<K, V>, Future<?>> result) {
        Function<PollContextInternal, List> userFuncWrapper = context -> {
            Future send = (Future)UserFunctions.carefullyRun((Function)result, (Object)context.getPollContext());
            this.addVertxHooks((PollContextInternal<K, V>)context, (Future<?>)send);
            return UniLists.of((Object)send);
        };
        Consumer<Future> noOp = ignore -> {};
        super.supervisorLoop(userFuncWrapper, noOp);
    }

    public void addVertxOnCompleteHook(Runnable hookFunc) {
        this.onVertxCompleteHook = Optional.of(hookFunc);
    }

    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        if (this.isAsyncFutureWork(resultsFromUserFunction)) {
            log.debug("Vertx creation function success, user's function success");
        } else {
            super.onUserFunctionSuccess(wc, resultsFromUserFunction);
        }
    }

    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        if (this.isAsyncFutureWork(resultsFromUserFunction)) {
            log.debug("User function success but not adding vertx vertical to mailbox yet");
        } else {
            super.addToMailBoxOnUserFunctionSuccess(wc, resultsFromUserFunction);
        }
    }

    protected boolean isAsyncFutureWork(List<?> resultsFromUserFunction) {
        Iterator<?> iterator = resultsFromUserFunction.iterator();
        if (iterator.hasNext()) {
            Object object = iterator.next();
            return object instanceof Future;
        }
        return false;
    }

    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        log.info("Vert.x async consumer closing...");
        super.close(timeout, drainMode);
        this.webClient.close();
        Future close = this.vertx.close();
        Timer timer = Time.SYSTEM.timer(timeout);
        while (!close.isComplete()) {
            log.trace("Waiting on close to complete");
            Thread.sleep(100L);
            timer.update();
            if (!timer.isExpired()) continue;
            throw new TimeoutException("Waiting for system to close");
        }
    }

    public static class RequestInfo {
        public static final int DEFAULT_PORT = 8080;
        private final String host;
        private final int port;
        private final String contextPath;
        private Map<String, String> params;

        public RequestInfo(String host, String contextPath, Map<String, String> params) {
            this(host, 8080, contextPath, params);
        }

        public RequestInfo(String host, String contextPath) {
            this(host, 8080, contextPath, UniMaps.of());
        }

        public void setParams(Map<String, String> params) {
            this.params = params;
        }

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        public String getContextPath() {
            return this.contextPath;
        }

        public Map<String, String> getParams() {
            return this.params;
        }

        public RequestInfo(String host, int port, String contextPath, Map<String, String> params) {
            this.host = host;
            this.port = port;
            this.contextPath = contextPath;
            this.params = params;
        }
    }
}

