/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions.impl.steps;

import io.atomix.raft.RaftServer;
import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirectorContext;
import io.camunda.zeebe.broker.exporter.stream.ExporterPhase;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.dynamic.config.state.ExporterState;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.stream.api.EventFilter;
import io.camunda.zeebe.stream.impl.SkipPositionsFilter;
import io.camunda.zeebe.util.health.HealthMonitorable;
import java.time.InstantSource;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class ExporterDirectorPartitionTransitionStep
implements PartitionTransitionStep {
    private static final int EXPORTER_PROCESSOR_ID = 1003;
    private final BiFunction<ExporterDirectorContext, ExporterPhase, ExporterDirector> exporterDirectorBuilder;

    public ExporterDirectorPartitionTransitionStep() {
        this(ExporterDirector::new);
    }

    ExporterDirectorPartitionTransitionStep(BiFunction<ExporterDirectorContext, ExporterPhase, ExporterDirector> exporterDirectorBuilder) {
        this.exporterDirectorBuilder = exporterDirectorBuilder;
    }

    @Override
    public void onNewRaftRole(PartitionTransitionContext context, RaftServer.Role newRole) {
        ExporterDirector director = context.getExporterDirector();
        if (director != null && this.shouldCloseOnTransition(newRole, context.getCurrentRole())) {
            director.pauseExporting();
        }
    }

    @Override
    public ActorFuture<Void> prepareTransition(PartitionTransitionContext context, long term, RaftServer.Role targetRole) {
        ExporterDirector director = context.getExporterDirector();
        if (director != null && this.shouldCloseOnTransition(targetRole, context.getCurrentRole())) {
            context.getComponentHealthMonitor().removeComponent(director.getName());
            ActorFuture future = director.closeAsync();
            future.onComplete((success, error) -> {
                if (error == null) {
                    context.setExporterDirector(null);
                }
            });
            return future;
        }
        return CompletableActorFuture.completed(null);
    }

    @Override
    public ActorFuture<Void> transitionTo(PartitionTransitionContext context, long term, RaftServer.Role targetRole) {
        if (this.shouldInstallOnTransition(targetRole, context.getCurrentRole()) || context.getExporterDirector() == null && targetRole != RaftServer.Role.INACTIVE) {
            return this.openExporter(context, targetRole);
        }
        return CompletableActorFuture.completed(null);
    }

    @Override
    public String getName() {
        return "ExporterDirector";
    }

    private boolean shouldInstallOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        return newRole == RaftServer.Role.LEADER || newRole == RaftServer.Role.FOLLOWER && currentRole != RaftServer.Role.CANDIDATE || newRole == RaftServer.Role.CANDIDATE && currentRole != RaftServer.Role.FOLLOWER;
    }

    private boolean shouldCloseOnTransition(RaftServer.Role newRole, RaftServer.Role currentRole) {
        return this.shouldInstallOnTransition(newRole, currentRole) || newRole == RaftServer.Role.INACTIVE;
    }

    private ActorFuture<Void> openExporter(PartitionTransitionContext context, RaftServer.Role targetRole) {
        Map<ExporterDescriptor, ExporterDirector.ExporterInitializationInfo> exporterDescriptors = ExporterDirectorPartitionTransitionStep.getEnabledExporterDescriptors(context);
        SkipPositionsFilter exporterFilter = SkipPositionsFilter.of(context.getBrokerCfg() != null ? context.getBrokerCfg().getExporting().getSkipRecords() : Set.of());
        ExporterDirectorContext.ExporterMode exporterMode = targetRole == RaftServer.Role.LEADER ? ExporterDirectorContext.ExporterMode.ACTIVE : ExporterDirectorContext.ExporterMode.PASSIVE;
        ExporterDirectorContext exporterCtx = new ExporterDirectorContext().id(1003).name(Actor.buildActorName((String)"Exporter", (int)context.getPartitionId())).clock((InstantSource)context.getStreamClock()).logStream(context.getLogStream()).zeebeDb(context.getZeebeDb()).partitionMessagingService(context.getMessagingService()).descriptors(exporterDescriptors).exporterMode(exporterMode).positionsToSkipFilter((EventFilter)exporterFilter).meterRegistry(context.getPartitionMeterRegistry());
        ExporterDirector director = this.exporterDirectorBuilder.apply(exporterCtx, context.getExporterPhase());
        context.getComponentHealthMonitor().registerComponent(director.getName(), (HealthMonitorable)director);
        ActorFuture<Void> startFuture = director.startAsync(context.getActorSchedulingService());
        startFuture.onComplete((nothing, error) -> {
            if (error == null) {
                context.setExporterDirector(director);
                switch (context.getExporterPhase()) {
                    case PAUSED: {
                        director.pauseExporting();
                        break;
                    }
                    case SOFT_PAUSED: {
                        director.softPauseExporting();
                        break;
                    }
                    default: {
                        director.resumeExporting();
                    }
                }
                this.disableOrEnableExportersIfConfigChanged(exporterDescriptors, context);
            }
        });
        return startFuture;
    }

    private void disableOrEnableExportersIfConfigChanged(Map<ExporterDescriptor, ExporterDirector.ExporterInitializationInfo> startedExporters, PartitionTransitionContext context) {
        Map<ExporterDescriptor, ExporterDirector.ExporterInitializationInfo> currentEnabledExporters = ExporterDirectorPartitionTransitionStep.getEnabledExporterDescriptors(context);
        for (ExporterDescriptor exporterDescriptor : startedExporters.keySet()) {
            if (currentEnabledExporters.containsKey(exporterDescriptor)) continue;
            context.getExporterDirector().disableExporter(exporterDescriptor.getId());
        }
        for (Map.Entry entry : currentEnabledExporters.entrySet()) {
            ExporterDescriptor exporter = (ExporterDescriptor)entry.getKey();
            if (startedExporters.containsKey(exporter)) continue;
            context.getExporterDirector().enableExporterWithRetry(exporter.getId(), (ExporterDirector.ExporterInitializationInfo)entry.getValue(), exporter);
        }
    }

    private static Map<ExporterDescriptor, ExporterDirector.ExporterInitializationInfo> getEnabledExporterDescriptors(PartitionTransitionContext context) {
        Collection<ExporterDescriptor> exporterDescriptors = context.getExportedDescriptors();
        Map exporterConfig = context.getDynamicPartitionConfig().exporting().exporters();
        return exporterDescriptors.stream().filter(exporterDescriptor -> ExporterDirectorPartitionTransitionStep.isEnabled(exporterConfig, exporterDescriptor)).collect(Collectors.toMap(Function.identity(), descriptor -> ExporterDirectorPartitionTransitionStep.getInitializationInfo(descriptor, exporterConfig)));
    }

    private static ExporterDirector.ExporterInitializationInfo getInitializationInfo(ExporterDescriptor descriptor, Map<String, ExporterState> exportersConfig) {
        if (exportersConfig.containsKey(descriptor.getId())) {
            ExporterState config = exportersConfig.get(descriptor.getId());
            return new ExporterDirector.ExporterInitializationInfo(config.metadataVersion(), config.initializedFrom().orElse(null));
        }
        return new ExporterDirector.ExporterInitializationInfo(0L, null);
    }

    private static boolean isEnabled(Map<String, ExporterState> exporterConfig, ExporterDescriptor exporterDescriptor) {
        return exporterConfig.containsKey(exporterDescriptor.getId()) && exporterConfig.get(exporterDescriptor.getId()).state() == ExporterState.State.ENABLED;
    }
}

