/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.druid.client.broker.BrokerClientImpl;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.http.BrokerSyncStatus;

public class CoordinatorDynamicConfigSyncer {
    private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class);
    private final CoordinatorConfigManager configManager;
    private final ObjectMapper jsonMapper;
    private final DruidNodeDiscoveryProvider druidNodeDiscovery;
    private final ServiceClientFactory clientFactory;
    private final ScheduledExecutorService exec;
    private final ServiceEmitter emitter;
    @Nullable
    private Future<?> syncFuture = null;
    @GuardedBy(value="this")
    private final Set<BrokerSyncStatus> inSyncBrokers;
    private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference();

    @Inject
    public CoordinatorDynamicConfigSyncer(@EscalatedGlobal ServiceClientFactory clientFactory, CoordinatorConfigManager configManager, @Json ObjectMapper jsonMapper, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, ServiceEmitter emitter) {
        this.clientFactory = clientFactory;
        this.configManager = configManager;
        this.jsonMapper = jsonMapper;
        this.druidNodeDiscovery = druidNodeDiscoveryProvider;
        this.exec = Execs.scheduledSingleThreaded((String)"CoordinatorDynamicConfigSyncer-%d");
        this.inSyncBrokers = ConcurrentHashMap.newKeySet();
        this.emitter = emitter;
    }

    public void queueBroadcastConfigToBrokers() {
        this.exec.submit(this::broadcastConfigToBrokers);
    }

    @VisibleForTesting
    void broadcastConfigToBrokers() {
        this.invalidateInSyncBrokersIfNeeded();
        Stopwatch stopwatch = Stopwatch.createStarted();
        for (DiscoveryDruidNode broker : this.getKnownBrokers()) {
            this.pushConfigToBroker(broker);
        }
        this.emitStat(Stats.Configuration.TOTAL_SYNC_TIME, RowKey.empty(), stopwatch.millisElapsed());
    }

    public synchronized Set<BrokerSyncStatus> getInSyncBrokers() {
        return Set.copyOf(this.inSyncBrokers);
    }

    public void onLeaderStart() {
        log.info("Starting coordinator config syncing to brokers on leader node.", new Object[0]);
        this.syncFuture = this.exec.scheduleAtFixedRate(this::broadcastConfigToBrokers, 30L, 60L, TimeUnit.SECONDS);
    }

    public void onLeaderStop() {
        log.info("Not leader, stopping coordinator config syncing to brokers.", new Object[0]);
        if (this.syncFuture != null) {
            this.syncFuture.cancel(true);
        }
    }

    @LifecycleStop
    public void stop() {
        this.exec.shutdownNow();
    }

    private void pushConfigToBroker(DiscoveryDruidNode broker) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        ServiceLocation brokerLocation = CoordinatorDynamicConfigSyncer.convertDiscoveryNodeToServiceLocation(broker);
        BrokerClientImpl brokerClient = new BrokerClientImpl(this.clientFactory.makeClient(NodeRole.BROKER.getJsonName(), new FixedServiceLocator(brokerLocation), StandardRetryPolicy.builder().maxAttempts(6L).build()), this.jsonMapper);
        try {
            CoordinatorDynamicConfig currentDynamicConfig = this.configManager.getCurrentDynamicConfig();
            boolean success = (Boolean)brokerClient.updateCoordinatorDynamicConfig(currentDynamicConfig).get();
            if (success) {
                this.markBrokerAsSynced(currentDynamicConfig, brokerLocation);
            }
        }
        catch (Exception e) {
            log.error((Throwable)e, "Exception while syncing dynamic configuration to broker[%s]", new Object[]{brokerLocation});
            this.emitStat(Stats.Configuration.BROKER_SYNC_ERROR, RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), 1L);
        }
        this.emitStat(Stats.Configuration.BROKER_SYNC_TIME, RowKey.with(Dimension.SERVER, broker.getDruidNode().getHostAndPortToUse()).build(), stopwatch.millisElapsed());
    }

    private Collection<DiscoveryDruidNode> getKnownBrokers() {
        return this.druidNodeDiscovery.getForNodeRole(NodeRole.BROKER).getAllNodes();
    }

    private synchronized void invalidateInSyncBrokersIfNeeded() {
        CoordinatorDynamicConfig currentDynamicConfig = this.configManager.getCurrentDynamicConfig();
        if (!currentDynamicConfig.equals(this.lastKnownConfig.get())) {
            this.inSyncBrokers.clear();
            this.lastKnownConfig.set(currentDynamicConfig);
        }
    }

    private synchronized void markBrokerAsSynced(CoordinatorDynamicConfig config, ServiceLocation broker) {
        if (config.equals(this.lastKnownConfig.get())) {
            this.inSyncBrokers.add(new BrokerSyncStatus(broker, System.currentTimeMillis()));
        }
    }

    @Nullable
    private static ServiceLocation convertDiscoveryNodeToServiceLocation(DiscoveryDruidNode discoveryDruidNode) {
        DruidNode druidNode = discoveryDruidNode.getDruidNode();
        if (druidNode == null) {
            return null;
        }
        return new ServiceLocation(druidNode.getHost(), druidNode.getPlaintextPort(), druidNode.getTlsPort(), "");
    }

    private void emitStat(CoordinatorStat stat, RowKey rowKey, long value) {
        ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
        rowKey.getValues().forEach((dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue));
        this.emitter.emit((ServiceEventBuilder)eventBuilder.setMetric(stat.getMetricName(), (Number)value));
    }
}

