/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.management;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.exporter.stream.ExporterPhase;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.broker.system.management.BrokerAdminService;
import io.camunda.zeebe.broker.system.management.PartitionStatus;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.ActorFutureCollector;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.stream.api.StreamClock;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/**
 * {@link BrokerAdminService} implementation that submits administrative operations to this actor
 * and applies them to every partition currently reported by the {@link PartitionManagerImpl}.
 *
 * <p>The mutating operations (pause, resume, snapshot, prepare-for-upgrade) are fire-and-forget:
 * they are handed to the actor via {@code actor.call(...)} and the resulting future is discarded.
 * Only {@link #getPartitionStatus()} waits for a result.
 */
public final class BrokerAdminServiceImpl extends Actor implements BrokerAdminService {
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;

    /** Upper bound, in seconds, that {@link #getPartitionStatus()} waits for all partitions. */
    private static final long STATUS_QUERY_TIMEOUT_SECONDS = 5L;

    private final PartitionManagerImpl partitionManager;

    /**
     * @param partitionManager source of the partitions every operation is applied to
     */
    public BrokerAdminServiceImpl(PartitionManagerImpl partitionManager) {
        this.partitionManager = partitionManager;
    }

    /** Requests that stream processing be paused on all partitions; does not wait for completion. */
    @Override
    public void pauseStreamProcessing() {
        this.actor.call(this::pauseStreamProcessingOnAllPartitions);
    }

    /** Requests that stream processing be resumed on all partitions; does not wait for completion. */
    @Override
    public void resumeStreamProcessing() {
        this.actor.call(this::resumeStreamProcessingOnAllPartitions);
    }

    /** Requests that exporting be paused on all partitions; does not wait for completion. */
    @Override
    public void pauseExporting() {
        this.actor.call(this::pauseExportingOnAllPartitions);
    }

    /** Requests that exporting be soft-paused on all partitions; does not wait for completion. */
    @Override
    public void softPauseExporting() {
        this.actor.call(this::softPauseExportingOnAllPartitions);
    }

    /** Requests that exporting be resumed on all partitions; does not wait for completion. */
    @Override
    public void resumeExporting() {
        this.actor.call(this::resumeExportingOnAllPartitions);
    }

    /** Requests a snapshot on all partitions; does not wait for completion. */
    @Override
    public void takeSnapshot() {
        this.actor.call(this::takeSnapshotOnAllPartitions);
    }

    /**
     * Requests the upgrade preparation sequence (pause processing and exporting, then snapshot) on
     * all partitions; does not wait for completion.
     */
    @Override
    public void prepareForUpgrade() {
        this.actor.call(this::prepareAllPartitionsForSafeUpgrade);
    }

