/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.actor.AbstractActorPublisherWithStash;
import akka.stream.actor.ActorPublisherMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public final class EventAndResponsePublisher
extends AbstractActorPublisherWithStash<Jsonifiable.WithPredicate<JsonObject, JsonField>> {
    private static final int MESSAGE_CONSUMPTION_CHECK_SECONDS = 2;
    private final DiagnosticLoggingAdapter logger = LogUtil.obtain((Actor)this);
    private final int backpressureBufferSize;
    private final List<Jsonifiable.WithPredicate<JsonObject, JsonField>> buffer = new ArrayList<Jsonifiable.WithPredicate<JsonObject, JsonField>>();
    private final AtomicBoolean currentlyInMessageConsumedCheck = new AtomicBoolean(false);
    private String connectionCorrelationId;

    private EventAndResponsePublisher(int backpressureBufferSize) {
        this.backpressureBufferSize = backpressureBufferSize;
    }

    public static Props props(final int backpressureBufferSize) {
        return Props.create(EventAndResponsePublisher.class, (Creator)new Creator<EventAndResponsePublisher>(){
            private static final long serialVersionUID = 1L;

            public EventAndResponsePublisher create() {
                return new EventAndResponsePublisher(backpressureBufferSize);
            }
        });
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Connect.class, connect -> {
            String connectionCorrelationId = connect.getConnectionCorrelationId();
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (CharSequence)connectionCorrelationId);
            this.logger.debug("Established new connection: {}", (Object)connectionCorrelationId);
            this.getContext().become(this.connected(connectionCorrelationId));
        }).matchAny(any -> {
            this.logger.info("Got unknown message during init phase '{}' - stashing..", any);
            this.stash();
        }).build();
    }

    private AbstractActor.Receive connected(String connectionCorrelationId) {
        this.connectionCorrelationId = connectionCorrelationId;
        this.unstashAll();
        return ReceiveBuilder.create().match(Signal.class, signal -> this.buffer.size() >= this.backpressureBufferSize, signal -> {
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (WithDittoHeaders)signal);
            this.handleBackpressureFor((Jsonifiable.WithPredicate<JsonObject, JsonField>)signal);
        }).match(Signal.class, signal -> {
            if (this.buffer.isEmpty() && this.totalDemand() > 0L) {
                this.onNext(signal);
            } else {
                this.buffer.add((Jsonifiable.WithPredicate<JsonObject, JsonField>)signal);
                this.deliverBuf();
            }
        }).match(DittoRuntimeException.class, cre -> this.buffer.size() >= this.backpressureBufferSize, cre -> {
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (Optional)cre.getDittoHeaders().getCorrelationId());
            this.handleBackpressureFor((Jsonifiable.WithPredicate<JsonObject, JsonField>)cre);
        }).match(DittoRuntimeException.class, cre -> {
            if (this.buffer.isEmpty() && this.totalDemand() > 0L) {
                this.onNext(cre);
            } else {
                this.buffer.add((Jsonifiable.WithPredicate<JsonObject, JsonField>)cre);
                this.deliverBuf();
            }
        }).match(Jsonifiable.WithPredicate.class, signal -> this.buffer.size() >= this.backpressureBufferSize, this::handleBackpressureFor).match(Jsonifiable.WithPredicate.class, jsonifiable -> {
            if (this.buffer.isEmpty() && this.totalDemand() > 0L) {
                this.onNext(jsonifiable);
            } else {
                this.buffer.add((Jsonifiable.WithPredicate<JsonObject, JsonField>)jsonifiable);
                this.deliverBuf();
            }
        }).match(ActorPublisherMessage.Request.class, request -> {
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (CharSequence)connectionCorrelationId);
            this.logger.debug("Got new demand: {}", request);
            this.deliverBuf();
        }).match(ActorPublisherMessage.Cancel.class, cancel -> this.getContext().stop(this.getSelf())).matchAny(any -> {
            LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (CharSequence)connectionCorrelationId);
            this.logger.warning("Got unknown message during connected phase: '{}'", any);
        }).build();
    }

    private void handleBackpressureFor(Jsonifiable.WithPredicate<JsonObject, JsonField> jsonifiable) {
        if (this.currentlyInMessageConsumedCheck.compareAndSet(false, true)) {
            this.logger.warning("Backpressure - buffer of '{}' outstanding Events/CommandResponses is full, dropping '{}'", (Object)this.backpressureBufferSize, jsonifiable);
            long bufSize = this.buffer.size();
            AbstractActor.ActorContext context = this.getContext();
            context.system().scheduler().scheduleOnce(FiniteDuration.apply((long)2L, (TimeUnit)TimeUnit.SECONDS), () -> {
                if (bufSize == (long)this.buffer.size()) {
                    this.logger.warning("Terminating Publisher - did not to consume anything in the last '{}' seconds, buffer is still at '{}' outstanding messages", (Object)2, (Object)bufSize);
                    context.stop(this.getSelf());
                } else {
                    this.currentlyInMessageConsumedCheck.set(false);
                    this.logger.info("Outstanding messages were consumed, Publisher is not terminated");
                }
            }, (ExecutionContext)context.system().dispatcher());
        }
    }

    private void deliverBuf() {
        LogUtil.enhanceLogWithCorrelationId((DiagnosticLoggingAdapter)this.logger, (CharSequence)this.connectionCorrelationId);
        while (this.totalDemand() > 0L) {
            List<Jsonifiable.WithPredicate<JsonObject, JsonField>> took;
            if (this.totalDemand() <= Integer.MAX_VALUE) {
                took = this.buffer.subList(0, Math.min(this.buffer.size(), (int)this.totalDemand()));
                took.forEach(arg_0 -> ((EventAndResponsePublisher)this).onNext(arg_0));
                this.buffer.removeAll(took);
                break;
            }
            took = this.buffer.subList(0, Math.min(this.buffer.size(), Integer.MAX_VALUE));
            took.forEach(arg_0 -> ((EventAndResponsePublisher)this).onNext(arg_0));
            this.buffer.removeAll(took);
        }
    }
}

