/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.vertx.websocket;

import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocket;
import io.vertx.core.net.impl.ConnectionBase;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration;
import org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VertxWebsocketClientConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(VertxWebsocketClientConsumer.class);

    public VertxWebsocketClientConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    public VertxWebsocketEndpoint getEndpoint() {
        return (VertxWebsocketEndpoint)super.getEndpoint();
    }

    protected void doStart() throws Exception {
        this.configureWebSocketHandlers(this.getEndpoint().getWebSocket());
    }

    protected void configureWebSocketHandlers(WebSocket webSocket) {
        webSocket.binaryMessageHandler(buffer -> this.handleResult(buffer.getBytes()));
        webSocket.textMessageHandler(this::handleResult);
        webSocket.closeHandler(event -> {
            if (this.isStarted()) {
                LOG.info("WebSocket disconnected from {}. Attempting to reconnect...", (Object)webSocket.remoteAddress());
                VertxWebsocketConfiguration configuration = this.getEndpoint().getConfiguration();
                AtomicInteger reconnectAttempts = new AtomicInteger();
                Vertx vertx = this.getEndpoint().getVertx();
                vertx.setPeriodic((long)configuration.getReconnectInitialDelay(), (long)configuration.getReconnectInterval(), timerId -> vertx.executeBlocking(() -> {
                    this.configureWebSocketHandlers(this.getEndpoint().getWebSocket());
                    vertx.cancelTimer(timerId.longValue());
                    return null;
                }, false).onComplete(result -> {
                    if (result.failed()) {
                        Throwable cause = result.cause();
                        if (cause != null) {
                            LOG.debug("WebSocket reconnect to {} failed due to {}", (Object)webSocket.remoteAddress(), (Object)cause);
                        }
                        if (configuration.getMaxReconnectAttempts() > 0 && reconnectAttempts.incrementAndGet() == configuration.getMaxReconnectAttempts()) {
                            LOG.warn("Reconnect max attempts ({}) exhausted. Giving up trying to reconnect to {}", (Object)configuration.getMaxReconnectAttempts(), (Object)webSocket.remoteAddress());
                            vertx.cancelTimer(timerId.longValue());
                        }
                    }
                }));
            }
        });
        webSocket.exceptionHandler(exception -> {
            Throwable cause = exception.getCause();
            if (cause == ConnectionBase.CLOSED_EXCEPTION) {
                return;
            }
            Exchange exchange = this.createExchange(false);
            this.getExceptionHandler().handleException("Error processing exchange", exchange, cause);
            this.releaseExchange(exchange, false);
        });
    }

    protected void handleResult(Object result) {
        Exchange exchange = this.createExchange(false);
        Message message = exchange.getMessage();
        message.setBody(result);
        this.processExchange(exchange);
    }

    protected void processExchange(Exchange exchange) {
        Vertx vertx = this.getEndpoint().getVertx();
        vertx.executeBlocking(() -> {
            this.createUoW(exchange);
            this.getProcessor().process(exchange);
            return null;
        }, false).onComplete(result -> {
            try {
                if (result.failed()) {
                    Throwable cause = result.cause();
                    this.getExceptionHandler().handleException(cause);
                }
            }
            finally {
                this.doneUoW(exchange);
                this.releaseExchange(exchange, false);
            }
        });
    }
}

