/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.EventStream;
import akka.http.javadsl.model.ws.PeerClosedConnectionException;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorSubscriber;
import akka.stream.actor.ActorSubscriberMessage;
import akka.stream.actor.MaxInFlightRequestStrategy;
import akka.stream.actor.RequestStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.services.gateway.streaming.ResponsePublished;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;

public final class CommandSubscriber
extends AbstractActorSubscriber {
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain((Actor)this);
    private final ActorRef delegateActor;
    private final int backpressureQueueSize;
    private final List<String> outstandingCommandCorrelationIds = new ArrayList<String>();

    private CommandSubscriber(ActorRef delegateActor, int backpressureQueueSize, EventStream eventStream) {
        this.delegateActor = delegateActor;
        this.backpressureQueueSize = backpressureQueueSize;
        eventStream.subscribe(this.getSelf(), ResponsePublished.class);
    }

    public static Props props(ActorRef delegateActor, int backpressureQueueSize, EventStream eventStream) {
        return Props.create(CommandSubscriber.class, (Object[])new Object[]{delegateActor, backpressureQueueSize, eventStream});
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof Signal, onNext -> {
            Signal signal = (Signal)onNext.element();
            Optional correlationIdOpt = signal.getDittoHeaders().getCorrelationId();
            if (correlationIdOpt.isPresent()) {
                String correlationId = (String)correlationIdOpt.get();
                LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (String)correlationId, (LogUtil.MdcField[])new LogUtil.MdcField[0]);
                if (this.isResponseExpected(signal)) {
                    this.outstandingCommandCorrelationIds.add(correlationId);
                    if (this.outstandingCommandCorrelationIds.size() > this.backpressureQueueSize) {
                        throw new IllegalStateException("queued too many: " + this.outstandingCommandCorrelationIds.size() + " - backpressureQueueSize is: " + this.backpressureQueueSize);
                    }
                }
                this.logger.debug("Got new Signal <{}>, currently outstanding are <{}>", (Object)signal.getType(), (Object)this.outstandingCommandCorrelationIds.size());
                this.delegateActor.tell((Object)signal, this.getSelf());
            } else {
                this.logger.warning("Got a Signal <{}> without correlationId, NOT accepting/forwarding it: {}", (Object)signal.getType(), (Object)signal);
            }
        }).match(ResponsePublished.class, responded -> this.outstandingCommandCorrelationIds.remove(responded.getCorrelationId())).match(DittoRuntimeException.class, cre -> this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)cre)).match(RuntimeException.class, jre -> this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)new DittoJsonException(jre))).match(ActorSubscriberMessage.OnNext.class, onComplete -> this.logger.warning("Got unknown element in 'OnNext'")).matchEquals((Object)ActorSubscriberMessage.onCompleteInstance(), onComplete -> {
            this.logger.info("Stream completed, stopping myself..");
            this.getContext().stop(this.getSelf());
        }).match(ActorSubscriberMessage.OnError.class, onError -> {
            Throwable cause = onError.cause();
            if (cause instanceof PeerClosedConnectionException) {
                this.logger.debug("Received PeerClosedConnectionException with close code <{}> and close reason <{}>.", (Object)((PeerClosedConnectionException)cause).closeCode(), (Object)((PeerClosedConnectionException)cause).closeReason());
                this.getContext().stop(this.getSelf());
            } else if (cause instanceof DittoRuntimeException) {
                this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)cause);
            } else if (cause instanceof JsonRuntimeException) {
                this.handleDittoRuntimeException(this.delegateActor, (DittoRuntimeException)new DittoJsonException((RuntimeException)cause));
            } else if (cause instanceof RuntimeException) {
                this.logger.error(cause, "Unexpected RuntimeException <{}>: ", (Object)cause.getClass().getSimpleName(), (Object)cause.getMessage());
                this.handleDittoRuntimeException(this.delegateActor, GatewayInternalErrorException.newBuilder().cause(cause).build());
            } else {
                this.logger.warning("Got 'OnError': {} {}", (Object)cause.getClass().getName(), (Object)cause.getMessage());
            }
        }).matchAny(any -> this.logger.warning("Got unknown message '{}'", any)).build();
    }

    private boolean isResponseExpected(Signal<?> signal) {
        return signal instanceof Command && signal.getDittoHeaders().isResponseRequired();
    }

    private void handleDittoRuntimeException(ActorRef delegateActor, DittoRuntimeException cre) {
        LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (Optional)cre.getDittoHeaders().getCorrelationId(), (LogUtil.MdcField[])new LogUtil.MdcField[0]);
        this.logger.info("Got 'DittoRuntimeException': {} {}", (Object)cre.getClass().getName(), (Object)cre.getMessage());
        cre.getDittoHeaders().getCorrelationId().ifPresent(this.outstandingCommandCorrelationIds::remove);
        if (cre.getDittoHeaders().isResponseRequired()) {
            delegateActor.forward((Object)cre, (ActorContext)this.getContext());
        } else {
            this.logger.debug("Requester did not require response (via DittoHeader '{}') - not sending one", (Object)DittoHeaderDefinition.RESPONSE_REQUIRED);
        }
    }

    public RequestStrategy requestStrategy() {
        return new MaxInFlightRequestStrategy(this.backpressureQueueSize){

            public int inFlightInternally() {
                return CommandSubscriber.this.outstandingCommandCorrelationIds.size();
            }
        };
    }
}

