/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.PersistentStream;
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.event.impl.BufferedPersistentStreamSegment;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.streams.InitializationProperties;
import io.axoniq.axonserver.grpc.streams.Open;
import io.axoniq.axonserver.grpc.streams.ProgressAcknowledgement;
import io.axoniq.axonserver.grpc.streams.SegmentError;
import io.axoniq.axonserver.grpc.streams.StreamRequest;
import io.axoniq.axonserver.grpc.streams.StreamSignal;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStreamImpl
implements PersistentStream,
ClientResponseObserver<StreamRequest, StreamSignal> {
    private static final Logger logger = LoggerFactory.getLogger(PersistentStreamImpl.class);
    private static final Consumer<Throwable> NO_OP = ex -> {};
    private final Map<Integer, BufferedPersistentStreamSegment> openSegments = new ConcurrentHashMap<Integer, BufferedPersistentStreamSegment>();
    private final String streamId;
    private final String clientId;
    private final AtomicReference<ClientCallStreamObserver<StreamRequest>> outboundStreamHolder = new AtomicReference();
    private final AtomicReference<Consumer<Throwable>> onClosedCallback = new AtomicReference<Consumer<Throwable>>(NO_OP);
    private final Set<Consumer<PersistentStreamSegment>> onSegmentOpenedCallbacks = new CopyOnWriteArraySet<Consumer<PersistentStreamSegment>>();
    private final Set<Consumer<PersistentStreamSegment>> segmentOnAvailable = new CopyOnWriteArraySet<Consumer<PersistentStreamSegment>>();
    private final Set<Consumer<PersistentStreamSegment>> segmentOnClose = new CopyOnWriteArraySet<Consumer<PersistentStreamSegment>>();
    private final Set<Integer> closeConfirmationsSent = new CopyOnWriteArraySet<Integer>();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final int bufferSize;
    private final int refillBatch;

    public PersistentStreamImpl(ClientIdentification clientId, String streamId, int bufferSize, int refillBatch, PersistentStreamCallbacks callbacks) {
        this.streamId = streamId;
        this.clientId = clientId.getClientId();
        this.bufferSize = bufferSize;
        this.refillBatch = refillBatch;
        if (callbacks.onAvailable() != null) {
            this.segmentOnAvailable.add(callbacks.onAvailable());
        }
        if (callbacks.onClosed() != null) {
            this.onClosedCallback.set(callbacks.onClosed());
        }
        if (callbacks.onSegmentOpened() != null) {
            this.onSegmentOpenedCallbacks.add(callbacks.onSegmentOpened());
        }
        if (callbacks.onSegmentClosed() != null) {
            this.segmentOnClose.add(callbacks.onSegmentClosed());
        }
    }

    public void openConnection() {
        this.openConnection(null);
    }

    public void openConnection(InitializationProperties initializationProperties) {
        Open.Builder openRequest = Open.newBuilder().setStreamId(this.streamId).setClientId(this.clientId);
        if (initializationProperties != null) {
            openRequest.setInitializationProperties(initializationProperties);
        }
        this.outboundStreamHolder.get().onNext((Object)StreamRequest.newBuilder().setOpen(openRequest.build()).build());
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            Instant timeout = Instant.now().plus(Duration.ofSeconds(2L));
            this.openSegments.forEach((segmentNumber, segment) -> segment.close());
            while (this.closeConfirmationsSent.size() != this.openSegments.size() && Instant.now().isBefore(timeout)) {
                try {
                    logger.debug("{}: Waiting for segments to complete {} of {}", new Object[]{this.streamId, this.closeConfirmationsSent.size(), this.openSegments.size()});
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            logger.debug("{}: Waited for segments to complete {}", (Object)this.streamId, this.closeConfirmationsSent);
            this.sendCompleted();
        }
    }

    public void beforeStart(ClientCallStreamObserver<StreamRequest> clientCallStreamObserver) {
        this.outboundStreamHolder.set(new StreamRequestClientCallStreamObserver(clientCallStreamObserver));
    }

    public void onNext(StreamSignal streamSignal) {
        BufferedPersistentStreamSegment segment;
        if (streamSignal.hasOpen()) {
            this.getPersistentStreamSegment(streamSignal.getSegment());
        }
        if (streamSignal.hasEvent()) {
            segment = this.getPersistentStreamSegment(streamSignal.getSegment());
            segment.onNext(streamSignal.getEvent());
        }
        if (streamSignal.getClosed()) {
            logger.debug("Received: {} - closed", (Object)streamSignal.getSegment());
            segment = this.openSegments.remove(streamSignal.getSegment());
            if (segment != null) {
                segment.onCompleted();
            }
        }
    }

    private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr) {
        boolean isNew = !this.openSegments.containsKey(segmentNr);
        BufferedPersistentStreamSegment segment = this.openSegments.computeIfAbsent(segmentNr, s -> {
            BufferedPersistentStreamSegment stream = new BufferedPersistentStreamSegment(this.streamId, segmentNr, this.bufferSize, this.refillBatch, progress -> this.acknowledge((int)s, progress), error -> this.sendError((int)s, (String)error));
            stream.beforeStart(this.outboundStreamHolder.get());
            stream.enableFlowControl();
            return stream;
        });
        if (isNew) {
            this.onSegmentOpenedCallbacks.forEach(callback -> callback.accept(segment));
            this.segmentOnAvailable.forEach(a -> segment.onAvailable(() -> a.accept(segment)));
            this.segmentOnClose.forEach(a -> segment.onSegmentClosed(() -> a.accept(segment)));
            this.closeConfirmationsSent.remove(segment.segment());
        }
        return segment;
    }

    private void acknowledge(int segment, long progress) {
        this.outboundStreamHolder.get().onNext((Object)StreamRequest.newBuilder().setAcknowledgeProgress(ProgressAcknowledgement.newBuilder().setSegment(segment).setPosition(progress).build()).build());
        if (progress == -45L) {
            logger.info("{}: Close confirmed for segment {}", (Object)this.streamId, (Object)segment);
            this.closeConfirmationsSent.add(segment);
        }
    }

    private void sendError(int segment, String error) {
        this.outboundStreamHolder.get().onNext((Object)StreamRequest.newBuilder().setError(SegmentError.newBuilder().setSegment(segment).setError(error).build()).build());
    }

    public void onError(Throwable throwable) {
        logger.warn("{}: Error on stream: {}", new Object[]{this.streamId, throwable.getMessage(), throwable});
        this.close(throwable);
    }

    public void onCompleted() {
        this.sendCompleted();
        this.close(null);
    }

    private void sendCompleted() {
        try {
            this.outboundStreamHolder.get().onCompleted();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void close(Throwable throwable) {
        this.closed.set(true);
        this.openSegments.forEach((segment, buffer) -> {
            try {
                if (throwable != null) {
                    buffer.onError(throwable);
                } else {
                    buffer.onCompleted();
                }
            }
            catch (Exception ex) {
                logger.debug("{}: Exception while completing segment {}", new Object[]{this.streamId, buffer.segment(), ex});
            }
        });
        this.onClosedCallback.get().accept(throwable);
    }

    private class StreamRequestClientCallStreamObserver
    extends ClientCallStreamObserver<StreamRequest> {
        private final ClientCallStreamObserver<StreamRequest> clientCallStreamObserver;

        public StreamRequestClientCallStreamObserver(ClientCallStreamObserver<StreamRequest> clientCallStreamObserver) {
            this.clientCallStreamObserver = clientCallStreamObserver;
        }

        public void cancel(@Nullable String s, @Nullable Throwable throwable) {
            logger.debug("{}: Ignore cancel: {}", new Object[]{PersistentStreamImpl.this.streamId, s, throwable});
        }

        public boolean isReady() {
            return this.clientCallStreamObserver.isReady();
        }

        public void setOnReadyHandler(Runnable runnable) {
            this.clientCallStreamObserver.setOnReadyHandler(runnable);
        }

        public void request(int i) {
            this.clientCallStreamObserver.request(i);
        }

        public void setMessageCompression(boolean b) {
            this.clientCallStreamObserver.setMessageCompression(b);
        }

        public void disableAutoInboundFlowControl() {
        }

        public void disableAutoRequestWithInitial(int request) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onNext(StreamRequest streamRequest) {
            AtomicReference atomicReference = PersistentStreamImpl.this.outboundStreamHolder;
            synchronized (atomicReference) {
                logger.trace("Send {}", (Object)streamRequest);
                this.clientCallStreamObserver.onNext((Object)streamRequest);
            }
        }

        public void onError(Throwable throwable) {
            try {
                this.clientCallStreamObserver.onError(throwable);
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }

        public void onCompleted() {
            try {
                this.clientCallStreamObserver.onCompleted();
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }
    }
}