    /**
     * Collects the status of every partition, keyed by partition id.
     *
     * <p>Blocks the calling thread for at most {@value #STATUS_QUERY_TIMEOUT_SECONDS} seconds.
     *
     * @return the status of all partitions, or an empty map if any partition status could not be
     *     determined, the wait timed out, or the calling thread was interrupted
     */
    @Override
    public Map<Integer, PartitionStatus> getPartitionStatus() {
        final CompletableFuture<Map<Integer, PartitionStatus>> future = new CompletableFuture<>();
        final Map<Integer, PartitionStatus> partitionStatuses = new ConcurrentHashMap<>();
        final Collection<ZeebePartition> partitions = this.partitionManager.getZeebePartitions();
        this.actor.call(() -> {
            // The futures handed to allOf are the ones returned by whenComplete, so each status is
            // already stored in the map by the time allOf completes.
            final CompletableFuture<?>[] statusFutures = partitions.stream()
                .map(partition -> this.getPartitionStatus(partition).whenComplete((status, error) -> {
                    if (error == null) {
                        partitionStatuses.put(partition.getPartitionId(), status);
                    }
                }))
                .toArray(CompletableFuture[]::new);
            // allOf over zero futures is already complete, so no special case for "no partitions".
            // Failures are propagated so the caller is released immediately with the real cause
            // instead of waiting for the timeout below.
            CompletableFuture.allOf(statusFutures).whenComplete((ignored, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else {
                    future.complete(partitionStatuses);
                }
            });
        });
        try {
            return future.get(STATUS_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Error when querying partition status", e);
            return Map.of();
        }
        catch (Exception e) {
            LOG.warn("Error when querying partition status", e);
            return Map.of();
        }
    }

    /**
     * Resolves the role, stream processor and exporter director of {@code partition} and then
     * delegates to the second stage that queries their details.
     *
     * @param partition the partition to inspect
     * @return a future completed with the partition status, or completed exceptionally if a lookup
     *     fails or the partition has no stream processor or no exporter director
     */
    // Raw casts are needed to pass futures with different value types as a single collection.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private CompletableFuture<PartitionStatus> getPartitionStatus(ZeebePartition partition) {
        final CompletableFuture<PartitionStatus> partitionStatus = new CompletableFuture<>();
        final ActorFuture<RaftServer.Role> currentRoleFuture = partition.getCurrentRole();
        final ActorFuture<Optional<StreamProcessor>> streamProcessorFuture = partition.getStreamProcessor();
        final ActorFuture<Optional<ExporterDirector>> exporterDirectorFuture = partition.getExporterDirector();
        // All three futures are awaited so that every join() in the callback reads a completed result.
        this.actor.runOnCompletion(List.of((ActorFuture) currentRoleFuture, (ActorFuture) streamProcessorFuture, (ActorFuture) exporterDirectorFuture), error -> {
            if (error != null) {
                partitionStatus.completeExceptionally(error);
                return;
            }
            final RaftServer.Role role = currentRoleFuture.join();
            final Optional<StreamProcessor> streamProcessor = streamProcessorFuture.join();
            final Optional<ExporterDirector> exporterDirector = exporterDirectorFuture.join();
            if (streamProcessor.isEmpty()) {
                partitionStatus.completeExceptionally(new IllegalStateException("No streamProcessor found for partition: %d.".formatted(partition.getPartitionId())));
            } else if (exporterDirector.isEmpty()) {
                partitionStatus.completeExceptionally(new IllegalStateException("No exporter found for partition: %d .".formatted(partition.getPartitionId())));
            } else {
                this.getPartitionStatus(role, partition, streamProcessor.get(), exporterDirector.get(), partitionStatus);
            }
        });
        return partitionStatus;
    }

