/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Function;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reacts to property-store / ZooKeeper change notifications and, for realtime tables whose stream
 * config reports a high-level consumer (HLC) type and for which this controller is the lead
 * controller, creates new {@code IN_PROGRESS} segment metadata and adds the new segment to the
 * table's ideal state for every server instance that is not already hosting an in-progress HLC
 * segment.
 *
 * <p>The same instance is registered as Helix property listener, ZK child listener and ZK data
 * listener; every callback funnels into {@link #processPropertyStoreChange(String)}.
 */
public class PinotRealtimeSegmentManager
        implements HelixPropertyListener, IZkChildListener, IZkDataListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
    private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
    private static final String SEGMENTS_PATH = "/SEGMENTS";
    private static final String REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN = ".*/SEGMENTS/.*_REALTIME|.*/SEGMENTS/.*_REALTIME/.*";
    private static final String REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN = ".*/TABLE/.*REALTIME";
    private static final String CONTROLLER_LEADER_CHANGE = "CONTROLLER LEADER CHANGE";
    private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
    private static final String ZK_CLIENT_SESSION_TIMEOUT_KEY = "zk.client.session.timeout.ms";
    private static final int DEFAULT_ZK_CLIENT_SESSION_TIMEOUT_MS = 30000;
    private static final String ZK_CLIENT_CONNECTION_TIMEOUT_KEY = "zk.client.connection.timeout.ms";
    private static final int DEFAULT_ZK_CLIENT_CONNECTION_TIMEOUT_MS = 60000;

    private final String _propertyStorePath;
    private final String _tableConfigPath;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    // Created in start(); null until then.
    private ZkClient _zkClient;
    // Supplied in start(); null until then.
    private ControllerMetrics _controllerMetrics;
    private final LeadControllerManager _leadControllerManager;
    private final ControllerConf _controllerConf;

    /**
     * Stores the collaborators and derives the property-store and table-config ZK paths from the
     * Helix cluster name. No ZK connection is opened until {@link #start(ControllerMetrics)}.
     *
     * @param pinotManager resource manager used for cluster, table, instance and property-store access
     * @param leadControllerManager used to decide whether this controller leads a given table
     * @param controllerConf source of the ZK client timeout settings
     */
    public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf) {
        this._pinotHelixResourceManager = pinotManager;
        this._leadControllerManager = leadControllerManager;
        this._controllerConf = controllerConf;
        String clusterName = this._pinotHelixResourceManager.getHelixClusterName();
        this._propertyStorePath = PropertyPathBuilder.propertyStore(clusterName);
        this._tableConfigPath = this._propertyStorePath + TABLE_CONFIG;
    }

    /**
     * Opens a dedicated ZK client, subscribes this instance to child and data changes on the
     * table-config path, and processes one change for that path.
     *
     * @param controllerMetrics metrics sink used to record segment-assignment errors
     */
    public void start(ControllerMetrics controllerMetrics) {
        this._controllerMetrics = controllerMetrics;
        LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
        String zkUrl = this._pinotHelixResourceManager.getHelixZkURL();
        int sessionTimeoutMs = this._controllerConf.getProperty(ZK_CLIENT_SESSION_TIMEOUT_KEY, DEFAULT_ZK_CLIENT_SESSION_TIMEOUT_MS);
        int connectionTimeoutMs = this._controllerConf.getProperty(ZK_CLIENT_CONNECTION_TIMEOUT_KEY, DEFAULT_ZK_CLIENT_CONNECTION_TIMEOUT_MS);
        this._zkClient = new ZkClient.Builder()
                .setZkServer(zkUrl)
                .setSessionTimeout(sessionTimeoutMs)
                .setConnectionTimeout(connectionTimeoutMs)
                .build();
        this._zkClient.setZkSerializer(new ZNRecordSerializer());
        // Surface a failed wait instead of silently discarding the result; the subscriptions below
        // are still attempted exactly as before.
        if (!this._zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("ZK client did not report a connection to {} within {} ms", zkUrl, connectionTimeoutMs);
        }
        this._zkClient.subscribeChildChanges(this._tableConfigPath, this);
        this._zkClient.subscribeDataChanges(this._tableConfigPath, this);
        this.processPropertyStoreChange(this._tableConfigPath);
    }

    /**
     * Releases the ZK client opened by {@link #start(ControllerMetrics)} (if any) and then stops
     * the property store. A failure while closing the ZK client is logged and does not prevent the
     * property store from being stopped.
     */
    public void stop() {
        LOGGER.info("Stopping realtime segments manager, stopping property store.");
        ZkClient zkClient = this._zkClient;
        if (zkClient != null) {
            try {
                // Drop our listeners first so no further callbacks are dispatched to this instance.
                zkClient.unsubscribeAll();
                zkClient.close();
            } catch (RuntimeException e) {
                LOGGER.warn("Caught exception while closing the ZK client of the realtime segments manager", e);
            }
        }
        this._pinotHelixResourceManager.getPropertyStore().stop();
    }

    /**
     * Computes, for every HLC realtime table led by this controller, which server instances need a
     * new consuming segment, then writes the segment metadata and updates the ideal state for each.
     * Failures are logged and metered per table / per segment; they never abort the whole pass.
     */
    private synchronized void assignRealtimeSegmentsToServerInstancesIfNecessary() {
        HashMap<String, IdealState> idealStateMap = this.collectHighLevelConsumerIdealStates();

        List<Pair<String, String>> segmentsToAdd = new ArrayList<>();
        for (String realtimeTableName : idealStateMap.keySet()) {
            try {
                this.computeSegmentsToAddForTable(realtimeTableName, idealStateMap.get(realtimeTableName), segmentsToAdd);
            } catch (Exception e) {
                LOGGER.warn("Caught exception while processing resource {}, skipping.", realtimeTableName, e);
                this.markSegmentAssignmentError();
            }
        }

        LOGGER.info("Computed list of new segments to add : {}", segmentsToAdd);
        for (Pair<String, String> segmentIdAndInstanceId : segmentsToAdd) {
            String segmentId = segmentIdAndInstanceId.getLeft();
            String instanceName = segmentIdAndInstanceId.getRight();
            try {
                this.registerNewSegment(idealStateMap, segmentId, instanceName);
            } catch (Exception e) {
                LOGGER.warn("Caught exception while processing segment {} for instance {}, skipping.", segmentId, instanceName, e);
                this.markSegmentAssignmentError();
            }
        }
    }

    /**
     * Fetches the ideal state of every realtime table that has a table config, is led by this
     * controller and whose stream config reports a high-level consumer type.
     *
     * @return map from realtime table name to its ideal state as returned by the Helix admin
     */
    private HashMap<String, IdealState> collectHighLevelConsumerIdealStates() {
        HashMap<String, IdealState> idealStateMap = new HashMap<>();
        for (String realtimeTableName : this._pinotHelixResourceManager.getAllRealtimeTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(realtimeTableName);
            if (tableConfig == null || !this._leadControllerManager.isLeaderForTable(realtimeTableName)) {
                continue;
            }
            StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
            if (streamConfig.hasHighLevelConsumerType()) {
                IdealState idealState = this._pinotHelixResourceManager.getHelixAdmin()
                        .getResourceIdealState(this._pinotHelixResourceManager.getHelixClusterName(), realtimeTableName);
                idealStateMap.put(realtimeTableName, idealState);
            } else {
                LOGGER.debug("Not considering table {} for realtime segment assignment", realtimeTableName);
            }
        }
        return idealStateMap;
    }

    /**
     * Appends a (segment name, instance id) pair to {@code segmentsToAdd} for every server instance
     * of the table that should receive a new segment: all instances when the ideal state has no
     * partitions, otherwise only the instances not hosting an in-progress HLC segment.
     */
    private void computeSegmentsToAddForTable(String realtimeTableName, IdealState state, List<Pair<String, String>> segmentsToAdd) {
        if (state == null) {
            // Handled by the caller's catch block, which logs and meters the failure for this table.
            throw new IllegalStateException("No ideal state found for table " + realtimeTableName);
        }

        if (state.getPartitionSet().isEmpty()) {
            for (String instanceId : this.fetchServerInstances(realtimeTableName)) {
                this.addNewSegmentForInstance(realtimeTableName, instanceId, segmentsToAdd);
            }
            return;
        }

        HashSet<String> instancesToAssignRealtimeSegment = new HashSet<>(this.fetchServerInstances(realtimeTableName));
        for (String partition : state.getPartitionSet()) {
            if (!SegmentName.isHighLevelConsumerSegmentName(partition)) {
                continue;
            }
            SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(
                    this._pinotHelixResourceManager.getPropertyStore(), realtimeTableName, partition);
            if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                // These instances are still consuming; they do not need a new segment.
                instancesToAssignRealtimeSegment.removeAll(state.getInstanceSet(partition));
            }
        }
        for (String instanceId : instancesToAssignRealtimeSegment) {
            this.addNewSegmentForInstance(realtimeTableName, instanceId, segmentsToAdd);
        }
    }

    /**
     * Returns the server instances of the given realtime table. If the lookup throws, the error is
     * logged and metered and an empty list is returned.
     */
    private List<String> fetchServerInstances(String realtimeTableName) {
        List<String> instances = new ArrayList<>();
        try {
            instances.addAll(this._pinotHelixResourceManager.getServerInstancesForTable(realtimeTableName, TableType.REALTIME));
        } catch (Exception e) {
            LOGGER.error("Caught exception while fetching instances for resource {}", realtimeTableName, e);
            this.markSegmentAssignmentError();
        }
        return instances;
    }

    /**
     * Builds a new HLC segment name for the instance from its group id, partition id and the
     * current time, and appends it to {@code segmentsToAdd}. Instances without ZK metadata, or with
     * a null/empty group id or partition id for the table, are logged, metered and skipped.
     */
    private void addNewSegmentForInstance(String realtimeTableName, String instanceId, List<Pair<String, String>> segmentsToAdd) {
        InstanceZKMetadata instanceZKMetadata = this._pinotHelixResourceManager.getInstanceZKMetadata(instanceId);
        if (instanceZKMetadata == null) {
            LOGGER.warn("Instance {} has no associated instance metadata in ZK, ignoring for segment assignment.", instanceId);
            this.markSegmentAssignmentError();
            return;
        }
        String groupId = instanceZKMetadata.getGroupId(realtimeTableName);
        String partitionId = instanceZKMetadata.getPartition(realtimeTableName);
        if (groupId == null || groupId.isEmpty() || partitionId == null || partitionId.isEmpty()) {
            LOGGER.warn("Instance {} has invalid groupId ({}) and/or partitionId ({}) for resource {}, ignoring for segment assignment.", instanceId, groupId, partitionId, realtimeTableName);
            this.markSegmentAssignmentError();
            return;
        }
        String segmentName = new HLCSegmentName(groupId, partitionId, String.valueOf(System.currentTimeMillis())).getSegmentName();
        segmentsToAdd.add(Pair.of(segmentName, instanceId));
    }

    /**
     * Writes {@code IN_PROGRESS} segment metadata for the segment and adds it to the ideal state of
     * the table named in the segment name, unless the previously fetched ideal state already
     * contains the segment.
     */
    private void registerNewSegment(HashMap<String, IdealState> idealStateMap, final String segmentId, final String instanceName) {
        String resourceName = new HLCSegmentName(segmentId).getTableName();
        IdealState knownIdealState = idealStateMap.get(resourceName);
        if (knownIdealState == null) {
            // Handled by the caller's catch block, which logs and meters the failure for this segment.
            throw new IllegalStateException("No ideal state found for resource " + resourceName);
        }
        if (knownIdealState.getPartitionSet().contains(segmentId)) {
            return;
        }

        SegmentZKMetadata segmentMetadataToAdd = new SegmentZKMetadata(segmentId);
        segmentMetadataToAdd.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
        ZKMetadataProvider.setSegmentZKMetadata(this._pinotHelixResourceManager.getPropertyStore(), resourceName, segmentMetadataToAdd);

        HelixHelper.updateIdealState(this._pinotHelixResourceManager.getHelixZkManager(), resourceName, new Function<IdealState, IdealState>() {
            @Override
            public IdealState apply(IdealState idealState) {
                return PinotTableIdealStateBuilder.addNewRealtimeSegmentToIdealState(segmentId, idealState, instanceName);
            }
        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0));
    }

    /** Increments the realtime segment-assignment error meter by one. */
    private void markSegmentAssignmentError() {
        this._controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_ERROR, 1L);
    }

    @Override
    public synchronized void onDataChange(String path) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataChange: {}", path);
        this.processPropertyStoreChange(path);
    }

    @Override
    public synchronized void onDataCreate(String path) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataCreate: {}", path);
        this.processPropertyStoreChange(path);
    }

    @Override
    public synchronized void onDataDelete(String path) {
        LOGGER.info("PinotRealtimeSegmentManager.onDataDelete: {}", path);
        this.processPropertyStoreChange(path);
    }

    /**
     * Refreshes the ZK watches and, when the path matches a realtime segment path, a realtime
     * table-config path, or equals the leader-change marker, runs segment assignment. Any exception
     * is logged and then rethrown via {@link Utils#rethrowException(Throwable)}.
     *
     * @param path the changed path (or marker string) reported by the listener callback
     */
    private void processPropertyStoreChange(String path) {
        try {
            LOGGER.info("Processing change notification for path: {}", path);
            this.refreshWatchers(path);
            boolean assignmentRelevant = path.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN)
                    || path.matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN)
                    || path.equals(CONTROLLER_LEADER_CHANGE);
            if (assignmentRelevant) {
                this.assignRealtimeSegmentsToServerInstancesIfNecessary();
            }
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing change for path {}", path, e);
            Utils.rethrowException(e);
        }
    }

    /**
     * Re-subscribes this instance to data/child changes on the segments path of every HLC realtime
     * table found under the table-config node, and refreshes the per-segment data watches. A
     * failure on one table config is logged and does not stop processing of the others.
     *
     * @param path the path that triggered the refresh; used for logging only
     */
    private void refreshWatchers(String path) {
        LOGGER.info("Received change notification for path: {}", path);
        // The stats list is an output parameter of getChildren; its contents are not used here.
        List<org.apache.zookeeper.data.Stat> stats = new ArrayList<>();
        List<ZNRecord> tableConfigs = this._pinotHelixResourceManager.getPropertyStore().getChildren(TABLE_CONFIG, stats, 0);
        if (tableConfigs == null) {
            return;
        }
        for (ZNRecord tableConfigZnRecord : tableConfigs) {
            if (tableConfigZnRecord == null) {
                continue;
            }
            try {
                String znRecordId = tableConfigZnRecord.getId();
                if (TableNameBuilder.getTableTypeFromTableName(znRecordId) != TableType.REALTIME) {
                    continue;
                }
                TableConfig tableConfig = TableConfigUtils.fromZNRecord(tableConfigZnRecord);
                StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
                if (!streamConfig.hasHighLevelConsumerType()) {
                    continue;
                }
                String realtimeTableName = tableConfig.getTableName();
                String realtimeSegmentsPathForTable = this._propertyStorePath + SEGMENTS_PATH + "/" + realtimeTableName;
                LOGGER.info("Setting data/child changes watch for real-time table '{}'", realtimeTableName);
                this._zkClient.subscribeDataChanges(realtimeSegmentsPathForTable, this);
                this._zkClient.subscribeChildChanges(realtimeSegmentsPathForTable, this);
                this.refreshSegmentWatches(realtimeTableName, realtimeSegmentsPathForTable);
            } catch (Exception e) {
                LOGGER.error("Caught exception while processing ZNRecord id: {}. Skipping node to continue setting watches", tableConfigZnRecord.getId(), e);
            }
        }
    }

    /**
     * For every HLC segment of the table that has ZK metadata: subscribes to data changes while the
     * segment is {@code IN_PROGRESS}, and unsubscribes otherwise.
     */
    private void refreshSegmentWatches(String realtimeTableName, String realtimeSegmentsPathForTable) {
        List<String> childNames = this._pinotHelixResourceManager.getPropertyStore().getChildNames(SEGMENTS_PATH + "/" + realtimeTableName, 0);
        if (childNames == null || childNames.isEmpty()) {
            return;
        }
        for (String segmentName : childNames) {
            if (!SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
                continue;
            }
            String segmentPath = realtimeSegmentsPathForTable + "/" + segmentName;
            SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(
                    this._pinotHelixResourceManager.getPropertyStore(), realtimeTableName, segmentName);
            if (segmentZKMetadata == null) {
                continue;
            }
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                LOGGER.info("Setting data change watch for real-time segment currently being consumed: {}", segmentPath);
                this._zkClient.subscribeDataChanges(segmentPath, this);
            } else {
                this._zkClient.unsubscribeDataChanges(segmentPath, this);
            }
        }
    }

    /**
     * Processes a change for the parent path and then for each child whose name ends with
     * {@code _REALTIME}.
     */
    @Override
    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleChildChange: {}", parentPath);
        this.processPropertyStoreChange(parentPath);
        if (currentChildren == null) {
            return;
        }
        for (String table : currentChildren) {
            if (!table.endsWith(REALTIME_TABLE_SUFFIX)) {
                continue;
            }
            String tablePath = parentPath + "/" + table;
            LOGGER.info("PinotRealtimeSegmentManager.handleChildChange with table: {}", tablePath);
            this.processPropertyStoreChange(tablePath);
        }
    }

    @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleDataChange: {}", dataPath);
        this.processPropertyStoreChange(dataPath);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
        LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
        this.processPropertyStoreChange(dataPath);
    }
}

