/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.services.gateway.streaming.actors;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import java.util.Optional;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.StopStreaming;
import org.eclipse.ditto.services.gateway.streaming.actors.StreamingSessionActor;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;

/**
 * Parent actor of all {@link StreamingSessionActor}s.
 * <p>
 * For every {@link Connect} message it creates one child session actor which is named after the connection
 * correlation ID. Messages which belong to a session ({@link StartStreaming}, {@link StopStreaming}, enriched live
 * signals, command responses and {@link DittoRuntimeException}s) are routed to the child with the matching name.
 * Commands and not yet enriched live signals are handed over to the proxy actor.
 * </p>
 */
public final class StreamingActor extends AbstractActor {

    /**
     * The name of this actor.
     */
    public static final String ACTOR_NAME = "streaming";

    private final DiagnosticLoggingAdapter logger = LogUtil.obtain(this);

    private final ActorRef pubSubMediator;
    private final ActorRef proxyActor;

    // The decider's input type is Throwable, so this single case already covers every failure of a child;
    // every failure is logged and escalated.
    private final SupervisorStrategy strategy = new OneForOneStrategy(true, DeciderBuilder
            .match(Throwable.class, e -> {
                logger.error(e, "Escalating above actor!");
                return SupervisorStrategy.escalate();
            })
            .build());

    private StreamingActor(final ActorRef pubSubMediator, final ActorRef proxyActor) {
        this.pubSubMediator = pubSubMediator;
        this.proxyActor = proxyActor;
    }

    /**
     * Creates Akka configuration object Props for this StreamingActor.
     *
     * @param pubSubMediator the actor reference which is handed over to each created session actor.
     * @param proxyActor the actor reference to which commands and not yet enriched live signals are sent.
     * @return the Akka configuration Props object.
     */
    public static Props props(final ActorRef pubSubMediator, final ActorRef proxyActor) {
        return Props.create(StreamingActor.class, new Creator<StreamingActor>() {
            private static final long serialVersionUID = 1L;

            public StreamingActor create() throws Exception {
                return new StreamingActor(pubSubMediator, proxyActor);
            }
        });
    }

    /**
     * Returns the strategy which logs and escalates every failure of a child actor.
     *
     * @return the supervisor strategy of this actor.
     */
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    /**
     * Builds the message handling of this actor. The order of the cases matters: the live signal cases have to be
     * evaluated before the generic {@link Command} and {@link CommandResponse} cases.
     *
     * @return the receive behaviour of this actor.
     */
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Connect.class, connect -> {
                    final ActorRef eventAndResponsePublisher = connect.getEventAndResponsePublisher();
                    eventAndResponsePublisher.forward(connect, getContext());
                    final String connectionCorrelationId = connect.getConnectionCorrelationId();
                    // the session actor is a direct child whose name is the connection correlation ID
                    getContext().actorOf(
                            StreamingSessionActor.props(connectionCorrelationId, connect.getType(), pubSubMediator,
                                    eventAndResponsePublisher),
                            connectionCorrelationId);
                })
                .match(StartStreaming.class, startStreaming ->
                        forwardToSessionActor(startStreaming.getConnectionCorrelationId(), startStreaming))
                .match(StopStreaming.class, stopStreaming ->
                        forwardToSessionActor(stopStreaming.getConnectionCorrelationId(), stopStreaming))
                // live signals without read subjects are sent to the proxy actor with this actor as sender
                .match(Signal.class, signal -> isLiveSignal(signal) && !isEnrichedLiveSignal(signal),
                        liveSignal -> proxyActor.tell(liveSignal, getSelf()))
                // all remaining live signals are routed to the session named in their correlation ID
                .match(Signal.class, StreamingActor::isLiveSignal, liveSignal -> {
                    final Optional<String> correlationIdOpt = extractConnectionCorrelationId(liveSignal);
                    if (correlationIdOpt.isPresent()) {
                        forwardToSessionActor(correlationIdOpt.get(), liveSignal);
                    } else {
                        logger.warning("CorrelationId for LiveSignal in wrong format: {}",
                                liveSignal.getDittoHeaders().getCorrelationId());
                    }
                })
                .match(Command.class, command -> proxyActor.forward(command, getContext()))
                .match(CommandResponse.class, commandResponse -> {
                    final Optional<String> correlationIdOpt = extractConnectionCorrelationId(commandResponse);
                    if (correlationIdOpt.isPresent()) {
                        forwardToSessionActor(correlationIdOpt.get(), commandResponse);
                    } else {
                        logger.warning("CorrelationId for CommandResponse in wrong format: {}",
                                commandResponse.getDittoHeaders().getCorrelationId());
                    }
                })
                .match(DittoRuntimeException.class, cre -> {
                    final Optional<String> correlationIdOpt = extractConnectionCorrelationId(cre);
                    if (correlationIdOpt.isPresent()) {
                        forwardToSessionActor(correlationIdOpt.get(), cre);
                    } else {
                        logger.warning("Unhandled DittoRuntimeException: <{}: {}>", cre.getClass().getSimpleName(),
                                cre.getMessage());
                    }
                })
                .matchAny(any -> logger.warning("Got unknown message: '{}'", any))
                .build();
    }

    /**
     * Indicates whether the signal is a live signal which carries at least one read subject in its headers.
     */
    private static boolean isEnrichedLiveSignal(final Signal<?> signal) {
        return isLiveSignal(signal) && !signal.getDittoHeaders().getReadSubjects().isEmpty();
    }

    /**
     * Indicates whether the channel header of the signal is present and equal to the name of the live channel.
     */
    private static boolean isLiveSignal(final WithDittoHeaders<?> signal) {
        return signal.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
    }

    /**
     * Extracts the part of the correlation ID header in front of the first {@code ':'}; the whole correlation ID if
     * it contains no {@code ':'}.
     *
     * @return the extracted part or an empty Optional if the headers contain no correlation ID.
     */
    private static Optional<String> extractConnectionCorrelationId(final WithDittoHeaders<?> withDittoHeaders) {
        return withDittoHeaders.getDittoHeaders()
                .getCorrelationId()
                .map(cId -> cId.split(":", 2))
                .map(cIds -> cIds[0]);
    }

    /**
     * Forwards the object to the child session actor whose name equals the connection correlation ID.
     * <p>
     * The child is looked up by its exact name instead of via an actor selection: the ID may originate from message
     * headers and an actor selection would interpret it as a path expression (wildcards, {@code ..}, leading
     * {@code /}) which could address actors other than the intended session. If no such child exists the object is
     * logged and dropped.
     * </p>
     */
    private void forwardToSessionActor(final String connectionCorrelationId, final Object object) {
        if (object instanceof WithDittoHeaders) {
            LogUtil.enhanceLogWithCorrelationId(logger, (WithDittoHeaders<?>) object);
        } else {
            LogUtil.enhanceLogWithCorrelationId(logger, (String) null);
        }
        final Optional<ActorRef> sessionActor = getContext().findChild(connectionCorrelationId);
        if (sessionActor.isPresent()) {
            logger.debug("Forwarding to session actor '{}': {}", connectionCorrelationId, object);
            sessionActor.get().forward(object, getContext());
        } else {
            logger.info("No session actor '{}' found, dropping: {}", connectionCorrelationId, object);
        }
    }
}