    /**
     * Second stage of the status query: gathers positions, phases, clock and latest snapshot of
     * one partition and completes {@code partitionStatus} with the assembled result.
     *
     * @param role the already resolved role of the partition
     * @param partition the partition being inspected; used to look up the latest snapshot
     * @param streamProcessor source of the processed position, processor phase and clock
     * @param exporterDirector source of the exporter phase and lowest exporter position
     * @param partitionStatus completed with the status, or exceptionally if any query fails
     */
    // The stream processor futures are kept raw because their value types are not visible here;
    // the casts on join() mirror the types the status constructor is given.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void getPartitionStatus(RaftServer.Role role, ZeebePartition partition, StreamProcessor streamProcessor, ExporterDirector exporterDirector, CompletableFuture<PartitionStatus> partitionStatus) {
        final ActorFuture positionFuture = streamProcessor.getLastProcessedPositionAsync();
        final ActorFuture currentPhaseFuture = streamProcessor.getCurrentPhase();
        final ActorFuture<ExporterPhase> exporterPhaseFuture = exporterDirector.getPhase();
        final ActorFuture<Long> exporterPositionFuture = exporterDirector.getLowestPosition();
        final Optional<String> snapshotId = this.getSnapshotId(partition);
        // Stays null when there is no snapshot or its id cannot be parsed.
        final Long processedPositionInSnapshot = snapshotId.flatMap(FileBasedSnapshotId::ofFileName).map(FileBasedSnapshotId::getProcessedPosition).orElse(null);
        final ActorFuture clockFuture = streamProcessor.getClock();
        this.actor.runOnCompletion(List.of(positionFuture, currentPhaseFuture, (ActorFuture) exporterPhaseFuture, (ActorFuture) exporterPositionFuture, clockFuture), error -> {
            if (error != null) {
                partitionStatus.completeExceptionally(error);
                return;
            }
            final Long processedPosition = (Long) positionFuture.join();
            final StreamProcessor.Phase processorPhase = (StreamProcessor.Phase) currentPhaseFuture.join();
            final ExporterPhase exporterPhase = exporterPhaseFuture.join();
            final Long exporterPosition = exporterPositionFuture.join();
            final PartitionStatus.ClockStatus clock = this.getClockStatus((StreamClock) clockFuture.join());
            partitionStatus.complete(new PartitionStatus(role, processedPosition, snapshotId.orElse(null), processedPositionInSnapshot, processorPhase, exporterPhase, exporterPosition, clock));
        });
    }

    /**
     * Builds the clock part of the status from the clock's current instant and its current
     * modification (reported both by simple class name and as the object itself).
     */
    private PartitionStatus.ClockStatus getClockStatus(StreamClock clock) {
        final StreamClock.ControllableStreamClock.Modification modification = clock.currentModification();
        return new PartitionStatus.ClockStatus(clock.instant(), modification.getClass().getSimpleName(), modification);
    }

    /** Returns the id of the latest snapshot in the partition's snapshot store, if there is one. */
    private Optional<String> getSnapshotId(ZeebePartition partition) {
        return partition.getSnapshotStore().getLatestSnapshot().map(PersistedSnapshot::getId);
    }

    /**
     * Pauses stream processing and exporting on all partitions and, once both requests have
     * completed, triggers a snapshot on all partitions.
     */
    private void prepareAllPartitionsForSafeUpgrade() {
        LOG.info("Preparing for safe upgrade.");
        final ActorFuture<List<Void>> pauseProcessingCompleted = this.pauseStreamProcessingOnAllPartitions();
        final ActorFuture<List<Void>> pauseExportingCompleted = this.pauseExportingOnAllPartitions();
        this.actor.runOnCompletion(List.of(pauseProcessingCompleted, pauseExportingCompleted), error -> {
            if (error != null) {
                // Make the failure visible; the snapshot is still triggered, as before.
                LOG.warn("Pausing processing or exporting failed while preparing for upgrade.", error);
            }
            this.takeSnapshotOnAllPartitions();
        });
    }

    /** Streams the admin access handle of every partition known to the partition manager. */
    private Stream<PartitionAdminAccess> adminAccessOfAllPartitions() {
        return this.partitionManager.getZeebePartitions().stream().map(ZeebePartition::getAdminAccess);
    }

    /** Pauses stream processing on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> pauseStreamProcessingOnAllPartitions() {
        LOG.info("Pausing StreamProcessor on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::pauseProcessing).collect(new ActorFutureCollector<>(this.actor));
    }

    /** Resumes stream processing on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> resumeStreamProcessingOnAllPartitions() {
        LOG.info("Resume StreamProcessor on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::resumeProcessing).collect(new ActorFutureCollector<>(this.actor));
    }

    /** Triggers a snapshot on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> takeSnapshotOnAllPartitions() {
        LOG.info("Triggering Snapshots on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::takeSnapshot).collect(new ActorFutureCollector<>(this.actor));
    }

    /** Soft-pauses exporting on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> softPauseExportingOnAllPartitions() {
        LOG.info("Soft Pausing exporting on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::softPauseExporting).collect(new ActorFutureCollector<>(this.actor));
    }

    /** Pauses exporting on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> pauseExportingOnAllPartitions() {
        LOG.info("Pausing exporting on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::pauseExporting).collect(new ActorFutureCollector<>(this.actor));
    }

    /** Resumes exporting on every partition and aggregates the per-partition futures. */
    private ActorFuture<List<Void>> resumeExportingOnAllPartitions() {
        LOG.info("Resuming exporting on all partitions.");
        return this.adminAccessOfAllPartitions().map(PartitionAdminAccess::resumeExporting).collect(new ActorFutureCollector<>(this.actor));
    }
}

