/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.mqtt;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registry of messages that have been published and are still waiting for the
 * corresponding acknowledgement (PUBACK), keyed by the message's packet identifier.
 * <p>
 * For each registered packet id an acknowledgement handler and a timeout handler are kept.
 * Exactly one of them is invoked for a given registration: the acknowledgement handler if
 * {@link #handlePubAck(Integer)} is called while the entry is still present, or the timeout
 * handler if the (optional) Vert.x timer fires first.
 */
public final class PendingPubAcks {
    private static final Logger LOG = LoggerFactory.getLogger(PendingPubAcks.class);

    /** Registrations that have neither been acknowledged nor timed out yet, by packet id. */
    private final Map<Integer, PendingPubAck> pendingAcks = new ConcurrentHashMap<>();
    private final Vertx vertx;

    /**
     * Creates a new, empty registry.
     *
     * @param vertx The Vert.x instance used to start and cancel the acknowledgement timers.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public PendingPubAcks(Vertx vertx) {
        this.vertx = Objects.requireNonNull(vertx);
    }

    /**
     * Processes an acknowledgement for the given packet id.
     * <p>
     * If a registration exists for the id, it is removed, its timer (if any) is cancelled and
     * its acknowledgement handler is invoked with the id. Otherwise the event is only logged
     * at debug level.
     *
     * @param msgId The packet id of the acknowledged message.
     * @throws NullPointerException if msgId is {@code null}.
     */
    public void handlePubAck(Integer msgId) {
        Objects.requireNonNull(msgId);
        final PendingPubAck pending = this.pendingAcks.remove(msgId);
        if (pending != null) {
            pending.onPubAck();
        } else {
            LOG.debug("no active request found for received acknowledgement [packet-id: {}]", msgId);
        }
    }

    /**
     * Registers handlers to be notified about the acknowledgement of, or the failure to
     * acknowledge, the message with the given packet id.
     * <p>
     * If a registration for the same id already exists, it is replaced: an error is logged,
     * the timer of the replaced registration is cancelled and none of the replaced handlers
     * will be invoked.
     *
     * @param msgId The packet id of the published message.
     * @param onAckHandler Invoked with the packet id when the acknowledgement arrives.
     * @param onAckTimeoutHandler Invoked with {@code null} when the timer expires before an
     *                            acknowledgement has been handled.
     * @param waitingForAckTimeout The delay passed to {@code Vertx.setTimer} (milliseconds per
     *                             the Vert.x API). A value below 1 disables the timeout.
     * @throws NullPointerException if msgId or any of the handlers is {@code null}.
     */
    public void add(Integer msgId, Handler<Integer> onAckHandler, Handler<Void> onAckTimeoutHandler, long waitingForAckTimeout) {
        Objects.requireNonNull(msgId);
        Objects.requireNonNull(onAckHandler);
        Objects.requireNonNull(onAckTimeoutHandler);
        final Long timerId = this.startTimerIfNeeded(msgId, waitingForAckTimeout);
        final PendingPubAck replacedObj = this.pendingAcks.put(
                msgId, new PendingPubAck(msgId, onAckHandler, onAckTimeoutHandler, timerId));
        if (replacedObj != null) {
            // The timer callback removes whatever entry is mapped to msgId when it fires.
            // Without cancelling the stale timer it would expire the NEW registration early.
            replacedObj.cancelTimer();
            LOG.error("error registering ack handler; already waiting for ack of message id [{}]", msgId);
        }
    }

    /**
     * Starts a one-shot timer that, on expiry, removes the registration for the given packet
     * id (if still present) and invokes its timeout handler.
     *
     * @param msgId The packet id the timer belongs to.
     * @param waitingForAckTimeout The timer delay; values below 1 mean "no timer".
     * @return The id of the started timer or {@code null} if no timer has been started.
     */
    private Long startTimerIfNeeded(Integer msgId, long waitingForAckTimeout) {
        if (waitingForAckTimeout < 1L) {
            return null;
        }
        return this.vertx.setTimer(waitingForAckTimeout, timerId ->
                Optional.ofNullable(this.pendingAcks.remove(msgId)).ifPresent(PendingPubAck::onPubAckTimeout));
    }

    /**
     * A single registration: the handlers for one packet id plus the id of the timer guarding
     * it. Kept as an inner (non-static) class because it needs the enclosing instance's
     * Vert.x reference to cancel the timer.
     */
    private final class PendingPubAck {
        private final int msgId;
        private final Handler<Integer> onAckHandler;
        private final Handler<Void> onAckTimeoutHandler;
        /** Id of the timeout timer, or {@code null} if no timeout has been requested. */
        private final Long timerId;

        PendingPubAck(int msgId, Handler<Integer> onAckHandler, Handler<Void> onAckTimeoutHandler, Long timerId) {
            this.msgId = msgId;
            this.onAckHandler = Objects.requireNonNull(onAckHandler);
            this.onAckTimeoutHandler = Objects.requireNonNull(onAckTimeoutHandler);
            this.timerId = timerId;
        }

        /**
         * Cancels the timeout timer and invokes the acknowledgement handler with the packet id.
         */
        public void onPubAck() {
            LOG.trace("acknowledgement received for message sent to device [packet-id: {}]", this.msgId);
            this.cancelTimer();
            this.onAckHandler.handle(this.msgId);
        }

        /**
         * Invokes the timeout handler (with a {@code null} argument).
         */
        public void onPubAckTimeout() {
            this.onAckTimeoutHandler.handle(null);
        }

        /**
         * Cancels the timeout timer of this registration, if one has been started.
         */
        private void cancelTimer() {
            if (this.timerId != null) {
                PendingPubAcks.this.vertx.cancelTimer(this.timerId.longValue());
            }
        }
    }
}

