/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.vertx;

import io.confluent.parallelconsumer.DrainingCloseable;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.UserFunctions;
import io.confluent.parallelconsumer.WorkContainer;
import io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Parallel stream processor whose per-record work is a non-blocking Vert.x HTTP request.
 *
 * <p>The user function only <em>starts</em> the request and returns its {@link Future}. The
 * record's work container is marked succeeded/failed and added to the mailbox from the
 * future's callbacks, not when the user function returns (see
 * {@link #onUserFunctionSuccess} and {@link #addToMailBoxOnUserFunctionSuccess}).
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class VertxParallelEoSStreamProcessor<K, V>
        extends ParallelEoSStreamProcessor<K, V>
        implements VertxParallelStreamProcessor<K, V> {

    private static final Logger log = LoggerFactory.getLogger(VertxParallelEoSStreamProcessor.class);

    /** Work-type tag set on every work container dispatched through this processor. */
    private static final String VERTX_TYPE = "vert.x-type";

    /** Pause between checks of the Vert.x close future in {@link #close}. */
    private static final long CLOSE_POLL_INTERVAL_MS = 100L;

    private final Vertx vertx;
    private final WebClient webClient;

    /** Optional callback run after every request future completes, whatever the outcome. */
    private Optional<Runnable> onVertxCompleteHook = Optional.empty();

    /**
     * Creates a processor backed by a freshly created Vert.x instance and web client.
     *
     * <p>NOTE(review): {@code consumer} and {@code producer} are not used here; only
     * {@code options} is forwarded. Presumably the options object carries the clients -
     * confirm against the superclass constructor.
     *
     * @param consumer Kafka consumer (currently unused by this constructor)
     * @param producer Kafka producer (currently unused by this constructor)
     * @param options  parallel consumer options passed on to the superclass
     */
    public VertxParallelEoSStreamProcessor(Consumer<K, V> consumer, Producer<K, V> producer, ParallelConsumerOptions options) {
        this(Vertx.vertx(), null, options);
    }

    /**
     * Creates a processor using the supplied Vert.x infrastructure.
     *
     * @param vertx     Vert.x instance to use; a new one is created when {@code null}
     * @param webClient web client to use; one is created on {@code vertx} when {@code null}
     * @param options   parallel consumer options passed on to the superclass
     */
    public VertxParallelEoSStreamProcessor(Vertx vertx, WebClient webClient, ParallelConsumerOptions options) {
        super(options);
        this.vertx = (vertx != null) ? vertx : Vertx.vertx();
        this.webClient = (webClient != null) ? webClient : WebClient.create(this.vertx);
    }

    /**
     * Issues one HTTP GET per record, addressed by the {@link RequestInfo} the user function
     * returns for that record. Each entry of {@link RequestInfo#getParams()} becomes a query
     * parameter; a {@code null} parameter map is treated as "no query parameters".
     *
     * @param requestInfoFunction  maps a record to the host/port/path/params of its request
     * @param onSend               called with the request future right after the request is sent
     * @param onWebRequestComplete called with the async result once the request completes
     */
    @Override
    public void vertxHttpReqInfo(Function<ConsumerRecord<K, V>, RequestInfo> requestInfoFunction, java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSend, java.util.function.Consumer<AsyncResult<HttpResponse<Buffer>>> onWebRequestComplete) {
        this.vertxHttpRequest((client, rec) -> {
            RequestInfo info = UserFunctions.carefullyRun(requestInfoFunction, rec);
            HttpRequest<Buffer> request = client.get(info.getPort(), info.getHost(), info.getContextPath());
            Map<String, String> params = info.getParams();
            // RequestInfo accepts a null map (constructor and setter), so guard before iterating.
            if (params != null) {
                for (Map.Entry<String, String> param : params.entrySet()) {
                    request = request.addQueryParam(param.getKey(), param.getValue());
                }
            }
            return request;
        }, onSend, onWebRequestComplete);
    }

    /**
     * Sends, for every record, the {@link HttpRequest} built by the user function.
     *
     * @param webClientRequestFunction builds the request for a record from the shared web client
     * @param onSend                   called with the request future right after the request is sent
     * @param onWebRequestComplete     called with the async result once the request completes
     */
    @Override
    public void vertxHttpRequest(BiFunction<WebClient, ConsumerRecord<K, V>, HttpRequest<Buffer>> webClientRequestFunction, java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSend, java.util.function.Consumer<AsyncResult<HttpResponse<Buffer>>> onWebRequestComplete) {
        this.vertxHttpWebClient((client, rec) -> {
            HttpRequest<Buffer> call = UserFunctions.carefullyRun(webClientRequestFunction, client, rec);
            Future<HttpResponse<Buffer>> send = call.send();
            send.onComplete(ar -> onWebRequestComplete.accept(ar));
            return send;
        }, onSend);
    }

    /**
     * Runs the user function for every record and tracks the future it returns.
     *
     * <p>On success or failure of the future the record's work container is updated
     * accordingly and added to the mailbox; afterwards the hook registered via
     * {@link #addVertxOnCompleteHook(Runnable)}, if any, is run.
     *
     * @param webClientRequestFunction starts the request for a record and returns its future
     * @param onSend                   called with that future before the internal handlers are attached
     */
    @Override
    public void vertxHttpWebClient(BiFunction<WebClient, ConsumerRecord<K, V>, Future<HttpResponse<Buffer>>> webClientRequestFunction, java.util.function.Consumer<Future<HttpResponse<Buffer>>> onSend) {
        Function<ConsumerRecord<K, V>, List<Future<HttpResponse<Buffer>>>> userFuncWrapper = rec -> {
            Future<HttpResponse<Buffer>> send = UserFunctions.carefullyRun(webClientRequestFunction, this.webClient, rec);
            onSend.accept(send);

            WorkContainer<K, V> wc = this.wm.getWorkContainerForRecord(rec);
            wc.setWorkType(VERTX_TYPE);

            send.onSuccess(response -> {
                log.debug("Vert.x Vertical success");
                // Avoid decoding the body when trace logging is off.
                if (log.isTraceEnabled()) {
                    log.trace("Response body: {}", response.bodyAsString());
                }
                wc.onUserFunctionSuccess();
                this.addToMailbox(wc);
            });
            send.onFailure(cause -> {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                log.error("Vert.x Vertical fail: {}", cause.getMessage(), cause);
                wc.onUserFunctionFailure();
                this.addToMailbox(wc);
            });
            send.onComplete(ar -> {
                log.trace("Running plugin hook");
                this.onVertxCompleteHook.ifPresent(Runnable::run);
            });
            return UniLists.of(send);
        };

        // The result callback is not needed: outcomes are handled by the future handlers above.
        java.util.function.Consumer<Future<HttpResponse<Buffer>>> noOp = ignore -> {};
        super.supervisorLoop(userFuncWrapper, noOp);
    }

    /**
     * Registers a callback to run after every request future completes. Replaces any
     * previously registered hook.
     *
     * @param hookFunc the hook; must not be {@code null}
     * @throws NullPointerException if {@code hookFunc} is {@code null}
     */
    public void addVertxOnCompleteHook(Runnable hookFunc) {
        this.onVertxCompleteHook = Optional.of(hookFunc);
    }

    /**
     * Skips the superclass success handling when the user function returned a Vert.x future:
     * at that point only the request's creation has succeeded, not the request itself.
     */
    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        if (this.isVertxWork(resultsFromUserFunction)) {
            log.debug("Vertx creation function success, user's function success");
        } else {
            super.onUserFunctionSuccess(wc, resultsFromUserFunction);
        }
    }

    /**
     * Skips the superclass mailbox hand-off for Vert.x work; the future handlers attached in
     * {@link #vertxHttpWebClient} add the work container to the mailbox later.
     */
    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        if (this.isVertxWork(resultsFromUserFunction)) {
            log.debug("User function success but not adding vertx vertical to mailbox yet");
        } else {
            super.addToMailBoxOnUserFunctionSuccess(wc, resultsFromUserFunction);
        }
    }

    /**
     * Tells whether the user function's results are Vert.x work, judged by the first element only.
     *
     * @return {@code true} if the list is non-empty and its first element is a {@link Future}
     */
    private boolean isVertxWork(List<?> resultsFromUserFunction) {
        Iterator<?> results = resultsFromUserFunction.iterator();
        return results.hasNext() && results.next() instanceof Future;
    }

    /**
     * Closes the superclass processor, then the web client and the Vert.x instance, and waits
     * for the Vert.x close to finish.
     *
     * <p>The Vert.x instance is closed even when it was supplied by the caller. The wait timer
     * is started after the superclass close returns, so {@code timeout} bounds the Vert.x wait
     * separately from whatever time the superclass close takes.
     *
     * <p>Both exceptions below are checked but deliberately not declared, so that this
     * method's signature stays free of a {@code throws} clause.
     *
     * @param timeout   maximum time to wait for Vert.x to close
     * @param drainMode passed through to the superclass close
     * @throws TimeoutException     (undeclared) if Vert.x has not closed within {@code timeout}
     * @throws InterruptedException (undeclared) if the waiting thread is interrupted; the
     *                              thread's interrupt flag is restored before propagating
     */
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        log.info("Vert.x async consumer closing...");
        super.close(timeout, drainMode);
        this.webClient.close();
        Future<?> closeFuture = this.vertx.close();
        Timer timer = Time.SYSTEM.timer(timeout);
        while (!closeFuture.isComplete()) {
            log.trace("Waiting on close to complete");
            try {
                Thread.sleep(CLOSE_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw VertxParallelEoSStreamProcessor.<RuntimeException>sneakyThrow(e);
            }
            timer.update();
            if (timer.isExpired()) {
                throw VertxParallelEoSStreamProcessor.<RuntimeException>sneakyThrow(
                        new TimeoutException("Waiting for system to close"));
            }
        }
    }

    /**
     * Throws {@code t} as-is without the compiler requiring it to be declared or wrapped.
     * The return type only exists so that call sites can write {@code throw sneakyThrow(..)};
     * this method never returns normally.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        throw (E) t; // unchecked cast is erased at runtime, so t propagates unchanged
    }

    /**
     * Target of an HTTP request: host, port, context path and query parameters.
     * Host, port and path are fixed at construction; the parameter map can be replaced.
     */
    public static class RequestInfo {
        /** Port used by the constructors that do not take an explicit port. */
        public static final int DEFAULT_PORT = 8080;

        private final String host;
        private final int port;
        private final String contextPath;
        /** Query parameters; may be {@code null} if the caller supplied {@code null}. */
        private Map<String, String> params;

        /**
         * @param host        target host
         * @param port        target port
         * @param contextPath request path
         * @param params      query parameters (stored as given, not copied)
         */
        public RequestInfo(String host, int port, String contextPath, Map<String, String> params) {
            this.host = host;
            this.port = port;
            this.contextPath = contextPath;
            this.params = params;
        }

        /** Same as the four-argument constructor, using {@link #DEFAULT_PORT}. */
        public RequestInfo(String host, String contextPath, Map<String, String> params) {
            this(host, DEFAULT_PORT, contextPath, params);
        }

        /** Uses {@link #DEFAULT_PORT} and an empty parameter map. */
        public RequestInfo(String host, String contextPath) {
            this(host, DEFAULT_PORT, contextPath, UniMaps.of());
        }

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        public String getContextPath() {
            return this.contextPath;
        }

        /** @return the query parameters as last set; may be {@code null} */
        public Map<String, String> getParams() {
            return this.params;
        }

        /** Replaces the query parameter map (stored as given, not copied). */
        public void setParams(Map<String, String> params) {
            this.params = params;
        }
    }
}

