/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.api.resources.StateType;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.BaseJsonConfig;
import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PinotHelixResourceManager {
    /** Logger shared by all instances of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
    /** Expiry, in hours, for admin endpoint cache entries; the constructor passes the same value (6) as a literal. */
    private static final long CACHE_ENTRY_EXPIRE_TIME_HOURS = 6L;
    // Exponential backoff built with (5, 1000, 2.0) - presumably attempts, initial delay in ms and scale factor;
    // TODO confirm against RetryPolicies. Not used in the visible part of the file.
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)1000L, (double)2.0);
    // NOTE(review): looks like the name of a push/ingestion type - confirm with callers.
    public static final String APPEND = "APPEND";
    /** Number of lock objects for {@link #_tableUpdaterLocks}; the constructor allocates 100 as a literal. */
    private static final int DEFAULT_TABLE_UPDATER_LOCKERS_SIZE = 100;
    // NOTE(review): judging by the names, the maximum wait (600000 ms = 10 minutes) and the polling interval
    // (1000 ms) used while checking the external view for online segments - not used in the visible part of the file.
    public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 600000L;
    public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1000L;
    // Formatter for the pattern yyyyMMdd'T'HHmmss'Z'; the constructor sets its time zone to UTC.
    // NOTE(review): SimpleDateFormat is not thread-safe and this instance is static - confirm every use is guarded.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
    // Plain HashMaps of String -> (String -> value); presumably table name -> segment name -> CRC / metadata version.
    // TODO confirm key semantics and how concurrent access is guarded; neither map is touched in the visible code.
    private final Map<String, Map<String, Long>> _segmentCrcMap = new HashMap<String, Map<String, Long>>();
    private final Map<String, Map<String, Integer>> _lastKnownSegmentMetadataVersionMap = new HashMap<String, Map<String, Integer>>();
    /** Fixed array of monitor objects, filled in the constructor. */
    private final Object[] _tableUpdaterLocks;
    /** Maps an instance id to a "protocol://host:port" string; entries are loaded from the Helix instance config. */
    private final LoadingCache<String, String> _instanceAdminEndpointCache;
    /** Result of {@code HelixConfig.getAbsoluteZkPathForHelix(zkURL)}, computed in the constructor. */
    private final String _helixZkURL;
    private final String _helixClusterName;
    /** May be null (the constructor parameter is {@code @Nullable}); handed to the segment deletion manager. */
    private final String _dataDir;
    /** Written to the property store as the cluster tenant-isolation flag in {@code start}. */
    private final boolean _isSingleTenantCluster;
    private final boolean _enableBatchMessageMode;
    private final boolean _allowHLCTables;
    /** Handed to the segment deletion manager in {@code start}. */
    private final int _deletedSegmentsRetentionInDays;
    // The handles below are not final: those assigned in start(HelixManager) stay null until it is called.
    // _pinotLLCRealtimeSegmentManager is not assigned anywhere in the visible part of the file.
    private HelixManager _helixZkManager;
    private HelixAdmin _helixAdmin;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private HelixDataAccessor _helixDataAccessor;
    private PropertyKey.Builder _keyBuilder;
    private SegmentDeletionManager _segmentDeletionManager;
    private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
    private TableCache _tableCache;

    /**
     * Creates a resource manager from explicit settings. Only configuration, the admin endpoint cache and the
     * table updater locks are set up here; the Helix handles are assigned later by {@link #start(HelixManager)}.
     *
     * @param zkURL ZooKeeper URL, converted with {@code HelixConfig.getAbsoluteZkPathForHelix}
     * @param helixClusterName name of the Helix cluster
     * @param dataDir data directory handed to the segment deletion manager, may be null
     * @param isSingleTenantCluster tenant-isolation flag written to the property store on start
     * @param enableBatchMessageMode stored as-is
     * @param allowHLCTables stored as-is
     * @param deletedSegmentsRetentionInDays retention handed to the segment deletion manager
     */
    public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean allowHLCTables, int deletedSegmentsRetentionInDays) {
        _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
        _helixClusterName = helixClusterName;
        _dataDir = dataDir;
        _isSingleTenantCluster = isSingleTenantCluster;
        _enableBatchMessageMode = enableBatchMessageMode;
        _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
        _allowHLCTables = allowHLCTables;
        // No raw CacheLoader cast: the loader's type arguments let build() infer LoadingCache<String, String>.
        _instanceAdminEndpointCache = CacheBuilder.newBuilder()
                .expireAfterWrite(CACHE_ENTRY_EXPIRE_TIME_HOURS, TimeUnit.HOURS)
                .build(new CacheLoader<String, String>() {

                    /**
                     * Builds "protocol://host:port" for the instance from its Helix instance config.
                     * A positive "adminPort" wins (http); otherwise a positive "adminHttpsPort" is used (https);
                     * otherwise the result is http on port 8097.
                     */
                    @Override
                    public String load(String instanceId) {
                        InstanceConfig instanceConfig = PinotHelixResourceManager.this.getHelixInstanceConfig(instanceId);
                        Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", instanceId);
                        String hostname = instanceConfig.getHostName();
                        if (hostname.startsWith("Server_")) {
                            // Drop the server instance prefix so that only the host part remains.
                            hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
                        }
                        int adminPort = instanceConfig.getRecord().getIntField("adminPort", -1);
                        int adminHttpsPort = instanceConfig.getRecord().getIntField("adminHttpsPort", -1);
                        String protocol = "http";
                        int port = 8097;
                        if (adminPort > 0) {
                            port = adminPort;
                        } else if (adminHttpsPort > 0) {
                            protocol = "https";
                            port = adminHttpsPort;
                        }
                        return String.format("%s://%s:%d", protocol, hostname, port);
                    }
                });
        _tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];
        for (int i = 0; i < _tableUpdaterLocks.length; ++i) {
            _tableUpdaterLocks[i] = new Object();
        }
        // Applied to the shared static formatter on every construction; repeating it is harmless.
        SIMPLE_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    /**
     * Convenience constructor that reads every setting from the controller configuration and delegates to the
     * explicit constructor.
     *
     * @param controllerConf source of the ZooKeeper string, cluster name, data directory and feature flags
     */
    public PinotHelixResourceManager(ControllerConf controllerConf) {
        this(
                controllerConf.getZkStr(),
                controllerConf.getHelixClusterName(),
                controllerConf.getDataDir(),
                controllerConf.tenantIsolationEnabled(),
                controllerConf.getEnableBatchMessageMode(),
                controllerConf.getHLCTablesAllowed(),
                controllerConf.getDeletedSegmentsRetentionInDays());
    }

    /**
     * Binds this manager to a Helix manager: caches the admin tool, property store, data accessor and key
     * builder, creates the segment deletion manager, writes the tenant-isolation flag to the property store and
     * creates the table cache.
     *
     * <p>The table cache is case-insensitive when either of the cluster configs "enable.case.insensitive" or
     * "enable.case.insensitive.pql" parses as {@code true}.
     *
     * @param helixZkManager Helix manager to obtain all handles from
     */
    public synchronized void start(HelixManager helixZkManager) {
        _helixZkManager = helixZkManager;
        _helixAdmin = _helixZkManager.getClusterManagmentTool();
        _propertyStore = _helixZkManager.getHelixPropertyStore();
        _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
        _keyBuilder = _helixDataAccessor.keyBuilder();
        _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore, _deletedSegmentsRetentionInDays);
        ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);

        HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
        // Typed map instead of a raw Map plus casts; a missing key yields null, which parseBoolean treats as false.
        Map<String, String> configs = _helixAdmin.getConfig(helixConfigScope, Arrays.asList("enable.case.insensitive", "enable.case.insensitive.pql"));
        boolean caseInsensitive = Boolean.parseBoolean(configs.get("enable.case.insensitive"))
                || Boolean.parseBoolean(configs.get("enable.case.insensitive.pql"));
        _tableCache = new TableCache(_propertyStore, caseInsensitive);
    }

    /** Stops the segment deletion manager created by {@link #start(HelixManager)}. */
    public synchronized void stop() {
        _segmentDeletionManager.stop();
    }

    /**
     * @return the ZooKeeper URL as converted by {@code HelixConfig.getAbsoluteZkPathForHelix} in the constructor
     */
    public String getHelixZkURL() {
        return _helixZkURL;
    }

    /**
     * @return the Helix cluster name given to the constructor
     */
    public String getHelixClusterName() {
        return _helixClusterName;
    }

    /**
     * @return the segment deletion manager created in {@link #start(HelixManager)}; null before start
     */
    public SegmentDeletionManager getSegmentDeletionManager() {
        return _segmentDeletionManager;
    }

    /**
     * @return the Helix manager passed to {@link #start(HelixManager)}; null before start
     */
    public HelixManager getHelixZkManager() {
        return _helixZkManager;
    }

    /**
     * @return the Helix admin tool obtained in {@link #start(HelixManager)}; null before start
     */
    public HelixAdmin getHelixAdmin() {
        return _helixAdmin;
    }

    /**
     * @return the Helix property store obtained in {@link #start(HelixManager)}; null before start
     */
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return _propertyStore;
    }

    /**
     * @return the instances the Helix admin reports for this cluster
     */
    public List<String> getAllInstances() {
        return _helixAdmin.getInstancesInCluster(_helixClusterName);
    }

    /**
     * @return the instance configs returned by {@code HelixHelper.getInstanceConfigs} for the Helix manager
     */
    public List<InstanceConfig> getAllHelixInstanceConfigs() {
        return HelixHelper.getInstanceConfigs(_helixZkManager);
    }

    /**
     * Reads the Helix instance config of an instance through the data accessor.
     *
     * @param instanceId Helix instance id
     * @return the instance config, or null (the method is annotated {@code @Nullable})
     */
    @Nullable
    public InstanceConfig getHelixInstanceConfig(String instanceId) {
        PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceId);
        return (InstanceConfig) _helixDataAccessor.getProperty(instanceConfigKey);
    }

    /**
     * Reads the instance ZK metadata from the property store.
     *
     * @param instanceId instance id to look up
     * @return the metadata, or null (the method is annotated {@code @Nullable})
     */
    @Nullable
    public InstanceZKMetadata getInstanceZKMetadata(String instanceId) {
        return ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instanceId);
    }

    /**
     * Lists the instance names of the brokers returned by {@link #getBrokerInstancesConfigsFor(String)}.
     *
     * @param tableName table name used to look up the table configs
     * @return instance names in the order the configs were returned
     */
    public List<String> getBrokerInstancesFor(String tableName) {
        List<String> brokerInstanceNames = new ArrayList<>();
        for (InstanceConfig brokerConfig : getBrokerInstancesConfigsFor(tableName)) {
            brokerInstanceNames.add(brokerConfig.getInstanceName());
        }
        return brokerInstanceNames;
    }

    /**
     * Returns the instance configs carrying the broker tag of the table's broker tenant.
     *
     * <p>The tenant is taken from the offline table config when one exists, otherwise from the realtime table
     * config. When neither exists, a null tenant name is passed to {@code TagNameUtils.getBrokerTagForTenant}.
     *
     * @param tableName table name used to look up the offline and realtime table configs
     * @return instance configs selected by {@code HelixHelper.getInstancesConfigsWithTag}
     */
    public List<InstanceConfig> getBrokerInstancesConfigsFor(String tableName) {
        TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
        if (tableConfig == null) {
            // The realtime config is only consulted when there is no offline config.
            tableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
        }
        String brokerTenantName = tableConfig != null ? tableConfig.getTenantConfig().getBroker() : null;
        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenantName);
        return HelixHelper.getInstancesConfigsWithTag(allInstanceConfigs, brokerTag);
    }

    /**
     * @param tag Helix tag to match
     * @return the instances {@code HelixHelper.getInstancesWithTag} reports for the tag
     */
    public List<String> getInstancesWithTag(String tag) {
        return HelixHelper.getInstancesWithTag(_helixZkManager, tag);
    }

    /**
     * Adds a new instance to the Helix cluster and, for brokers, optionally updates the broker resource.
     *
     * <p>The broker resource is updated only when the instance id starts with "Broker_",
     * {@code updateBrokerResource} is true and the instance has at least one broker tag.
     *
     * @param instance instance to add
     * @param updateBrokerResource whether to update the broker resource for a broker instance
     * @return a success response describing what was done
     * @throws ClientErrorException with status CONFLICT when an instance config already exists for the id
     */
    public synchronized PinotResourceManagerResponse addInstance(Instance instance, boolean updateBrokerResource) {
        String instanceId = InstanceUtils.getHelixInstanceId(instance);
        if (getHelixInstanceConfig(instanceId) != null) {
            throw new ClientErrorException(String.format("Instance: %s already exists", instanceId), Response.Status.CONFLICT);
        }
        _helixAdmin.addInstance(_helixClusterName, InstanceUtils.toHelixInstanceConfig(instance));

        // Typed lists: with raw types the stream pipeline below cannot be type-checked.
        List<String> newBrokerTags = null;
        if (instanceId.startsWith("Broker_") && updateBrokerResource) {
            List<String> newTags = instance.getTags();
            if (CollectionUtils.isNotEmpty(newTags)) {
                newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            }
        }
        if (newBrokerTags == null || newBrokerTags.isEmpty()) {
            return PinotResourceManagerResponse.success("Added instance: " + instanceId);
        }

        long startTimeMs = System.currentTimeMillis();
        List<String> tablesAdded = new ArrayList<>();
        // null for the "tables removed" collector, exactly as before.
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, null);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded);
        return PinotResourceManagerResponse.success(String.format("Added instance: %s, and updated broker resource - tables added: %s", instanceId, tablesAdded));
    }

    /**
     * Overwrites the Helix instance config of an existing instance and, for brokers, optionally updates the
     * broker resource when the sorted broker tags differ from the previous ones.
     *
     * @param instanceId id of the instance to update
     * @param newInstance new instance definition
     * @param updateBrokerResource whether to update the broker resource for a broker instance
     * @return a success response describing what was done
     * @throws NotFoundException when no instance config exists for the id
     * @throws RuntimeException when the data accessor fails to store the updated config
     */
    public synchronized PinotResourceManagerResponse updateInstance(String instanceId, Instance newInstance, boolean updateBrokerResource) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        List<String> newTags = newInstance.getTags();
        // Captured before the config is updated so the old broker tags can be compared afterwards.
        List<String> oldTags = instanceConfig.getTags();
        InstanceUtils.updateHelixInstanceConfig(instanceConfig, newInstance);
        if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig)) {
            throw new RuntimeException("Failed to set instance config for instance: " + instanceId);
        }

        // Typed lists: with raw types the stream pipelines below cannot be type-checked.
        List<String> newBrokerTags = null;
        boolean shouldUpdateBrokerResource = false;
        if (instanceId.startsWith("Broker_") && updateBrokerResource) {
            newBrokerTags = newTags != null
                    ? newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList())
                    : Collections.<String>emptyList();
            List<String> oldBrokerTags = oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            shouldUpdateBrokerResource = !newBrokerTags.equals(oldBrokerTags);
        }
        if (!shouldUpdateBrokerResource) {
            return PinotResourceManagerResponse.success("Updated instance: " + instanceId);
        }

        long startTimeMs = System.currentTimeMillis();
        List<String> tablesAdded = new ArrayList<>();
        List<String> tablesRemoved = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
        return PinotResourceManagerResponse.success(String.format("Updated instance: %s, and updated broker resource - tables added: %s, tables removed: %s", instanceId, tablesAdded, tablesRemoved));
    }

    /**
     * Replaces the tag list of an existing instance and, for brokers, optionally updates the broker resource
     * when the sorted broker tags differ from the previous ones.
     *
     * @param instanceId id of the instance to update
     * @param tagsString comma-separated tags, split with {@code StringUtils.split(tagsString, ',')}
     * @param updateBrokerResource whether to update the broker resource for a broker instance
     * @return a success response describing what was done
     * @throws NotFoundException when no instance config exists for the id
     * @throws RuntimeException when the data accessor fails to store the updated config
     */
    public synchronized PinotResourceManagerResponse updateInstanceTags(String instanceId, String tagsString, boolean updateBrokerResource) {
        InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ','));
        // Captured before the tag list field is replaced so the old broker tags can be compared afterwards.
        List<String> oldTags = instanceConfig.getTags();
        instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags);
        if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig)) {
            throw new RuntimeException("Failed to set instance config for instance: " + instanceId);
        }

        // Typed lists: with a raw oldTags the stream pipeline below cannot be type-checked.
        List<String> newBrokerTags = null;
        boolean shouldUpdateBrokerResource = false;
        if (instanceId.startsWith("Broker_") && updateBrokerResource) {
            newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            List<String> oldBrokerTags = oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList());
            shouldUpdateBrokerResource = !newBrokerTags.equals(oldBrokerTags);
        }
        if (!shouldUpdateBrokerResource) {
            return PinotResourceManagerResponse.success(String.format("Updated tags: %s for instance: %s", newTags, instanceId));
        }

        long startTimeMs = System.currentTimeMillis();
        List<String> tablesAdded = new ArrayList<>();
        List<String> tablesRemoved = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
        return PinotResourceManagerResponse.success(String.format("Updated tags: %s for instance: %s, and updated broker resource - tables added: %s, tables removed: %s", newTags, instanceId, tablesAdded, tablesRemoved));
    }

    /**
     * Updates the broker resource for a broker instance using the broker tags currently in its instance config.
     *
     * @param instanceId id of the broker instance; must start with "Broker_"
     * @return a success response listing the tables added and removed
     * @throws BadRequestException when the id does not start with "Broker_"
     * @throws NotFoundException when no instance config exists for the id
     */
    public PinotResourceManagerResponse updateBrokerResource(String instanceId) {
        if (!instanceId.startsWith("Broker_")) {
            throw new BadRequestException("Cannot update broker resource for non-broker instance: " + instanceId);
        }
        InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId);
        if (instanceConfig == null) {
            throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
        }
        long startTimeMs = System.currentTimeMillis();
        // Generic collections instead of raw List/ArrayList; the tags are not sorted here, same as before.
        List<String> brokerTags = instanceConfig.getTags().stream().filter(TagNameUtils::isBrokerTag).collect(Collectors.toList());
        List<String> tablesAdded = new ArrayList<>();
        List<String> tablesRemoved = new ArrayList<>();
        HelixHelper.updateBrokerResource(_helixZkManager, instanceId, brokerTags, tablesAdded, tablesRemoved);
        LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", instanceId, brokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved);
        return PinotResourceManagerResponse.success(String.format("Updated broker resource for broker: %s - tables added: %s, tables removed: %s", instanceId, tablesAdded, tablesRemoved));
    }

    /**
     * Tells whether an instance has been offline for longer than the given time range.
     *
     * <p>Returns false when the instance has a live-instance entry, has no participant history, or its last
     * offline time is negative.
     *
     * @param instanceId instance to check
     * @param offlineTimeRangeMs threshold in milliseconds
     * @return true only when the time since the last offline time exceeds the threshold
     */
    public boolean isInstanceOfflineFor(String instanceId, long offlineTimeRangeMs) {
        boolean isLive = _helixDataAccessor.getProperty(_keyBuilder.liveInstance(instanceId)) != null;
        if (isLive) {
            return false;
        }
        ParticipantHistory history = (ParticipantHistory) _helixDataAccessor.getProperty(_keyBuilder.participantHistory(instanceId));
        if (history == null) {
            return false;
        }
        long lastOfflineTime = history.getLastOfflineTime();
        if (lastOfflineTime < 0L) {
            return false;
        }
        boolean offlineLongEnough = System.currentTimeMillis() - lastOfflineTime > offlineTimeRangeMs;
        if (offlineLongEnough) {
            LOGGER.info("Instance: {} has been offline for more than {}ms", instanceId, offlineTimeRangeMs);
        }
        return offlineLongEnough;
    }

    /**
     * @return the resources the Helix admin reports for this cluster
     */
    public List<String> getAllResources() {
        return _helixAdmin.getResourcesInCluster(_helixClusterName);
    }

    /**
     * @return a new list of the cluster resources accepted by {@code TableNameBuilder.isTableResource}, in the
     *     order returned by {@link #getAllResources()}
     */
    public List<String> getAllTables() {
        return getAllResources().stream()
                .filter(TableNameBuilder::isTableResource)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * @return a new list of the cluster resources accepted by {@code TableNameBuilder.isOfflineTableResource},
     *     in the order returned by {@link #getAllResources()}
     */
    public List<String> getAllOfflineTables() {
        return getAllResources().stream()
                .filter(TableNameBuilder::isOfflineTableResource)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * @return a new list of the cluster resources accepted by {@code TableNameBuilder.isRealtimeTableResource},
     *     in the order returned by {@link #getAllResources()}
     */
    public List<String> getAllRealtimeTables() {
        return getAllResources().stream()
                .filter(TableNameBuilder::isRealtimeTableResource)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Collects the raw table name of every table resource, without duplicates.
     *
     * @return a new list holding each distinct result of {@code TableNameBuilder.extractRawTableName}; the
     *     order is the iteration order of the intermediate {@code HashSet}
     */
    public List<String> getAllRawTables() {
        HashSet<String> distinctRawNames = getAllResources().stream()
                .filter(TableNameBuilder::isTableResource)
                .map(TableNameBuilder::extractRawTableName)
                .collect(Collectors.toCollection(HashSet::new));
        return new ArrayList<>(distinctRawNames);
    }

    /**
     * Resolves a table name through the table cache when the cache is case-insensitive.
     *
     * @param tableName table name as supplied by the caller
     * @return the name stored in the table cache, or {@code tableName} itself when the cache is case-sensitive
     *     or has no entry for it
     */
    public String getActualTableName(String tableName) {
        if (!_tableCache.isCaseInsensitive()) {
            return tableName;
        }
        String resolvedName = _tableCache.getActualTableName(tableName);
        if (resolvedName == null) {
            return tableName;
        }
        return resolvedName;
    }

    /**
     * Looks up the crypter class name in the validation config of a table taken from the table cache.
     *
     * @param tableNameWithType table name used as the table cache key
     * @return the crypter class name from the table's validation config
     * @throws NullPointerException when the table cache has no config for the table
     */
    public String getCrypterClassNameFromTableConfig(String tableNameWithType) {
        TableConfig cachedConfig = _tableCache.getTableConfig(tableNameWithType);
        Preconditions.checkNotNull(cachedConfig, "Table config is not available for table '%s'", tableNameWithType);
        SegmentsValidationAndRetentionConfig validationConfig = cachedConfig.getValidationConfig();
        return validationConfig.getCrypterClassName();
    }

    /**
     * Lists the segments recorded in the property store for a table.
     *
     * @param tableNameWithType table name passed to {@code ZKMetadataProvider.getSegments}
     * @param shouldExcludeReplacedSegments when true, the list is filtered through the table's segment lineage
     * @return the segment names, optionally filtered
     */
    public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
        // Typed local instead of a raw List, so the return needs no unchecked conversion.
        List<String> segmentsFromPropertyStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
        return shouldExcludeReplacedSegments
                ? excludeReplacedSegments(tableNameWithType, segmentsFromPropertyStore)
                : segmentsFromPropertyStore;
    }

    /**
     * Lists the segments of a table that fall into a time range, then filters them through the table's segment
     * lineage.
     *
     * <p>When the range is exactly {@code [Long.MIN_VALUE, Long.MAX_VALUE]} all segments are taken without
     * reading their ZK metadata. Otherwise each segment's ZK metadata is checked with
     * {@code isSegmentWithinTimeStamps}; a null metadata entry is skipped because that check rejects null.
     *
     * @param tableNameWithType table to list segments for
     * @param startTimestamp start of the range
     * @param endTimestamp end of the range
     * @param excludeOverlapping when true, segments that only partially overlap the range are left out
     * @return the selected segment names
     */
    public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp, long endTimestamp, boolean excludeOverlapping) {
        // Declared as List: getSegmentsFor returns List<String>, which is not assignable to ArrayList<String>.
        List<String> selectedSegments;
        if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
            selectedSegments = getSegmentsFor(tableNameWithType, false);
        } else {
            selectedSegments = new ArrayList<>();
            for (SegmentZKMetadata segmentZKMetadata : getSegmentsZKMetadata(tableNameWithType)) {
                if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
                    // Read the name only after the filter, which is what guards against a null entry.
                    selectedSegments.add(segmentZKMetadata.getSegmentName());
                }
            }
        }
        return excludeReplacedSegments(tableNameWithType, selectedSegments);
    }

    /**
     * Filters a segment list through the table's segment lineage.
     *
     * @param tableNameWithType table whose lineage is read from the property store
     * @param segments candidate segment names
     * @return {@code segments} itself when the table has no lineage; otherwise a new list built from the set
     *     left by {@code SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace} (order is that of a HashSet)
     */
    private List<String> excludeReplacedSegments(String tableNameWithType, List<String> segments) {
        SegmentLineage lineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType);
        if (lineage != null) {
            HashSet<String> remainingSegments = new HashSet<>(segments);
            SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(remainingSegments, lineage);
            return new ArrayList<>(remainingSegments);
        }
        return segments;
    }

    /**
     * Decides whether a segment should be selected for the range {@code [startTimestamp, endTimestamp)}.
     *
     * <ul>
     *   <li>null metadata: not selected;</li>
     *   <li>start and end time both -1: selected;</li>
     *   <li>start time after end time: logged as invalid and not selected;</li>
     *   <li>segment fully inside the range: selected;</li>
     *   <li>segment entirely outside the range: not selected;</li>
     *   <li>partial overlap: selected unless {@code excludeOverlapping} is true.</li>
     * </ul>
     */
    private boolean isSegmentWithinTimeStamps(SegmentZKMetadata segmentMetadata, long startTimestamp, long endTimestamp, boolean excludeOverlapping) {
        if (segmentMetadata == null) {
            return false;
        }
        long segmentStartMs = segmentMetadata.getStartTimeMs();
        long segmentEndMs = segmentMetadata.getEndTimeMs();
        if (segmentStartMs == -1L && segmentEndMs == -1L) {
            return true;
        }
        if (segmentStartMs > segmentEndMs) {
            LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentMetadata.getSegmentName(), segmentStartMs, segmentEndMs);
            return false;
        }
        boolean fullyInside = startTimestamp <= segmentStartMs && segmentEndMs < endTimestamp;
        if (fullyInside) {
            return true;
        }
        boolean disjoint = segmentEndMs < startTimestamp || segmentStartMs >= endTimestamp;
        // What remains is a partial overlap, kept only when overlaps are allowed.
        return !disjoint && !excludeOverlapping;
    }

    /**
     * Reads the ZK metadata of one segment from the property store.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to look up
     * @return the segment ZK metadata, or null (the method is annotated {@code @Nullable})
     */
    @Nullable
    public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String segmentName) {
        return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
    }

    /**
     * @param tableNameWithType table to read from the property store
     * @return the ZK metadata entries {@code ZKMetadataProvider.getSegmentsZKMetadata} returns for the table
     */
    public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
        return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
    }

    /**
     * Deletes segments without an explicit retention period; delegates to the three-argument overload with a
     * null retention period.
     *
     * @param tableNameWithType table the segments belong to
     * @param segmentNames segments to delete
     * @return the response of the three-argument overload
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
        return deleteSegments(tableNameWithType, segmentNames, null);
    }

    /**
     * Removes segments from the table's ideal state and hands them to the segment deletion manager.
     *
     * <p>With a retention period, the period is converted to milliseconds and passed along; without one, the
     * table config read from the property store is passed instead. Any exception is logged and converted into
     * a failure response carrying the exception message.
     *
     * @param tableNameWithType table name; must be accepted by {@code TableNameBuilder.isTableResource}
     * @param segmentNames segments to delete
     * @param retentionPeriod optional retention period string, may be null
     * @return a success response, or a failure response when an exception was caught
     */
    public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames, @Nullable String retentionPeriod) {
        try {
            LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
            Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType), "Table name: %s is not a valid table name with type suffix", tableNameWithType);
            HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
            if (retentionPeriod == null) {
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
                _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
            } else {
                long retentionMs = TimeUtils.convertPeriodToMillis(retentionPeriod);
                _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, retentionMs);
            }
            return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
        } catch (Exception e) {
            // The exception is the last argument, so the logger reports it as the throwable.
            LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
            return PinotResourceManagerResponse.failure(e.getMessage());
        }
    }

    /**
     * Deletes a single segment by wrapping it in a singleton list and delegating to
     * {@link #deleteSegments(String, List)}.
     *
     * @param tableNameWithType table the segment belongs to
     * @param segmentName segment to delete
     * @return the response of the delegated call
     */
    public synchronized PinotResourceManagerResponse deleteSegment(String tableNameWithType, String segmentName) {
        List<String> singleSegment = Collections.singletonList(segmentName);
        return deleteSegments(tableNameWithType, singleSegment);
    }

    /**
     * Brings the number of brokers tagged for a tenant in line with the tenant definition.
     *
     * @param tenant tenant carrying the name and the requested number of instances
     * @return the scale-down result when more instances are tagged than requested, the scale-up result when
     *     fewer are tagged, and {@code PinotResourceManagerResponse.SUCCESS} when the counts already match
     */
    public PinotResourceManagerResponse updateBrokerTenant(Tenant tenant) {
        String brokerTenantTag = TagNameUtils.getBrokerTagForTenant(tenant.getTenantName());
        // Typed list instead of a raw List, so the helper calls below need no unchecked conversion.
        List<String> instancesInClusterWithTag = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantTag);
        int taggedCount = instancesInClusterWithTag.size();
        int requestedCount = tenant.getNumberOfInstances();
        if (taggedCount > requestedCount) {
            return scaleDownBroker(tenant, brokerTenantTag, instancesInClusterWithTag);
        }
        if (taggedCount < requestedCount) {
            return scaleUpBroker(tenant, brokerTenantTag, instancesInClusterWithTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Tags additional untagged brokers for a tenant until the requested instance count is reached.
     *
     * <p>Fails without changing anything when there are fewer online untagged brokers than needed. Otherwise
     * the first needed brokers are retagged from "broker_untagged" to the tenant tag and added to the broker
     * ideal state.
     *
     * @param tenant tenant carrying the name and the requested number of instances
     * @param brokerTenantTag broker tag of the tenant
     * @param instancesInClusterWithTag instances already carrying the tag
     * @return {@code PinotResourceManagerResponse.SUCCESS}, or a failure response with the shortage details
     */
    private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        List<String> untaggedBrokers = getOnlineUnTaggedBrokerInstanceList();
        int missingCount = tenant.getNumberOfInstances() - instancesInClusterWithTag.size();
        if (untaggedBrokers.size() < missingCount) {
            String message = "Failed to allocate broker instances to Tag : " + tenant.getTenantName()
                    + ", Current number of untagged broker instances : " + untaggedBrokers.size()
                    + ", Current number of tagged broker instances : " + instancesInClusterWithTag.size()
                    + ", Request asked number is : " + tenant.getNumberOfInstances();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        int index = 0;
        while (index < missingCount) {
            String brokerInstance = untaggedBrokers.get(index);
            retagInstance(brokerInstance, "broker_untagged", brokerTenantTag);
            addInstanceToBrokerIdealState(brokerTenantTag, brokerInstance);
            index++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rebuilds the broker resource entry of a table from the brokers of the broker
     * tenant named in the table config.
     *
     * @param tableNameWithType name of the table, including its type suffix
     * @return the response of {@code rebuildBrokerResource}
     * @throws InvalidTableConfigException if the table config cannot be read
     * @throws InvalidConfigException if no table config exists for the table
     * @throws Exception declared by the signature; see the two cases above
     */
    public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception {
        TableConfig tableConfig;
        try {
            tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        } catch (Exception e) {
            LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
            throw new InvalidTableConfigException("Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
        }
        if (tableConfig == null) {
            LOGGER.warn("Table " + tableNameWithType + " does not exist");
            throw new InvalidConfigException("Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
        }
        String brokerTenant = tableConfig.getTenantConfig().getBroker();
        Set<String> tenantBrokers = getAllInstancesForBrokerTenant(brokerTenant);
        return rebuildBrokerResource(tableNameWithType, tenantBrokers);
    }

    /**
     * Replaces the broker assignment of a table in the {@code brokerResource} ideal
     * state with the given set of brokers, all in state {@code ONLINE}.
     *
     * <p>If the ideal state already lists exactly {@code brokerInstances} for the
     * table, nothing is written.
     *
     * @param tableNameWithType name of the table, including its type suffix
     * @param brokerInstances the brokers that should serve the table
     * @return a success response describing whether the resource was rebuilt
     */
    public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithType, Set<String> brokerInstances) {
        IdealState currentIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
        Set<String> currentBrokers = currentIdealState.getInstanceSet(tableNameWithType);
        boolean alreadyUpToDate = currentBrokers.equals(brokerInstances);
        if (alreadyUpToDate) {
            return PinotResourceManagerResponse.success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
        }
        try {
            HelixHelper.updateIdealState(getHelixZkManager(), "brokerResource", idealState -> {
                assert (idealState != null);
                // Drop the previous assignment (if any) before writing the new one.
                Map<String, String> previousAssignment = idealState.getInstanceStateMap(tableNameWithType);
                if (previousAssignment != null) {
                    previousAssignment.clear();
                }
                for (String broker : brokerInstances) {
                    idealState.setPartitionState(tableNameWithType, broker, "ONLINE");
                }
                return idealState;
            }, DEFAULT_RETRY_POLICY);
            LOGGER.info("Successfully rebuilt brokerResource for table: {}", tableNameWithType);
            return PinotResourceManagerResponse.success("Rebuilt brokerResource for table: " + tableNameWithType);
        } catch (Exception e) {
            LOGGER.error("Caught exception while rebuilding broker resource for table: {}", tableNameWithType, e);
            throw e;
        }
    }

    /**
     * Adds a broker instance, in state {@code ONLINE}, to every table partition of
     * the {@code brokerResource} ideal state whose table config resolves to the
     * given broker tag, then writes the ideal state back.
     *
     * @param brokerTenantTag broker tag the table must be configured with
     * @param instanceName the broker instance to add
     */
    private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String table : brokerIdealState.getPartitionSet()) {
            TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, table);
            Preconditions.checkNotNull(tableConfig);
            String tableBrokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
            if (tableBrokerTag.equals(brokerTenantTag)) {
                brokerIdealState.setPartitionState(table, instanceName, "ONLINE");
            }
        }
        _helixAdmin.setResourceIdealState(_helixClusterName, "brokerResource", brokerIdealState);
    }

    /**
     * Moves the surplus brokers of a tenant back to the {@code broker_untagged} tag.
     *
     * <p>The brokers at the front of {@code instancesInClusterWithTag} are the ones
     * that get untagged.
     *
     * @param tenant the broker tenant with the requested instance count
     * @param brokerTenantTag the broker tag of the tenant
     * @param instancesInClusterWithTag instances that currently carry the tenant's broker tag
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) {
        int surplus = instancesInClusterWithTag.size() - tenant.getNumberOfInstances();
        int idx = 0;
        while (idx < surplus) {
            String broker = instancesInClusterWithTag.get(idx);
            retagInstance(broker, brokerTenantTag, "broker_untagged");
            idx++;
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Swaps one Helix tag of an instance for another: removes {@code oldTag} first,
     * then adds {@code newTag}.
     *
     * @param instanceName the instance to retag
     * @param oldTag tag to remove
     * @param newTag tag to add
     */
    private void retagInstance(String instanceName, String oldTag, String newTag) {
        String cluster = _helixClusterName;
        _helixAdmin.removeInstanceTag(cluster, instanceName, oldTag);
        _helixAdmin.addInstanceTag(cluster, instanceName, newTag);
    }

    /**
     * Updates the servers assigned to a server tenant.
     *
     * <p>The tenant is treated as currently colocated when at least one server
     * carries both its offline and its realtime tag. Switching between colocated
     * and non-colocated is rejected, as is any request that lowers the total,
     * offline or realtime instance count; otherwise the tenant is scaled up.
     *
     * @param serverTenant the requested tenant layout
     * @return the response of the scale operation, or a failure response
     */
    public PinotResourceManagerResponse updateServerTenant(Tenant serverTenant) {
        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
        List<String> realtimeServers = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeServerTag);
        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        List<String> offlineServers = HelixHelper.getInstancesWithTag(_helixZkManager, offlineServerTag);

        Set<String> servingServers = new HashSet<>();
        servingServers.addAll(offlineServers);
        servingServers.addAll(realtimeServers);

        // The union being smaller than the sum means some server carries both tags.
        boolean currentlyColocated = servingServers.size() < offlineServers.size() + realtimeServers.size();
        if (currentlyColocated != serverTenant.isCoLocated()) {
            String message = "Not support different colocated type request for update request: " + serverTenant;
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }

        boolean shrinkRequested = serverTenant.getNumberOfInstances() < servingServers.size()
            || serverTenant.getOfflineInstances() < offlineServers.size()
            || serverTenant.getRealtimeInstances() < realtimeServers.size();
        if (shrinkRequested) {
            return scaleDownServer(serverTenant, realtimeServers, offlineServers, servingServers);
        }
        return scaleUpServerTenant(serverTenant, realtimeServerTag, realtimeServers, offlineServerTag, offlineServers, servingServers);
    }

    /**
     * Scales a server tenant up, provided enough untagged servers are available for
     * the increase in total instance count.
     *
     * @param serverTenant the requested tenant layout
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param allServingServers union of the two tagged server lists
     * @return a failure response when untagged servers are insufficient, otherwise the
     *         response of the colocated or independent update
     */
    private PinotResourceManagerResponse scaleUpServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, Set<String> allServingServers) {
        int additionalInstances = serverTenant.getNumberOfInstances() - allServingServers.size();
        List<String> availableServers = getOnlineUnTaggedServerInstanceList();
        if (availableServers.size() < additionalInstances) {
            String message = "Failed to allocate hardware resources with tenant info: " + serverTenant
                + ", Current number of untagged instances : " + availableServers.size()
                + ", Current number of serving instances : " + allServingServers.size()
                + ", Current number of tagged offline server instances : " + taggedOfflineServers.size()
                + ", Current number of tagged realtime server instances : " + taggedRealtimeServers.size();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        return serverTenant.isCoLocated()
            ? updateColocatedServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, additionalInstances, availableServers)
            : updateIndependentServerTenant(serverTenant, realtimeServerTag, taggedRealtimeServers, offlineServerTag, taggedOfflineServers, additionalInstances, availableServers);
    }

    /**
     * Scales up a non-colocated server tenant by tagging untagged servers: the first
     * slice of {@code unTaggedInstanceList} receives the offline tag, the following
     * slice the realtime tag.
     *
     * @param serverTenant the requested tenant layout
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param incInstances increase in total instance count (not used by this method)
     * @param unTaggedInstanceList untagged servers available for assignment
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateIndependentServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int offlineToAdd = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int realtimeToAdd = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();

        for (int idx = 0; idx < offlineToAdd; idx++) {
            retagInstance(unTaggedInstanceList.get(idx), "server_untagged", offlineServerTag);
        }
        // Realtime servers are taken from the positions right after the offline ones.
        for (int idx = offlineToAdd; idx < offlineToAdd + realtimeToAdd; idx++) {
            retagInstance(unTaggedInstanceList.get(idx), "server_untagged", realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Scales up a colocated server tenant.
     *
     * <p>New offline and realtime slots are filled from {@code unTaggedInstanceList}
     * for the first {@code incInstances} positions; the remaining slots are filled by
     * adding the missing tag to servers taken from the already-tagged lists.
     *
     * <p>Note that this method mutates the {@code taggedRealtimeServers} and
     * {@code taggedOfflineServers} lists passed in by the caller.
     *
     * @param serverTenant the requested tenant layout
     * @param realtimeServerTag realtime tag of the tenant
     * @param taggedRealtimeServers servers currently carrying the realtime tag (modified)
     * @param offlineServerTag offline tag of the tenant
     * @param taggedOfflineServers servers currently carrying the offline tag (modified)
     * @param incInstances increase in total instance count, i.e. how many untagged servers to consume
     * @param unTaggedInstanceList untagged servers available for assignment
     * @return always {@code SUCCESS}
     */
    private PinotResourceManagerResponse updateColocatedServerTenant(Tenant serverTenant, String realtimeServerTag, List<String> taggedRealtimeServers, String offlineServerTag, List<String> taggedOfflineServers, int incInstances, List<String> unTaggedInstanceList) {
        int i;
        int incOffline = serverTenant.getOfflineInstances() - taggedOfflineServers.size();
        int incRealtime = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size();
        // Reduce taggedRealtimeServers to the servers that are not in taggedOfflineServers.
        taggedRealtimeServers.removeAll(taggedOfflineServers);
        // NOTE(review): after the call above the two lists share no elements, so this
        // removeAll removes nothing and taggedOfflineServers keeps its original content.
        // Presumably the intent was to reduce it to offline-only servers — confirm.
        taggedOfflineServers.removeAll(taggedRealtimeServers);
        for (i = 0; i < incOffline; ++i) {
            if (i < incInstances) {
                // Still within the budget of new instances: consume an untagged server.
                this.retagInstance(unTaggedInstanceList.get(i), "server_untagged", offlineServerTag);
                continue;
            }
            // Budget exhausted: add the offline tag to a server from the reduced realtime list.
            this._helixAdmin.addInstanceTag(this._helixClusterName, taggedRealtimeServers.get(i - incInstances), offlineServerTag);
        }
        // The index continues from incOffline so that untagged servers consumed above are not reused.
        for (i = incOffline; i < incOffline + incRealtime; ++i) {
            if (i < incInstances) {
                this.retagInstance(unTaggedInstanceList.get(i), "server_untagged", realtimeServerTag);
                continue;
            }
            // Budget exhausted: add the realtime tag to a server from the offline list,
            // indexed from 0 relative to the first position handled by this branch.
            this._helixAdmin.addInstanceTag(this._helixClusterName, taggedOfflineServers.get(i - Math.max(incInstances, incOffline)), realtimeServerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Rejects a request to shrink a server tenant: logs the current instance counts
     * and returns them in a failure response.
     *
     * @param serverTenant the requested tenant layout
     * @param taggedRealtimeServers servers currently carrying the realtime tag
     * @param taggedOfflineServers servers currently carrying the offline tag
     * @param allServingServers union of the two tagged server lists
     * @return always a failure response
     */
    private PinotResourceManagerResponse scaleDownServer(Tenant serverTenant, List<String> taggedRealtimeServers, List<String> taggedOfflineServers, Set<String> allServingServers) {
        int servingCount = allServingServers.size();
        int offlineCount = taggedOfflineServers.size();
        int realtimeCount = taggedRealtimeServers.size();
        String message = "Not support to size down the current server cluster with tenant info: " + serverTenant
            + ", Current number of serving instances : " + servingCount
            + ", Current number of tagged offline server instances : " + offlineCount
            + ", Current number of tagged realtime server instances : " + realtimeCount;
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Tells whether a broker tenant can be deleted, i.e. whether none of the
     * instances carrying its broker tag appears in any partition of the
     * {@code brokerResource} ideal state.
     *
     * @param tenantName name of the broker tenant
     * @return {@code false} if any tagged broker is still listed in the ideal state
     */
    public boolean isBrokerTenantDeletable(String tenantName) {
        String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        Set<String> tenantBrokers = new HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag));
        IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, "brokerResource");
        for (String partition : brokerIdealState.getPartitionSet()) {
            Set<String> assignedBrokers = brokerIdealState.getInstanceSet(partition);
            if (!Collections.disjoint(tenantBrokers, assignedBrokers)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether a server tenant can be deleted, i.e. whether none of the
     * instances carrying its offline or realtime tag appears in the ideal state of
     * any table resource.
     *
     * @param tenantName name of the server tenant
     * @return {@code false} if any tagged server is still listed in a table ideal state
     */
    public boolean isServerTenantDeletable(String tenantName) {
        Set<String> tenantServers = new HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getOfflineTagForTenant(tenantName)));
        tenantServers.addAll(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName)));
        for (String resource : getAllResources()) {
            if (TableNameBuilder.isTableResource(resource)) {
                IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, resource);
                for (String partition : tableIdealState.getPartitionSet()) {
                    Set<String> assignedServers = tableIdealState.getInstanceSet(partition);
                    if (!Collections.disjoint(tenantServers, assignedServers)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    /**
     * Collects the tenant names derived from every broker tag found on any Helix
     * instance config.
     *
     * @return the set of broker tenant names
     */
    public Set<String> getAllBrokerTenantNames() {
        return getAllHelixInstanceConfigs().stream()
            .flatMap(instanceConfig -> instanceConfig.getTags().stream())
            .filter(tag -> TagNameUtils.isBrokerTag(tag))
            .map(tag -> TagNameUtils.getTenantFromTag(tag))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Collects the tenant names derived from every server tag found on any Helix
     * instance config.
     *
     * @return the set of server tenant names
     */
    public Set<String> getAllServerTenantNames() {
        return getAllHelixInstanceConfigs().stream()
            .flatMap(instanceConfig -> instanceConfig.getTags().stream())
            .filter(tag -> TagNameUtils.isServerTag(tag))
            .map(tag -> TagNameUtils.getTenantFromTag(tag))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Reads the instance config of an instance through the Helix data accessor and
     * returns its tags.
     *
     * @param instanceName name of the instance
     * @return the tags of the instance config
     */
    private List<String> getTagsForInstance(String instanceName) {
        return ((InstanceConfig) _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName))).getTags();
    }

    /**
     * Creates a server tenant by tagging untagged servers, in colocated or
     * independent layout depending on {@code serverTenant.isCoLocated()}.
     *
     * @param serverTenant the requested tenant layout
     * @return a failure response when there are fewer untagged servers than requested
     *         instances, otherwise {@code SUCCESS}
     */
    public PinotResourceManagerResponse createServerTenant(Tenant serverTenant) {
        int requested = serverTenant.getNumberOfInstances();
        List<String> availableServers = getOnlineUnTaggedServerInstanceList();
        if (availableServers.size() >= requested) {
            if (serverTenant.isCoLocated()) {
                assignColocatedServerTenant(serverTenant, requested, availableServers);
            } else {
                assignIndependentServerTenant(serverTenant, requested, availableServers);
            }
            return PinotResourceManagerResponse.SUCCESS;
        }
        String message = "Failed to allocate server instances to Tag : " + serverTenant.getTenantName()
            + ", Current number of untagged server instances : " + availableServers.size()
            + ", Request asked number is : " + serverTenant.getNumberOfInstances();
        LOGGER.error(message);
        return PinotResourceManagerResponse.failure(message);
    }

    /**
     * Tags untagged servers for a non-colocated tenant: the first
     * {@code getOfflineInstances()} servers of the list receive the offline tag and
     * the next {@code getRealtimeInstances()} servers the realtime tag.
     *
     * @param serverTenant the requested tenant layout
     * @param numberOfInstances total requested instance count (not used by this method)
     * @param unTaggedInstanceList untagged servers available for assignment
     */
    private void assignIndependentServerTenant(Tenant serverTenant, int numberOfInstances, List<String> unTaggedInstanceList) {
        String tenantName = serverTenant.getTenantName();

        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        int offlineIdx = 0;
        while (offlineIdx < serverTenant.getOfflineInstances()) {
            retagInstance(unTaggedInstanceList.get(offlineIdx), "server_untagged", offlineServerTag);
            offlineIdx++;
        }

        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        int realtimeIdx = 0;
        while (realtimeIdx < serverTenant.getRealtimeInstances()) {
            // Realtime servers start right after the block used for offline servers.
            int position = realtimeIdx + serverTenant.getOfflineInstances();
            retagInstance(unTaggedInstanceList.get(position), "server_untagged", realtimeServerTag);
            realtimeIdx++;
        }
    }

    /**
     * Tags untagged servers for a colocated tenant.
     *
     * <p>Offline tags are assigned to the first {@code getOfflineInstances()} servers
     * of the list. Realtime tags continue from the next position and wrap around to
     * the start of the list once {@code numberOfInstances} servers have been used, so
     * that some servers end up with both tags.
     *
     * <p>The wrap-around check is done <em>before</em> each realtime assignment. Checking
     * only afterwards misses the case where the offline assignments alone already
     * consumed {@code numberOfInstances} servers: the first realtime assignment would
     * then read position {@code numberOfInstances}, which is either out of bounds or an
     * extra server beyond the requested count, and the counter would never wrap.
     *
     * @param serverTenant the requested tenant layout
     * @param numberOfInstances total number of servers the tenant may occupy
     * @param unTaggedInstanceList untagged servers available for assignment
     */
    private void assignColocatedServerTenant(Tenant serverTenant, int numberOfInstances, List<String> unTaggedInstanceList) {
        int cnt = 0;
        String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
        for (int i = 0; i < serverTenant.getOfflineInstances(); ++i) {
            this.retagInstance(unTaggedInstanceList.get(cnt++), "server_untagged", offlineServerTag);
        }
        String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
        for (int i = 0; i < serverTenant.getRealtimeInstances(); ++i) {
            // Wrap before use so the index never reaches numberOfInstances.
            if (cnt == numberOfInstances) {
                cnt = 0;
            }
            this.retagInstance(unTaggedInstanceList.get(cnt++), "server_untagged", realtimeServerTag);
        }
    }

    /**
     * Creates a broker tenant by moving the requested number of untagged brokers to
     * the tenant's broker tag.
     *
     * @param brokerTenant the broker tenant with the requested instance count
     * @return a failure response when there are fewer untagged brokers than requested
     *         instances, otherwise {@code SUCCESS}
     */
    public PinotResourceManagerResponse createBrokerTenant(Tenant brokerTenant) {
        List<String> unTaggedInstanceList = this.getOnlineUnTaggedBrokerInstanceList();
        int numberOfInstances = brokerTenant.getNumberOfInstances();
        if (unTaggedInstanceList.size() < numberOfInstances) {
            // The list holds untagged brokers, so report them as such (matches scaleUpBroker).
            String message = "Failed to allocate broker instances to Tag : " + brokerTenant.getTenantName() + ", Current number of untagged broker instances : " + unTaggedInstanceList.size() + ", Request asked number is : " + brokerTenant.getNumberOfInstances();
            LOGGER.error(message);
            return PinotResourceManagerResponse.failure(message);
        }
        String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName());
        for (int i = 0; i < brokerTenant.getNumberOfInstances(); ++i) {
            this.retagInstance(unTaggedInstanceList.get(i), "broker_untagged", brokerTag);
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the offline tag of a tenant from every instance carrying it; an
     * instance left without any tag is given the {@code server_untagged} tag.
     *
     * @param tenantName name of the server tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteOfflineServerTenantFor(String tenantName) {
        String offlineTag = TagNameUtils.getOfflineTagForTenant(tenantName);
        List<String> taggedServers = HelixHelper.getInstancesWithTag(_helixZkManager, offlineTag);
        for (String server : taggedServers) {
            _helixAdmin.removeInstanceTag(_helixClusterName, server, offlineTag);
            boolean noTagsLeft = getTagsForInstance(server).isEmpty();
            if (noTagsLeft) {
                _helixAdmin.addInstanceTag(_helixClusterName, server, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Removes the realtime tag of a tenant from every instance carrying it; an
     * instance left without any tag is given the {@code server_untagged} tag.
     *
     * @param tenantName name of the server tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteRealtimeServerTenantFor(String tenantName) {
        String realtimeTag = TagNameUtils.getRealtimeTagForTenant(tenantName);
        List<String> taggedServers = HelixHelper.getInstancesWithTag(_helixZkManager, realtimeTag);
        for (String server : taggedServers) {
            _helixAdmin.removeInstanceTag(_helixClusterName, server, realtimeTag);
            boolean noTagsLeft = getTagsForInstance(server).isEmpty();
            if (noTagsLeft) {
                _helixAdmin.addInstanceTag(_helixClusterName, server, "server_untagged");
            }
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Moves every instance carrying the tenant's broker tag back to the
     * {@code broker_untagged} tag.
     *
     * @param tenantName name of the broker tenant
     * @return always {@code SUCCESS}
     */
    public PinotResourceManagerResponse deleteBrokerTenantFor(String tenantName) {
        String tenantBrokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
        List<String> taggedBrokers = HelixHelper.getInstancesWithTag(_helixZkManager, tenantBrokerTag);
        for (String broker : taggedBrokers) {
            retagInstance(broker, tenantBrokerTag, "broker_untagged");
        }
        return PinotResourceManagerResponse.SUCCESS;
    }

    /**
     * Returns the server instances of a tenant, resolved from the supplied instance
     * configs via {@code HelixHelper.getServerInstancesForTenant}.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName name of the server tenant
     * @return the matching instance names
     */
    public Set<String> getAllInstancesForServerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> tenantServers = HelixHelper.getServerInstancesForTenant(instanceConfigs, tenantName);
        return tenantServers;
    }

    /**
     * Returns the server instances of a tenant, using the instance configs currently
     * read through the Helix manager.
     *
     * @param tenantName name of the server tenant
     * @return the matching instance names
     */
    public Set<String> getAllInstancesForServerTenant(String tenantName) {
        List<InstanceConfig> currentConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForServerTenant(currentConfigs, tenantName);
    }

    /**
     * Returns the server instances of a tenant for one table type, resolved from the
     * supplied instance configs via {@code HelixHelper.getServerInstancesForTenantWithType}.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName name of the server tenant
     * @param tableType table type to resolve servers for
     * @return the matching instance names
     */
    public Set<String> getAllInstancesForServerTenantWithType(List<InstanceConfig> instanceConfigs, String tenantName, TableType tableType) {
        Set<String> tenantServers = HelixHelper.getServerInstancesForTenantWithType(instanceConfigs, tenantName, tableType);
        return tenantServers;
    }

    /**
     * Returns the broker instances of a tenant, resolved from the supplied instance
     * configs via {@code HelixHelper.getBrokerInstancesForTenant}.
     *
     * @param instanceConfigs instance configs to search
     * @param tenantName name of the broker tenant
     * @return the matching instance names
     */
    public Set<String> getAllInstancesForBrokerTenant(List<InstanceConfig> instanceConfigs, String tenantName) {
        Set<String> tenantBrokers = HelixHelper.getBrokerInstancesForTenant(instanceConfigs, tenantName);
        return tenantBrokers;
    }

    /**
     * Returns the broker instances of a tenant, using the instance configs currently
     * read through the Helix manager.
     *
     * @param tenantName name of the broker tenant
     * @return the matching instance names
     */
    public Set<String> getAllInstancesForBrokerTenant(String tenantName) {
        List<InstanceConfig> currentConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return getAllInstancesForBrokerTenant(currentConfigs, tenantName);
    }

    /**
     * Returns the instance configs of the brokers of a tenant, using the instance
     * configs currently read through the Helix manager.
     *
     * @param tenantName name of the broker tenant
     * @return the matching broker instance configs
     */
    public Set<InstanceConfig> getAllInstancesConfigsForBrokerTenant(String tenantName) {
        List<InstanceConfig> currentConfigs = HelixHelper.getInstanceConfigs(_helixZkManager);
        return HelixHelper.getBrokerInstanceConfigsForTenant(currentConfigs, tenantName);
    }

    /**
     * Adds a schema to the property store.
     *
     * <p>If a schema with the same name already exists it is updated when
     * {@code override} is set, otherwise the call fails.
     *
     * @param schema the schema to add
     * @param override whether an existing schema of the same name may be updated
     * @throws SchemaAlreadyExistsException if the schema exists and {@code override} is false
     * @throws SchemaBackwardIncompatibleException if updating the existing schema is rejected
     */
    public void addSchema(Schema schema, boolean override) throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Adding schema: {} with override: {}", schemaName, override);
        Schema existingSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        if (existingSchema == null) {
            ZKMetadataProvider.setSchema(_propertyStore, schema);
            LOGGER.info("Added schema: {}", schemaName);
            return;
        }
        if (!override) {
            throw new SchemaAlreadyExistsException(String.format("Schema: %s already exists", schemaName));
        }
        updateSchema(schema, existingSchema);
    }

    /**
     * Updates an existing schema and optionally reloads all segments of the tables
     * found for the schema name.
     *
     * @param schema the new schema
     * @param reload whether to reload the segments of the matching tables afterwards
     * @throws SchemaNotFoundException if no schema with that name exists
     * @throws SchemaBackwardIncompatibleException if the update is rejected
     * @throws TableNotFoundException declared by the signature
     */
    public void updateSchema(Schema schema, boolean reload) throws SchemaNotFoundException, SchemaBackwardIncompatibleException, TableNotFoundException {
        String schemaName = schema.getSchemaName();
        LOGGER.info("Updating schema: {} with reload: {}", schemaName, reload);
        Schema existingSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        if (existingSchema == null) {
            throw new SchemaNotFoundException(String.format("Schema: %s does not exist", schemaName));
        }
        updateSchema(schema, existingSchema);
        if (!reload) {
            return;
        }
        LOGGER.info("Reloading tables with name: {}", schemaName);
        for (String tableNameWithType : getExistingTableNamesWithType(schemaName, null)) {
            reloadAllSegments(tableNameWithType, false);
        }
    }

    /**
     * Writes {@code schema} over {@code oldSchema} in the property store, unless the
     * two are equal (no write) or the new schema is not backward-compatible.
     *
     * @param schema the new schema; its boolean fields are first adjusted against {@code oldSchema}
     * @param oldSchema the schema currently stored
     * @throws SchemaBackwardIncompatibleException if the new schema is not backward-compatible
     */
    private void updateSchema(Schema schema, Schema oldSchema) throws SchemaBackwardIncompatibleException {
        String schemaName = schema.getSchemaName();
        schema.updateBooleanFieldsIfNeeded(oldSchema);
        boolean unchanged = schema.equals(oldSchema);
        if (unchanged) {
            LOGGER.info("New schema: {} is the same as the existing schema, not updating it", schemaName);
            return;
        }
        boolean compatible = schema.isBackwardCompatibleWith(oldSchema);
        if (!compatible) {
            throw new SchemaBackwardIncompatibleException(String.format("New schema: %s is not backward-compatible with the existing schema", schemaName));
        }
        ZKMetadataProvider.setSchema(_propertyStore, schema);
        LOGGER.info("Updated schema: {}", schemaName);
    }

    /**
     * Removes a schema from the property store.
     *
     * @param schema the schema to delete; may be {@code null}
     * @return {@code true} if the schema path existed and was removed, {@code false} if
     *         {@code schema} is {@code null} or the path does not exist
     */
    public boolean deleteSchema(Schema schema) {
        if (schema == null) {
            return false;
        }
        String schemaName = schema.getSchemaName();
        LOGGER.info("Deleting schema: {}", schemaName);
        String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(schemaName);
        if (!_propertyStore.exists(schemaPath, AccessOption.PERSISTENT)) {
            return false;
        }
        _propertyStore.remove(schemaPath, AccessOption.PERSISTENT);
        LOGGER.info("Deleted schema: {}", schemaName);
        return true;
    }

    /**
     * Looks up a schema by name in the property store.
     *
     * @param schemaName name of the schema
     * @return the schema, or {@code null} (per the {@code @Nullable} contract)
     */
    @Nullable
    public Schema getSchema(String schemaName) {
        Schema storedSchema = ZKMetadataProvider.getSchema(_propertyStore, schemaName);
        return storedSchema;
    }

    /**
     * Looks up the schema of a table in the property store via
     * {@code ZKMetadataProvider.getTableSchema}.
     *
     * @param tableName name of the table
     * @return the schema, or {@code null} (per the {@code @Nullable} contract)
     */
    @Nullable
    public Schema getTableSchema(String tableName) {
        Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
        return tableSchema;
    }

    /**
     * Resolves the schema for a table config: first by the raw table name, then, if
     * that yields nothing, by the schema name from the validation config (when set).
     *
     * @param tableConfig the table config to resolve a schema for
     * @return the schema, or {@code null} if neither lookup finds one
     */
    @Nullable
    public Schema getSchemaForTableConfig(TableConfig tableConfig) {
        String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
        Schema byTableName = getSchema(rawTableName);
        if (byTableName != null) {
            return byTableName;
        }
        String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
        if (configuredSchemaName == null) {
            return null;
        }
        return getSchema(configuredSchemaName);
    }

    /**
     * Lists the child names found under the schema path of the property store.
     *
     * @return the child names returned by the property store
     */
    public List<String> getSchemaNames() {
        String schemaRootPath = PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath();
        return _propertyStore.getChildNames(schemaRootPath, AccessOption.PERSISTENT);
    }

    /**
     * Creates a new table from its config.
     *
     * <p>After checking that the table does not exist yet and validating its tenant
     * config, the type-specific setup is performed (OFFLINE: create an empty ideal
     * state resource and store the config; REALTIME: verify the stream config,
     * require a schema, store the config and set up the realtime cluster). Finally
     * the table is added to the {@code brokerResource} ideal state with all brokers
     * of its broker tag in state {@code ONLINE}.
     *
     * @param tableConfig config of the table to add
     * @throws IOException declared by the signature
     * @throws TableAlreadyExistsException if a table config with that name already exists
     * @throws InvalidTableConfigException if a realtime table has no schema or the table
     *         type is not supported
     */
    public void addTable(TableConfig tableConfig) throws IOException {
        String tableNameWithType = tableConfig.getTableName();
        if (getTableConfig(tableNameWithType) != null) {
            throw new TableAlreadyExistsException("Table " + tableNameWithType + " already exists");
        }
        validateTableTenantConfig(tableConfig);

        SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig();
        TableType tableType = tableConfig.getTableType();
        switch (tableType) {
            case OFFLINE: {
                LOGGER.info("building empty ideal state for table : " + tableNameWithType);
                int replication = Integer.parseInt(segmentsConfig.getReplication());
                IdealState emptyIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, replication, _enableBatchMessageMode);
                LOGGER.info("adding table via the admin");
                _helixAdmin.addResource(_helixClusterName, tableNameWithType, emptyIdealState);
                ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, TableConfigUtils.toZNRecord(tableConfig));
                assignInstances(tableConfig, true);
                LOGGER.info("Successfully added table: {}", tableNameWithType);
                break;
            }
            case REALTIME: {
                verifyStreamConfig(tableNameWithType, tableConfig);
                // A schema must exist either under the raw table name or under the configured schema name.
                String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
                boolean schemaExists = ZKMetadataProvider.getSchema(_propertyStore, rawTableName) != null;
                if (!schemaExists) {
                    String configuredSchemaName = tableConfig.getValidationConfig().getSchemaName();
                    schemaExists = configuredSchemaName != null && ZKMetadataProvider.getSchema(_propertyStore, configuredSchemaName) != null;
                }
                if (!schemaExists) {
                    throw new InvalidTableConfigException("No schema defined for realtime table: " + tableNameWithType);
                }
                ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, TableConfigUtils.toZNRecord(tableConfig));
                assignInstances(tableConfig, true);
                ensureRealtimeClusterIsSetUp(tableConfig);
                LOGGER.info("Successfully added or updated the table {} ", tableNameWithType);
                break;
            }
            default: {
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
            }
        }

        LOGGER.info("Updating BrokerResource IdealState for table: {}", tableNameWithType);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        List<String> tenantBrokers = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
        HelixHelper.updateIdealState(_helixZkManager, "brokerResource", idealState -> {
            assert (idealState != null);
            idealState.getRecord().getMapFields().put(tableNameWithType, SegmentAssignmentUtils.getInstanceStateMap(tenantBrokers, "ONLINE"));
            return idealState;
        });
    }

    /**
     * Validates the tenant config of a table and fills in defaults where allowed.
     *
     * <p>When the broker or server tenant is missing, a single-tenant cluster gets
     * {@code DefaultTenant} for the missing value(s) and the updated tenant config is
     * stored on {@code tableConfig}; a multi-tenant cluster rejects the config. After
     * that, every tag the table depends on must have at least one instance:
     * <ul>
     *   <li>the broker tag, always;</li>
     *   <li>for dimension tables, either the offline or the realtime server tag;</li>
     *   <li>for offline tables, the offline server tag;</li>
     *   <li>otherwise, the CONSUMING and COMPLETED server tags, which must also be
     *       valid server tags.</li>
     * </ul>
     *
     * @param tableConfig the table config to validate; its tenant config may be replaced
     * @throws InvalidTableConfigException if tenants are missing in a multi-tenant
     *         cluster, a tag is invalid, or no instance carries a required tag
     */
    @VisibleForTesting
    void validateTableTenantConfig(TableConfig tableConfig) {
        TenantConfig tenantConfig = tableConfig.getTenantConfig();
        String tableNameWithType = tableConfig.getTableName();
        String brokerTag = tenantConfig.getBroker();
        String serverTag = tenantConfig.getServer();
        if (brokerTag == null || serverTag == null) {
            if (!this._isSingleTenantCluster) {
                throw new InvalidTableConfigException("server and broker tenants must be specified for multi-tenant cluster for table: " + tableNameWithType);
            }
            String newBrokerTag = brokerTag == null ? "DefaultTenant" : brokerTag;
            String newServerTag = serverTag == null ? "DefaultTenant" : serverTag;
            TenantConfig defaultedTenantConfig = new TenantConfig(newBrokerTag, newServerTag, tenantConfig.getTagOverrideConfig());
            tableConfig.setTenantConfig(defaultedTenantConfig);
            // Validate against the config that was actually stored, not the stale one
            // that still has null tenants.
            tenantConfig = defaultedTenantConfig;
        }
        Set<String> tagsToCheck = new TreeSet<String>();
        tagsToCheck.add(TagNameUtils.extractBrokerTag(tenantConfig));
        if (tableConfig.isDimTable()) {
            // Dimension tables only need servers under one of the two server tags.
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            String realtimeTag = TagNameUtils.extractRealtimeServerTag(tenantConfig);
            if (this.getInstancesWithTag(offlineTag).isEmpty() && this.getInstancesWithTag(realtimeTag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances for dimension table: " + tableNameWithType);
            }
        } else if (tableConfig.getTableType() == TableType.OFFLINE) {
            tagsToCheck.add(TagNameUtils.extractOfflineServerTag(tenantConfig));
        } else {
            String consumingServerTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(consumingServerTag)) {
                throw new InvalidTableConfigException("Invalid CONSUMING server tag: " + consumingServerTag + " for table: " + tableNameWithType);
            }
            tagsToCheck.add(consumingServerTag);
            String completedServerTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            if (!TagNameUtils.isServerTag(completedServerTag)) {
                throw new InvalidTableConfigException("Invalid COMPLETED server tag: " + completedServerTag + " for table: " + tableNameWithType);
            }
            tagsToCheck.add(completedServerTag);
        }
        for (String tag : tagsToCheck) {
            if (this.getInstancesWithTag(tag).isEmpty()) {
                throw new InvalidTableConfigException("Failed to find instances with tag: " + tag + " for table: " + tableNameWithType);
            }
        }
    }

    /**
     * Writes a record to a path through the base data accessor.
     *
     * @param path path to write to
     * @param record record to store
     * @param expectedVersion version passed through to the accessor's {@code set}
     * @param accessOption access option passed through to the accessor's {@code set}
     * @return the result reported by the accessor
     */
    public boolean setZKData(String path, ZNRecord record, int expectedVersion, int accessOption) {
        boolean written = _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption);
        return written;
    }

    /**
     * Removes a path through the base data accessor, passing {@code -1} as options.
     *
     * @param path path to remove
     * @return the result reported by the accessor
     */
    public boolean deleteZKPath(String path) {
        boolean removed = _helixDataAccessor.getBaseDataAccessor().remove(path, -1);
        return removed;
    }

    /**
     * Reads the record stored at a path through the base data accessor, passing no
     * stat object and {@code -1} as options.
     *
     * @param path path to read
     * @return the record returned by the accessor
     */
    public ZNRecord readZKData(String path) {
        Object stored = _helixDataAccessor.getBaseDataAccessor().get(path, null, -1);
        return (ZNRecord) stored;
    }

    /**
     * Lists the child node names under the given ZooKeeper path.
     *
     * @param path parent ZooKeeper path
     * @return the child names as reported by the Helix base data accessor
     */
    public List<String> getZKChildren(String path) {
        List<String> children = _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
        return children;
    }

    /**
     * Returns the ZooKeeper {@code Stat} of every child under the given path, keyed by child name.
     *
     * <p>The returned map preserves the order in which the accessor reported the child names.
     *
     * @param path parent ZooKeeper path
     * @return a mutable map from child name to its stat; empty when the path has no children or the accessor
     *         reports no child list for it (e.g. the parent path does not exist)
     */
    public Map<String, Stat> getZKChildrenStats(String path) {
        List<String> childNames = _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
        // The accessor may hand back null instead of an empty list when the parent is missing; treat both the
        // same way rather than failing with a NullPointerException.
        if (childNames == null || childNames.isEmpty()) {
            return new LinkedHashMap<>();
        }
        // Collapse the double slash that appears when the parent path already ends with '/' (e.g. the root).
        List<String> childPaths =
            childNames.stream().map(name -> (path + "/" + name).replaceAll("//", "/")).collect(Collectors.toList());
        Stat[] stats = _helixDataAccessor.getBaseDataAccessor().getStats(childPaths, -1);
        Map<String, Stat> statsMap = new LinkedHashMap<>(childNames.size());
        for (int i = 0; i < childNames.size(); i++) {
            statsMap.put(childNames.get(i), stats[i]);
        }
        return statsMap;
    }

    /**
     * Fetches the ZooKeeper {@code Stat} for the given path through the Helix base data accessor.
     *
     * @param path ZooKeeper path to inspect
     * @return the stat reported by the accessor
     */
    public Stat getZKStat(String path) {
        Stat stat = _helixDataAccessor.getBaseDataAccessor().getStat(path, -1);
        return stat;
    }

    /**
     * Stores the LLC realtime segment manager that this resource manager delegates to.
     *
     * @param pinotLLCRealtimeSegmentManager manager instance to keep a reference to
     */
    public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
        _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
    }

    /**
     * Rejects table configs that use a high-level consumer when HLC tables are not allowed.
     *
     * @param tableNameWithType table name used to build the stream config and the error message
     * @param tableConfig table config whose stream config map is inspected
     * @throws InvalidTableConfigException if the stream uses a high-level consumer and HLC tables are disallowed
     */
    private void verifyStreamConfig(String tableNameWithType, TableConfig tableConfig) {
        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
        StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap);
        if (!streamConfig.hasHighLevelConsumerType() || _allowHLCTables) {
            return;
        }
        throw new InvalidTableConfigException("Creating HLC realtime table is not allowed for Table: " + tableNameWithType);
    }

    /**
     * Makes sure the Helix/property-store entries needed by a realtime table exist, for high-level and/or
     * low-level consumers depending on what the table's stream config declares.
     *
     * @param rawRealtimeTableConfig realtime table config as stored; it is passed through
     *        {@code ConfigUtils.applyConfigWithEnvVariables} before being used
     */
    private void ensureRealtimeClusterIsSetUp(TableConfig rawRealtimeTableConfig) {
        TableConfig realtimeTableConfig = (TableConfig)ConfigUtils.applyConfigWithEnvVariables((BaseJsonConfig)rawRealtimeTableConfig);
        String realtimeTableName = realtimeTableConfig.getTableName();
        StreamConfig streamConfig = new StreamConfig(realtimeTableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap((TableConfig)realtimeTableConfig));
        // May be null when the table has no ideal state yet; both branches below handle that case.
        IdealState idealState = this.getTableIdealState(realtimeTableName);
        if (streamConfig.hasHighLevelConsumerType()) {
            if (idealState == null) {
                // First-time HLC setup: build the initial ideal state and register it as a Helix resource.
                LOGGER.info("Initializing IdealState for HLC table: {}", (Object)realtimeTableName);
                idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, this._helixZkManager, this._propertyStore, this._enableBatchMessageMode);
                this._helixAdmin.addResource(this._helixClusterName, realtimeTableName, idealState);
            } else if (!streamConfig.hasLowLevelConsumerType()) {
                // Ideal state already exists and the table is HLC-only: drop any LLC segments from it.
                this._pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
            }
            this.ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
        }
        if (streamConfig.hasLowLevelConsumerType()) {
            // LLC is only configured when the property store holds no LLC segments for this table yet.
            if (ZKMetadataProvider.getLLCRealtimeSegments(this._propertyStore, (String)realtimeTableName).isEmpty()) {
                PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(this._pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig, idealState, this._enableBatchMessageMode);
                LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", (Object)realtimeTableName);
            } else {
                LOGGER.info("LLC is already set up for table {}, not configuring again", (Object)realtimeTableName);
            }
        }
    }

    /**
     * Creates the property store entry for an HLC realtime table if it is not already present.
     *
     * @param realtimeTableName realtime table name used to derive the property store path and the record id
     */
    private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
        String resourcePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
        if (_propertyStore.exists(resourcePath, AccessOption.PERSISTENT)) {
            return;
        }
        LOGGER.info("Creating property store entry for HLC table: {}", realtimeTableName);
        ZNRecord emptyRecord = new ZNRecord(realtimeTableName);
        _propertyStore.create(resourcePath, emptyRecord, AccessOption.PERSISTENT);
    }

    /**
     * Computes and persists instance partitions for every instance partitions type the table allows.
     *
     * @param tableConfig table config driving the assignment
     * @param override when {@code false}, types that already have instance partitions in the property store are
     *        skipped; when {@code true}, they are re-assigned
     */
    private void assignInstances(TableConfig tableConfig, boolean override) {
        String tableNameWithType = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

        // Collect the types that both allow instance assignment and still need it.
        List<InstancePartitionsType> typesToAssign = new ArrayList<>();
        for (InstancePartitionsType type : InstancePartitionsType.values()) {
            if (!InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, type)) {
                continue;
            }
            boolean alreadyPersisted = !override
                && InstancePartitionsUtils.fetchInstancePartitions(_propertyStore, type.getInstancePartitionsName(rawTableName)) != null;
            if (alreadyPersisted) {
                continue;
            }
            typesToAssign.add(type);
        }
        if (typesToAssign.isEmpty()) {
            return;
        }

        LOGGER.info("Assigning {} instances to table: {}", typesToAssign, tableNameWithType);
        InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
        List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
        for (InstancePartitionsType type : typesToAssign) {
            InstancePartitions assigned = driver.assignInstances(type, instanceConfigs);
            LOGGER.info("Persisting instance partitions: {}", assigned);
            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, assigned);
        }
    }

    /**
     * Validates the tenant configuration of the given table config and then stores it as the table's config.
     *
     * @param tableConfig new table config
     * @throws IOException declared by {@code setExistingTableConfig}
     */
    public void updateTableConfig(TableConfig tableConfig) throws IOException {
        validateTableTenantConfig(tableConfig);
        setExistingTableConfig(tableConfig);
    }

    /**
     * Persists the given table config to the property store, applies the type-specific follow-up work
     * (replication update and instance assignment for OFFLINE; stream verification, instance assignment and
     * realtime cluster setup for REALTIME), and finally sends a table config refresh message.
     *
     * @param tableConfig table config to store; its table name is used as the table name with type
     * @throws IOException declared on the signature; no statement visible in this method throws it directly
     * @throws InvalidTableConfigException if the table type is neither OFFLINE nor REALTIME
     */
    public void setExistingTableConfig(TableConfig tableConfig) throws IOException {
        String tableNameWithType = tableConfig.getTableName();
        SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig();
        TableType tableType = tableConfig.getTableType();
        switch (tableType) {
            case OFFLINE: {
                ZKMetadataProvider.setOfflineTableConfig(this._propertyStore, (String)tableNameWithType, (ZNRecord)TableConfigUtils.toZNRecord((TableConfig)tableConfig));
                IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
                String replicationConfigured = segmentsConfig.getReplication();
                // Only touch the ideal state when the configured replication differs from what it currently holds.
                if (!idealState.getReplicas().equals(replicationConfigured)) {
                    // NOTE(review): retry arguments are presumably (max attempts, initial delay ms, scale factor) - confirm.
                    HelixHelper.updateIdealState((HelixManager)this._helixZkManager, (String)tableNameWithType, is -> {
                        assert (is != null);
                        is.setReplicas(replicationConfigured);
                        return is;
                    }, (RetryPolicy)RetryPolicies.exponentialBackoffRetryPolicy((int)5, (long)1000L, (double)1.2f));
                }
                // override=false: existing instance partitions are left untouched.
                this.assignInstances(tableConfig, false);
                break;
            }
            case REALTIME: {
                // Stream config is verified before anything is written to the property store.
                this.verifyStreamConfig(tableNameWithType, tableConfig);
                ZKMetadataProvider.setRealtimeTableConfig(this._propertyStore, (String)tableNameWithType, (ZNRecord)TableConfigUtils.toZNRecord((TableConfig)tableConfig));
                this.assignInstances(tableConfig, false);
                this.ensureRealtimeClusterIsSetUp(tableConfig);
                break;
            }
            default: {
                throw new InvalidTableConfigException("Unsupported table type: " + tableType);
            }
        }
        this.sendTableConfigRefreshMessage(tableNameWithType);
    }

    /**
     * Replaces the custom config section of an existing table config and stores the result.
     *
     * @param tableName table name, combined with {@code type} to form the table name with type
     * @param type table type
     * @param newConfigs custom config to set on the table config
     * @throws RuntimeException if no table config exists for the table
     * @throws Exception declared on the signature; propagated from storing the table config
     */
    public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setCustomConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Replaces the validation/retention config section of an existing table config and stores the result.
     *
     * @param tableName table name, combined with {@code type} to form the table name with type
     * @param type table type
     * @param newConfigs validation and retention config to set on the table config
     * @throws RuntimeException if no table config exists for the table
     * @throws Exception declared on the signature; propagated from storing the table config
     */
    public void updateSegmentsValidationAndRetentionConfigFor(String tableName, TableType type, SegmentsValidationAndRetentionConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setValidationConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Replaces the indexing config section of an existing table config and stores the result.
     *
     * @param tableName table name, combined with {@code type} to form the table name with type
     * @param type table type
     * @param newConfigs indexing config to set on the table config
     * @throws RuntimeException if no table config exists for the table
     * @throws Exception declared on the signature; propagated from storing the table config
     */
    public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) throws Exception {
        String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
        TableConfig existingConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
        if (existingConfig != null) {
            existingConfig.setIndexingConfig(newConfigs);
            setExistingTableConfig(existingConfig);
            return;
        }
        throw new RuntimeException("Table: " + tableName + " of type: " + type + " does not exist");
    }

    /**
     * Deletes an offline table by removing, in order: its broker resource entry, its Helix resource (if present),
     * its stored segments, segment metadata, instance partitions, segment lineage, merge-rollup task metadata and
     * finally its table config. Each step is logged.
     *
     * @param tableName table name; it is converted to the OFFLINE table name with type before use
     */
    public void deleteOfflineTable(String tableName) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        LOGGER.info("Deleting table {}: Start", (Object)offlineTableName);
        HelixHelper.removeResourceFromBrokerIdealState((HelixManager)this._helixZkManager, (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", (Object)offlineTableName);
        // The Helix resource is only dropped when the cluster actually lists it.
        if (this._helixAdmin.getResourcesInCluster(this._helixClusterName).contains(offlineTableName)) {
            this._helixAdmin.dropResource(this._helixClusterName, offlineTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", (Object)offlineTableName);
        }
        // The segment list is read before the segment metadata below is removed from the property store.
        this._segmentDeletionManager.removeSegmentsFromStore(offlineTableName, this.getSegmentsFor(offlineTableName, false));
        LOGGER.info("Deleting table {}: Removed stored segments", (Object)offlineTableName);
        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(this._propertyStore, (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", (Object)offlineTableName);
        InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed instance partitions", (Object)offlineTableName);
        SegmentLineageAccessHelper.deleteSegmentLineage(this._propertyStore, (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed segment lineage", (Object)offlineTableName);
        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, (String)"MergeRollupTask", (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed merge rollup task metadata", (Object)offlineTableName);
        // The table config is removed last.
        ZKMetadataProvider.removeResourceConfigFromPropertyStore(this._propertyStore, (String)offlineTableName);
        LOGGER.info("Deleting table {}: Removed table config", (Object)offlineTableName);
        LOGGER.info("Deleting table {}: Finish", (Object)offlineTableName);
    }

    /**
     * Deletes a realtime table by removing, in order: its broker resource entry, its Helix resource (if present),
     * its stored segments, segment metadata, CONSUMING and COMPLETED instance partitions, segment lineage,
     * merge-rollup and realtime-to-offline task metadata, the table's entry in each hosting instance's ZK
     * metadata, and finally its table config. Each step is logged.
     *
     * @param tableName table name; it is converted to the REALTIME table name with type before use
     */
    public void deleteRealtimeTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        LOGGER.info("Deleting table {}: Start", (Object)realtimeTableName);
        HelixHelper.removeResourceFromBrokerIdealState((HelixManager)this._helixZkManager, (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed from broker resource", (Object)realtimeTableName);
        // Stays null when the Helix resource does not exist; the instance metadata cleanup below is then skipped.
        Set<String> instancesForTable = null;
        if (this._helixAdmin.getResourcesInCluster(this._helixClusterName).contains(realtimeTableName)) {
            // Capture the hosting instances from the ideal state before the resource is dropped.
            instancesForTable = this.getAllInstancesForTable(realtimeTableName);
            this._helixAdmin.dropResource(this._helixClusterName, realtimeTableName);
            LOGGER.info("Deleting table {}: Removed helix table resource", (Object)realtimeTableName);
        }
        // The segment list is read before the segment metadata below is removed from the property store.
        this._segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, this.getSegmentsFor(realtimeTableName, false));
        LOGGER.info("Deleting table {}: Removed stored segments", (Object)realtimeTableName);
        ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(this._propertyStore, (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed segment metadata", (Object)realtimeTableName);
        // Instance partitions are keyed by the raw table name, one entry per partitions type.
        String rawTableName = TableNameBuilder.extractRawTableName((String)tableName);
        InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
        InstancePartitionsUtils.removeInstancePartitions(this._propertyStore, (String)InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
        LOGGER.info("Deleting table {}: Removed instance partitions", (Object)realtimeTableName);
        SegmentLineageAccessHelper.deleteSegmentLineage(this._propertyStore, (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed segment lineage", (Object)realtimeTableName);
        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, (String)"MergeRollupTask", (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed merge rollup task metadata", (Object)realtimeTableName);
        MinionTaskMetadataUtils.deleteTaskMetadata(this._propertyStore, (String)"RealtimeToOfflineSegmentsTask", (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed merge realtime to offline metadata", (Object)realtimeTableName);
        if (instancesForTable != null) {
            for (String instance : instancesForTable) {
                InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(this._propertyStore, (String)instance);
                // Instances without ZK metadata have nothing to clean up.
                if (instanceZKMetadata == null) continue;
                instanceZKMetadata.removeResource(realtimeTableName);
                ZKMetadataProvider.setInstanceZKMetadata(this._propertyStore, (InstanceZKMetadata)instanceZKMetadata);
            }
        }
        // This message is logged unconditionally, even when no instance metadata was touched above.
        LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for HLC table", (Object)realtimeTableName);
        // The table config is removed last.
        ZKMetadataProvider.removeResourceConfigFromPropertyStore(this._propertyStore, (String)realtimeTableName);
        LOGGER.info("Deleting table {}: Removed table config", (Object)realtimeTableName);
        LOGGER.info("Deleting table {}: Finish", (Object)realtimeTableName);
    }

    /**
     * Enables, disables or drops a table.
     *
     * <p>Enabling also attempts a resource reset; a failed reset is logged and reported in the response message
     * but does not fail the call.
     *
     * @param tableNameWithType table name with type suffix
     * @param stateType requested state change
     * @return a failure response when the table does not exist, otherwise a success response describing the action
     * @throws IllegalStateException if {@code stateType} is not one of the handled values
     */
    public PinotResourceManagerResponse toggleTableState(String tableNameWithType, StateType stateType) {
        if (!hasTable(tableNameWithType)) {
            return PinotResourceManagerResponse.failure("Table: " + tableNameWithType + " not found");
        }
        switch (stateType) {
            case ENABLE: {
                _helixAdmin.enableResource(_helixClusterName, tableNameWithType, true);
                boolean resetSuccessful;
                try {
                    _helixAdmin.resetResource(_helixClusterName, Collections.singletonList(tableNameWithType));
                    resetSuccessful = true;
                } catch (HelixException e) {
                    // Reset is best effort: record the failure and still report the table as enabled.
                    LOGGER.warn("Caught exception while resetting resource: {}", tableNameWithType, e);
                    resetSuccessful = false;
                }
                String message = "Table: " + tableNameWithType + " enabled (reset success = " + resetSuccessful + ")";
                return PinotResourceManagerResponse.success(message);
            }
            case DISABLE: {
                _helixAdmin.enableResource(_helixClusterName, tableNameWithType, false);
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " disabled");
            }
            case DROP: {
                // Anything that is not an OFFLINE table is handled by the realtime deletion path.
                boolean isOffline = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
                if (!isOffline) {
                    deleteRealtimeTable(tableNameWithType);
                } else {
                    deleteOfflineTable(tableNameWithType);
                }
                return PinotResourceManagerResponse.success("Table: " + tableNameWithType + " dropped");
            }
        }
        throw new IllegalStateException();
    }

    /**
     * Collects every instance that appears in the table's ideal state, across all partitions.
     *
     * @param tableNameWithType table name with type suffix, used as the Helix resource name
     * @return a mutable set of instance names
     */
    private Set<String> getAllInstancesForTable(String tableNameWithType) {
        IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
        Set<String> instances = new HashSet<>();
        idealState.getPartitionSet().forEach(partition -> instances.addAll(idealState.getInstanceSet(partition)));
        return instances;
    }

    /**
     * Adds a new segment without a crypter; delegates to the four-argument overload with a {@code null} crypter.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentMetadata metadata of the segment being added
     * @param downloadUrl download URL recorded for the segment
     */
    public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl) {
        final String noCrypter = null;
        addNewSegment(tableNameWithType, segmentMetadata, downloadUrl, noCrypter);
    }

    /**
     * Adds a new segment: writes its ZK metadata to the property store and then assigns it in the table's
     * ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentMetadata metadata of the segment being added
     * @param downloadUrl download URL recorded for the segment
     * @param crypter crypter name recorded for the segment, may be {@code null}
     * @throws IllegalStateException if the segment ZK metadata cannot be written
     */
    public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter) {
        String segmentName = segmentMetadata.getName();
        ZNRecord metadataRecord =
            constructZkMetadataForNewSegment(tableNameWithType, segmentMetadata, downloadUrl, crypter).toZNRecord();
        String metadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
        boolean stored = _propertyStore.set(metadataPath, metadataRecord, AccessOption.PERSISTENT);
        Preconditions.checkState(stored, (Object) ("Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName));
        LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType);
        assignTableSegment(tableNameWithType, segmentName);
    }

    /**
     * Builds the ZK metadata for a newly uploaded segment.
     *
     * <p>For realtime tables the upload is only accepted when the table is upsert-enabled, and the segment status
     * is set to UPLOADED; for all other tables the push time is set to the current time.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentMetadata metadata of the segment
     * @param downloadUrl download URL recorded for the segment
     * @param crypter crypter name recorded for the segment, may be {@code null}
     * @return the populated segment ZK metadata
     * @throws IllegalStateException if the table is a realtime table that is not upsert-enabled
     */
    public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter) {
        String segmentName = segmentMetadata.getName();
        SegmentZKMetadata zkMetadata = new SegmentZKMetadata(segmentName);
        ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata);
        zkMetadata.setDownloadUrl(downloadUrl);
        zkMetadata.setCrypterName(crypter);
        if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
            zkMetadata.setPushTime(System.currentTimeMillis());
            return zkMetadata;
        }
        Preconditions.checkState(isUpsertTable(tableNameWithType), (Object) ("Upload segment " + segmentName + " for non upsert enabled realtime table " + tableNameWithType + " is not supported"));
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
        return zkMetadata;
    }

    /**
     * Assigns a segment to instances and records the assignment in the table's ideal state.
     *
     * <p>The ideal state update runs while holding the table's updater lock. If the segment is already present in
     * the ideal state, the ideal state is left unchanged. If anything fails, the segment's ZK metadata is removed
     * from the property store and the original exception is rethrown.
     *
     * <p>Note: this method was recovered by a decompiler, which warned that the original try/catch structure may
     * not have been reproduced exactly.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName name of the segment to assign
     * @throws IllegalStateException if no table config exists for the table
     */
    public void assignTableSegment(String tableNameWithType, String segmentName) {
        String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
        InstancePartitionsType instancePartitionsType = TableNameBuilder.isRealtimeTableResource(tableNameWithType)
            ? InstancePartitionsType.CONSUMING
            : InstancePartitionsType.OFFLINE;
        try {
            TableConfig tableConfig = getTableConfig(tableNameWithType);
            Preconditions.checkState(tableConfig != null, (Object) ("Failed to find table config for table: " + tableNameWithType));
            SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
            Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(
                instancePartitionsType,
                InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType));
            synchronized (getTableUpdaterLock(tableNameWithType)) {
                HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
                    assert (idealState != null);
                    Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
                    if (currentAssignment.containsKey(segmentName)) {
                        LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, tableNameWithType);
                    } else {
                        List<String> assignedInstances = segmentAssignment.assignSegment(segmentName, currentAssignment, instancePartitionsMap);
                        LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, tableNameWithType);
                        currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "ONLINE"));
                    }
                    return idealState;
                });
                LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType);
            }
        } catch (Exception e) {
            // Roll back the segment ZK metadata so a failed assignment does not leave an orphaned entry behind.
            LOGGER.error("Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", segmentName, tableNameWithType, e);
            if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
                LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
            } else {
                LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
            }
            throw e;
        }
    }

    /**
     * Tells whether the realtime table for the given name has upsert enabled.
     *
     * @param tableName table name; it is converted to the REALTIME table name with type before lookup
     * @return {@code true} only when a realtime table config exists, has an upsert config, and that config's mode
     *         is not {@code NONE}
     */
    public boolean isUpsertTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
        if (realtimeTableConfig == null) {
            return false;
        }
        UpsertConfig upsertConfig = realtimeTableConfig.getUpsertConfig();
        if (upsertConfig == null) {
            return false;
        }
        return upsertConfig.getMode() != UpsertConfig.Mode.NONE;
    }

    /**
     * Picks the lock object for a table from the fixed lock array, based on the table name's hash code.
     *
     * @param offlineTableName table name used to select the lock
     * @return the lock object at the selected index
     */
    private Object getTableUpdaterLock(String offlineTableName) {
        // Mask off the sign bit so the index is never negative.
        int nonNegativeHash = offlineTableName.hashCode() & Integer.MAX_VALUE;
        int lockIndex = nonNegativeHash % _tableUpdaterLocks.length;
        return _tableUpdaterLocks[lockIndex];
    }

    /**
     * Reads the ZNRecord stored at a segment's property store path.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName segment name
     * @return the record returned by the metadata provider, possibly {@code null}
     */
    @Nullable
    public ZNRecord getSegmentMetadataZnRecord(String tableNameWithType, String segmentName) {
        String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
        return ZKMetadataProvider.getZnRecord(_propertyStore, segmentPath);
    }

    /**
     * Creates the ZK metadata entry for a segment through the metadata provider.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentZKMetadata metadata to create
     * @return the boolean result reported by the metadata provider
     */
    public boolean createSegmentZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
        boolean created = ZKMetadataProvider.createSegmentZkMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
        return created;
    }

    /**
     * Writes a segment's ZK metadata, passing an expected version through to the metadata provider.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentZKMetadata metadata to store
     * @param expectedVersion version forwarded to the metadata provider
     * @return the boolean result reported by the metadata provider
     */
    public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, int expectedVersion) {
        boolean updated = ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion);
        return updated;
    }

    /**
     * Writes a segment's ZK metadata without specifying an expected version.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentZKMetadata metadata to store
     * @return the boolean result reported by the metadata provider
     */
    public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
        boolean updated = ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
        return updated;
    }

    /**
     * Refreshes an existing segment: updates its ZK metadata from the new segment metadata, stamps the refresh
     * time, download URL and crypter, writes it back with the expected version, and sends a segment refresh message.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentMetadata metadata of the refreshed segment
     * @param segmentZKMetadata existing ZK metadata object to update in place
     * @param expectedVersion version forwarded to the metadata provider for the write
     * @param downloadUrl download URL recorded for the segment
     * @param crypter crypter name recorded for the segment, may be {@code null}
     * @throws RuntimeException if the ZK metadata write is reported as unsuccessful
     */
    public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata, SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter) {
        String segmentName = segmentMetadata.getName();

        ZKMetadataUtils.updateSegmentMetadata(segmentZKMetadata, segmentMetadata);
        segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
        segmentZKMetadata.setDownloadUrl(downloadUrl);
        segmentZKMetadata.setCrypterName(crypter);

        boolean stored = ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata, expectedVersion);
        if (!stored) {
            throw new RuntimeException("Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType);
        }
        LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType);
        sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
    }

    /**
     * Sends a segment reload message for a whole table to every participant instance hosting it.
     *
     * @param tableNameWithType table name with type suffix
     * @param forceDownload whether the reload message asks for a forced download; only accepted for OFFLINE tables
     * @return the number of messages sent, as reported by the Helix messaging service
     * @throws IllegalArgumentException if {@code forceDownload} is set for a table that is not OFFLINE
     */
    public int reloadAllSegments(String tableNameWithType, boolean forceDownload) {
        LOGGER.info("Sending reload message for table: {} with forceDownload: {}", tableNameWithType, forceDownload);
        if (forceDownload) {
            boolean isOffline = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
            Preconditions.checkArgument(isOffline, "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType);
        }

        // Target every participant instance serving this table resource.
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, null, forceDownload);
        ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
        int timeoutMs = -1;
        int sentCount = messagingService.send(criteria, reloadMessage, null, timeoutMs);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for table: {}", sentCount, tableNameWithType);
        }
        return sentCount;
    }

    /**
     * Sends a segment reload message for a single segment to every participant instance hosting it.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName segment to reload; used as the Helix partition in the recipient criteria
     * @param forceDownload whether the reload message asks for a forced download; only accepted for OFFLINE tables
     * @return the number of messages sent, as reported by the Helix messaging service
     * @throws IllegalArgumentException if {@code forceDownload} is set for a table that is not OFFLINE
     */
    public int reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) {
        LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}", segmentName, tableNameWithType, forceDownload);
        if (forceDownload) {
            boolean isOffline = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
            Preconditions.checkArgument(isOffline, "Table: %s is not an OFFLINE table, which is required to force to download segment: %s", tableNameWithType, segmentName);
        }

        // Target every participant instance serving this segment of the table resource.
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setResource(tableNameWithType);
        criteria.setPartition(segmentName);
        criteria.setSessionSpecific(true);

        SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, segmentName, forceDownload);
        ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
        int timeoutMs = -1;
        int sentCount = messagingService.send(criteria, reloadMessage, null, timeoutMs);
        if (sentCount <= 0) {
            LOGGER.warn("No reload message sent for segment: {} in table: {}", segmentName, tableNameWithType);
        } else {
            LOGGER.info("Sent {} reload messages for segment: {} in table: {}", sentCount, segmentName, tableNameWithType);
        }
        return sentCount;
    }

    /**
     * Resets a single segment on every instance that hosts it according to the ideal state.
     *
     * <p>Instances where the external view shows the segment in ERROR get a partition reset; all others get the
     * partition disabled. The method then polls the external view until every instance reports the segment as
     * OFFLINE or the wait time elapses, and finally re-enables the partition on all instances.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName segment to reset
     * @param externalViewWaitTimeMs maximum time in milliseconds to wait for the external view to show OFFLINE
     * @throws IllegalStateException if the ideal state or external view is missing, or the segment is not in the
     *         ideal state
     * @throws InterruptedException if the thread is interrupted while sleeping between external view checks
     * @throws TimeoutException if some instance has not reached OFFLINE within the wait time; the segment is then
     *         left without being re-enabled
     */
    public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) throws InterruptedException, TimeoutException {
        IdealState idealState = getTableIdealState(tableNameWithType);
        Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
        ExternalView externalView = getTableExternalView(tableNameWithType);
        Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
        Set<String> instanceSet = idealState.getInstanceSet(segmentName);
        Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);

        // Segments in ERROR need a reset; anything else is taken OFFLINE by disabling the partition.
        Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
        for (String instance : instanceSet) {
            boolean inError = externalViewStateMap != null && "ERROR".equals(externalViewStateMap.get(instance));
            if (inError) {
                LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType);
                _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
            } else {
                LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType);
                _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
            }
        }

        LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", externalViewWaitTimeMs, segmentName, tableNameWithType);
        long startTime = System.currentTimeMillis();
        Set<String> instancesToCheck = new HashSet<>(instanceSet);
        while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
            ExternalView newExternalView = getTableExternalView(tableNameWithType);
            Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
            Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
            if (newExternalViewStateMap != null) {
                instancesToCheck.removeIf(instance -> "OFFLINE".equals(newExternalViewStateMap.get(instance)));
            }
            // Always pause between polls, including when the segment is absent from the external view;
            // otherwise the loop would spin and re-read the external view continuously until the timeout.
            Thread.sleep(1000L);
        }
        if (!instancesToCheck.isEmpty()) {
            throw new TimeoutException(String.format("Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. Disable/reset might complete in the background, but skipping enable of segment.", segmentName, tableNameWithType));
        }

        LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType);
        for (String instance : instanceSet) {
            _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
        }
    }

    /**
     * Resets all segments of a table: replicas reported as ERROR in the external view are reset,
     * every other replica is disabled; once the external view shows all replicas OFFLINE, every
     * replica is enabled again.
     *
     * @param tableNameWithType table name with type suffix
     * @param externalViewWaitTimeMs maximum time to wait for the external view to show OFFLINE
     * @throws IllegalStateException if the ideal state or external view cannot be found
     * @throws InterruptedException if interrupted while sleeping between external view checks
     * @throws TimeoutException if not every replica is OFFLINE before the wait time elapses;
     *         in that case the segments are NOT re-enabled by this call
     */
    public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) throws InterruptedException, TimeoutException {
        IdealState tableIdealState = this.getTableIdealState(tableNameWithType);
        Preconditions.checkState(tableIdealState != null, "Could not find ideal state for table: %s", tableNameWithType);
        ExternalView initialExternalView = this.getTableExternalView(tableNameWithType);
        Preconditions.checkState(initialExternalView != null, "Could not find external view for table: %s", tableNameWithType);

        // Partition the (instance, segment) pairs into "reset" (currently ERROR) and "disable" (anything else).
        Map<String, Set<String>> segmentsToResetByInstance = new HashMap<>();
        Map<String, Set<String>> segmentsToDisableByInstance = new HashMap<>();
        Map<String, Set<String>> pendingInstancesBySegment = new HashMap<>();
        for (String segment : tableIdealState.getPartitionSet()) {
            Set<String> assignedInstances = tableIdealState.getInstanceSet(segment);
            Map<String, String> reportedStates = initialExternalView.getStateMap(segment);
            for (String assignedInstance : assignedInstances) {
                boolean inErrorState = reportedStates != null && "ERROR".equals(reportedStates.get(assignedInstance));
                Map<String, Set<String>> target = inErrorState ? segmentsToResetByInstance : segmentsToDisableByInstance;
                target.computeIfAbsent(assignedInstance, ignored -> new HashSet<>()).add(segment);
            }
            pendingInstancesBySegment.put(segment, new HashSet<>(assignedInstances));
        }

        LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType);
        for (Map.Entry<String, Set<String>> resetEntry : segmentsToResetByInstance.entrySet()) {
            this._helixAdmin.resetPartition(this._helixClusterName, resetEntry.getKey(), tableNameWithType, new ArrayList<>(resetEntry.getValue()));
        }
        for (Map.Entry<String, Set<String>> disableEntry : segmentsToDisableByInstance.entrySet()) {
            this._helixAdmin.enablePartition(false, this._helixClusterName, disableEntry.getKey(), tableNameWithType, new ArrayList<>(disableEntry.getValue()));
        }

        LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", externalViewWaitTimeMs, tableNameWithType);
        long waitStartMs = System.currentTimeMillis();
        while (!pendingInstancesBySegment.isEmpty() && System.currentTimeMillis() - waitStartMs < externalViewWaitTimeMs) {
            ExternalView latestExternalView = this.getTableExternalView(tableNameWithType);
            Preconditions.checkState(latestExternalView != null, "Could not find external view for table: %s", tableNameWithType);
            // A segment is done once it has a state map and every tracked replica in it is OFFLINE.
            pendingInstancesBySegment.entrySet().removeIf(pending -> {
                Map<String, String> latestStates = latestExternalView.getStateMap(pending.getKey());
                return latestStates != null
                    && pending.getValue().stream().allMatch(server -> "OFFLINE".equals(latestStates.get(server)));
            });
            Thread.sleep(1000L);
        }
        if (!pendingInstancesBySegment.isEmpty()) {
            throw new TimeoutException(String.format("Timed out waiting for external view to stabilize after call to disable/reset segments. Disable/reset might complete in the background, but skipping enable of segments of table: %s", tableNameWithType));
        }

        LOGGER.info("Enabling segments of table: {}", tableNameWithType);
        for (Map.Entry<String, Set<String>> resetEntry : segmentsToResetByInstance.entrySet()) {
            this._helixAdmin.enablePartition(true, this._helixClusterName, resetEntry.getKey(), tableNameWithType, new ArrayList<>(resetEntry.getValue()));
        }
        for (Map.Entry<String, Set<String>> disableEntry : segmentsToDisableByInstance.entrySet()) {
            this._helixAdmin.enablePartition(true, this._helixClusterName, disableEntry.getKey(), tableNameWithType, new ArrayList<>(disableEntry.getValue()));
        }
    }

    /**
     * Sends a segment refresh message for the given segment to servers and/or brokers.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName name of the refreshed segment
     * @param refreshServerSegment whether to message participants of the table resource for this segment
     * @param refreshBrokerRouting whether to message participants of "brokerResource" for this table
     */
    public void sendSegmentRefreshMessage(String tableNameWithType, String segmentName, boolean refreshServerSegment, boolean refreshBrokerRouting) {
        SegmentRefreshMessage refreshMessage = new SegmentRefreshMessage(tableNameWithType, segmentName);

        // Criteria shared by both sends; resource and partition are filled in per target below.
        Criteria criteria = new Criteria();
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setInstanceName("%");
        criteria.setSessionSpecific(true);

        ClusterMessagingService messenger = this._helixZkManager.getMessagingService();

        if (refreshServerSegment) {
            criteria.setResource(tableNameWithType);
            criteria.setPartition(segmentName);
            int sentToServers = messenger.send(criteria, (Message)refreshMessage, null, -1);
            if (sentToServers <= 0) {
                LOGGER.warn("No segment refresh message sent to servers for segment: {} of table: {}", segmentName, tableNameWithType);
            } else {
                LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", sentToServers, segmentName, tableNameWithType);
            }
        }

        if (refreshBrokerRouting) {
            criteria.setResource("brokerResource");
            criteria.setPartition(tableNameWithType);
            int sentToBrokers = messenger.send(criteria, (Message)refreshMessage, null, -1);
            if (sentToBrokers <= 0) {
                LOGGER.warn("No segment refresh message sent to brokers for segment: {} of table: {}", segmentName, tableNameWithType);
            } else {
                LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", sentToBrokers, segmentName, tableNameWithType);
            }
        }
    }

    /**
     * Sends a table config refresh message to the participants of "brokerResource" for the
     * given table, and logs how many messages were sent.
     *
     * @param tableNameWithType table name with type suffix
     */
    private void sendTableConfigRefreshMessage(String tableNameWithType) {
        Criteria brokerCriteria = new Criteria();
        brokerCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        brokerCriteria.setInstanceName("%");
        brokerCriteria.setResource("brokerResource");
        brokerCriteria.setPartition(tableNameWithType);
        brokerCriteria.setSessionSpecific(true);

        TableConfigRefreshMessage refreshMessage = new TableConfigRefreshMessage(tableNameWithType);
        ClusterMessagingService messenger = this._helixZkManager.getMessagingService();
        int sentCount = messenger.send(brokerCriteria, (Message)refreshMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No table config refresh message sent to brokers for table: {}", tableNameWithType);
            return;
        }
        LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", sentCount, tableNameWithType);
    }

    /**
     * Sends a routing table rebuild message to the participants of "brokerResource" for the
     * given table, and logs how many messages were sent.
     *
     * @param tableNameWithType table name with type suffix
     */
    private void sendRoutingTableRebuildMessage(String tableNameWithType) {
        Criteria brokerCriteria = new Criteria();
        brokerCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        brokerCriteria.setInstanceName("%");
        brokerCriteria.setResource("brokerResource");
        brokerCriteria.setPartition(tableNameWithType);
        brokerCriteria.setSessionSpecific(true);

        RoutingTableRebuildMessage rebuildMessage = new RoutingTableRebuildMessage(tableNameWithType);
        ClusterMessagingService messenger = this._helixZkManager.getMessagingService();
        int sentCount = messenger.send(brokerCriteria, (Message)rebuildMessage, null, -1);
        if (sentCount <= 0) {
            LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType);
            return;
        }
        LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", sentCount, tableNameWithType);
    }

    /**
     * Writes the "queryRateLimitDisabled" participant config for a broker instance.
     *
     * @param brokerInstanceName broker instance whose participant config is updated
     * @param state the value "DISABLE" stores "true"; any other value (including null) stores "false"
     */
    public void toggleQueryQuotaStateForBroker(String brokerInstanceName, String state) {
        boolean rateLimitDisabled = "DISABLE".equals(state);
        Map<String, String> participantConfig = new HashMap<>();
        participantConfig.put("queryRateLimitDisabled", String.valueOf(rateLimitDisabled));

        HelixConfigScopeBuilder scopeBuilder = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, this._helixClusterName);
        HelixConfigScope brokerScope = scopeBuilder.forParticipant(brokerInstanceName).build();
        this._helixAdmin.setConfig(brokerScope, participantConfig);
    }

    /**
     * Builds a sorted map from server instance to the segments assigned to it in the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @return map keyed by server name, each value listing that server's segments
     * @throws IllegalStateException if the table has no ideal state
     */
    public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) {
        IdealState tableIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        Preconditions.checkState(tableIdealState != null, "Ideal state does not exist for table: %s", tableNameWithType);

        Map<String, List<String>> segmentsByServer = new TreeMap<>();
        for (String segmentName : tableIdealState.getPartitionSet()) {
            Set<String> assignedServers = tableIdealState.getInstanceStateMap(segmentName).keySet();
            for (String serverName : assignedServers) {
                segmentsByServer.computeIfAbsent(serverName, ignored -> new ArrayList<>()).add(segmentName);
            }
        }
        return segmentsByServer;
    }

    /**
     * Returns the segments that have at least one replica in CONSUMING state in the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @return a new mutable set of segment names
     * @throws IllegalStateException if the table has no ideal state
     */
    public Set<String> getConsumingSegments(String tableNameWithType) {
        IdealState tableIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        Preconditions.checkState(tableIdealState != null, "Ideal state does not exist for table: %s", tableNameWithType);
        return tableIdealState.getPartitionSet().stream()
            .filter(segmentName -> tableIdealState.getInstanceStateMap(segmentName).containsValue("CONSUMING"))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Returns the servers a segment is assigned to in the table's ideal state.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName segment to look up
     * @return a new mutable set of server instance names
     * @throws IllegalStateException if the table has no ideal state, or the segment is not
     *         present in it (previously the latter surfaced as a bare NullPointerException)
     */
    public Set<String> getServersForSegment(String tableNameWithType, String segmentName) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }
        // The ideal state has no instance-state map for a segment it does not contain.
        Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
        if (instanceStateMap == null) {
            throw new IllegalStateException("Segment: " + segmentName + " does not exist in ideal state for table: " + tableNameWithType);
        }
        return new HashSet<>(instanceStateMap.keySet());
    }

    /**
     * Returns the CRC of every segment in the table's ideal state, using per-table caches that are
     * refreshed only for segments whose metadata znode version changed since the last call.
     *
     * <p>Cached entries for segments no longer in the ideal state are evicted. A segment whose
     * metadata stat is missing and that has no cached CRC is reported with the string "null".
     *
     * @param tableNameWithType table name with type suffix
     * @return sorted map from segment name to its CRC rendered as a string
     * @throws IllegalStateException if the table has no ideal state
     */
    public synchronized Map<String, String> getSegmentsCrcForTable(String tableNameWithType) {
        IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        if (idealState == null) {
            // Fail with the same explicit error the sibling ideal-state readers use instead of an NPE.
            throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType);
        }
        Set<String> segmentsInIdealState = idealState.getPartitionSet();
        List<String> segmentList = new ArrayList<>(segmentsInIdealState);
        List<String> segmentMetadataPaths = new ArrayList<>(segmentList.size());
        for (String segmentName : segmentList) {
            segmentMetadataPaths.add(this.buildPathForSegmentMetadata(tableNameWithType, segmentName));
        }

        // First request for this table: start with empty caches.
        if (!this._segmentCrcMap.containsKey(tableNameWithType)) {
            this._lastKnownSegmentMetadataVersionMap.put(tableNameWithType, new HashMap<>());
            this._segmentCrcMap.put(tableNameWithType, new HashMap<>());
        }
        Map<String, Integer> lastKnownVersions = this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType);
        Map<String, Long> cachedCrcs = this._segmentCrcMap.get(tableNameWithType);

        // metadataStats[i] corresponds to segmentList.get(i); the original relies on this alignment too.
        Stat[] metadataStats = this._propertyStore.getStats(segmentMetadataPaths, AccessOption.PERSISTENT);
        for (int i = 0; i < metadataStats.length; ++i) {
            Stat metadataStat = metadataStats[i];
            if (metadataStat == null) {
                continue;
            }
            String currentSegment = segmentList.get(i);
            int currentVersion = metadataStat.getVersion();
            Integer lastKnownVersion = lastKnownVersions.get(currentSegment);
            // Re-read the CRC only for unseen segments or when the znode version moved.
            if (lastKnownVersion == null || lastKnownVersion != currentVersion) {
                this.updateSegmentMetadataCrc(tableNameWithType, currentSegment, currentVersion);
            }
        }

        // Evict cache entries for segments that are no longer part of the ideal state.
        Iterator<Map.Entry<String, Long>> iter = cachedCrcs.entrySet().iterator();
        while (iter.hasNext()) {
            String cachedSegment = iter.next().getKey();
            if (!segmentsInIdealState.contains(cachedSegment)) {
                iter.remove();
                lastKnownVersions.remove(cachedSegment);
            }
        }

        Map<String, String> resultCrcMap = new TreeMap<>();
        for (String segment : segmentList) {
            resultCrcMap.put(segment, String.valueOf(cachedCrcs.get(segment)));
        }
        return resultCrcMap;
    }

    /**
     * Reads the segment's ZK metadata and stores its CRC and the given metadata version in the
     * per-table caches.
     *
     * <p>The metadata is validated before either cache is touched. The previous {@code assert}
     * is a no-op when assertions are disabled, which let the version be recorded before the
     * failure on the null metadata, leaving a version cached with no matching CRC.
     *
     * @param tableNameWithType table name with type suffix; its cache maps must already exist
     * @param segmentName segment whose CRC is refreshed
     * @param currentVersion metadata version to remember for this segment
     * @throws IllegalStateException if the segment ZK metadata cannot be found
     */
    private void updateSegmentMetadataCrc(String tableNameWithType, String segmentName, int currentVersion) {
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, tableNameWithType, segmentName);
        Preconditions.checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, tableNameWithType);
        this._lastKnownSegmentMetadataVersionMap.get(tableNameWithType).put(segmentName, currentVersion);
        this._segmentCrcMap.get(tableNameWithType).put(segmentName, segmentZKMetadata.getCrc());
    }

    /**
     * Builds the segment metadata path {@code /SEGMENTS/<tableNameWithType>/<segmentName>}.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentName segment name
     * @return the concatenated path
     */
    public String buildPathForSegmentMetadata(String tableNameWithType, String segmentName) {
        StringBuilder path = new StringBuilder("/SEGMENTS/");
        path.append(tableNameWithType).append("/").append(segmentName);
        return path.toString();
    }

    /**
     * Checks whether the given table name appears in the list returned by {@code getAllResources()}.
     *
     * @param tableNameWithType table name with type suffix
     * @return true if the name is among the resources
     */
    public boolean hasTable(String tableNameWithType) {
        List<String> resourceNames = this.getAllResources();
        return resourceNames.contains(tableNameWithType);
    }

    /**
     * Checks whether the OFFLINE variant of the given table exists.
     *
     * @param tableName table name, converted via {@code TableNameBuilder.OFFLINE}
     * @return true if the offline table exists
     */
    public boolean hasOfflineTable(String tableName) {
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        return this.hasTable(offlineTableName);
    }

    /**
     * Checks whether the REALTIME variant of the given table exists.
     *
     * @param tableName table name, converted via {@code TableNameBuilder.REALTIME}
     * @return true if the realtime table exists
     */
    public boolean hasRealtimeTable(String tableName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        return this.hasTable(realtimeTableName);
    }

    /**
     * Reports whether the table's ideal state is enabled.
     *
     * @param tableNameWithType table name with type suffix
     * @return the ideal state's enabled flag
     * @throws TableNotFoundException if the table has no ideal state
     */
    public boolean isTableEnabled(String tableNameWithType) throws TableNotFoundException {
        IdealState tableIdealState = this.getTableIdealState(tableNameWithType);
        if (tableIdealState != null) {
            return tableIdealState.isEnabled();
        }
        throw new TableNotFoundException("Failed to find ideal state for table: " + tableNameWithType);
    }

    /**
     * Fetches the table's ideal state from the Helix admin.
     *
     * @param tableNameWithType table name with type suffix
     * @return the ideal state, or null when absent
     */
    @Nullable
    public IdealState getTableIdealState(String tableNameWithType) {
        IdealState tableIdealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, tableNameWithType);
        return tableIdealState;
    }

    /**
     * Fetches the table's external view from the Helix admin.
     *
     * @param tableNameWithType table name with type suffix
     * @return the external view, or null when absent
     */
    @Nullable
    public ExternalView getTableExternalView(String tableNameWithType) {
        ExternalView tableExternalView = this._helixAdmin.getResourceExternalView(this._helixClusterName, tableNameWithType);
        return tableExternalView;
    }

    /**
     * Looks up the table config for a type-suffixed table name in the property store.
     *
     * @param tableNameWithType table name with type suffix
     * @return the table config, or null when absent
     */
    @Nullable
    public TableConfig getTableConfig(String tableNameWithType) {
        TableConfig storedConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, tableNameWithType);
        return storedConfig;
    }

    /**
     * Looks up the offline table config for a table in the property store.
     *
     * @param tableName table name passed to {@code ZKMetadataProvider.getOfflineTableConfig}
     * @return the offline table config, or null when absent
     */
    @Nullable
    public TableConfig getOfflineTableConfig(String tableName) {
        TableConfig offlineConfig = ZKMetadataProvider.getOfflineTableConfig(this._propertyStore, tableName);
        return offlineConfig;
    }

    /**
     * Looks up the realtime table config for a table in the property store.
     *
     * @param tableName table name passed to {@code ZKMetadataProvider.getRealtimeTableConfig}
     * @return the realtime table config, or null when absent
     */
    @Nullable
    public TableConfig getRealtimeTableConfig(String tableName) {
        TableConfig realtimeConfig = ZKMetadataProvider.getRealtimeTableConfig(this._propertyStore, tableName);
        return realtimeConfig;
    }

    /**
     * Looks up the table config for the given table and type.
     *
     * @param tableName table name
     * @param tableType OFFLINE selects the offline config; any other value (including null)
     *        selects the realtime config
     * @return the table config, or null when absent
     */
    @Nullable
    public TableConfig getTableConfig(String tableName, TableType tableType) {
        return tableType == TableType.OFFLINE
            ? this.getOfflineTableConfig(tableName)
            : this.getRealtimeTableConfig(tableName);
    }

    /**
     * Collects the offline and realtime table configs found under the given schema name.
     *
     * @param schemaName name used as the table name for both lookups
     * @return a list holding the offline config (if any) followed by the realtime config (if any)
     */
    public List<TableConfig> getTableConfigsForSchema(String schemaName) {
        List<TableConfig> foundConfigs = new ArrayList<>();

        TableConfig offlineConfig = this.getOfflineTableConfig(schemaName);
        if (offlineConfig != null) {
            foundConfigs.add(offlineConfig);
        }

        TableConfig realtimeConfig = this.getRealtimeTableConfig(schemaName);
        if (realtimeConfig != null) {
            foundConfigs.add(realtimeConfig);
        }

        return foundConfigs;
    }

    /**
     * Returns the de-duplicated server instances carrying the tenant tag(s) of the given table:
     * the offline server tag for OFFLINE tables, the consuming and completed server tags for
     * REALTIME tables.
     *
     * @param tableName table name
     * @param tableType table type
     * @return a new list of instance names, in no particular order
     * @throws NullPointerException if no table config exists for the table
     */
    public List<String> getServerInstancesForTable(String tableName, TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull(tableConfig);
        TenantConfig tenantConfig = tableConfig.getTenantConfig();

        List<InstanceConfig> allInstanceConfigs = HelixHelper.getInstanceConfigs(this._helixZkManager);
        Set<String> taggedServers = new HashSet<>();
        if (tableType == TableType.OFFLINE) {
            String offlineTag = TagNameUtils.extractOfflineServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(allInstanceConfigs, offlineTag));
        } else if (tableType == TableType.REALTIME) {
            String consumingTag = TagNameUtils.extractConsumingServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(allInstanceConfigs, consumingTag));
            String completedTag = TagNameUtils.extractCompletedServerTag(tenantConfig);
            taggedServers.addAll(HelixHelper.getInstancesWithTag(allInstanceConfigs, completedTag));
        }
        return new ArrayList<>(taggedServers);
    }

    /**
     * Returns the instances carrying the broker tag of the given table's tenant.
     *
     * @param tableName table name
     * @param tableType table type
     * @return instance names as returned by {@code HelixHelper.getInstancesWithTag}
     * @throws NullPointerException if no table config exists for the table
     */
    public List<String> getBrokerInstancesForTable(String tableName, TableType tableType) {
        TableConfig tableConfig = this.getTableConfig(tableName, tableType);
        Preconditions.checkNotNull(tableConfig);
        String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
        return HelixHelper.getInstancesWithTag(this._helixZkManager, brokerTag);
    }

    /**
     * Enables the instance, waiting up to 10000 ms for the change to take effect.
     *
     * @param instanceName instance to enable
     * @return the outcome of the toggle
     */
    public PinotResourceManagerResponse enableInstance(String instanceName) {
        final boolean enable = true;
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, enable, timeoutMs);
    }

    /**
     * Disables the instance, waiting up to 10000 ms for the change to take effect.
     *
     * @param instanceName instance to disable
     * @return the outcome of the toggle
     */
    public PinotResourceManagerResponse disableInstance(String instanceName) {
        final boolean enable = false;
        final long timeoutMs = 10000L;
        return this.enableInstance(instanceName, enable, timeoutMs);
    }

    /**
     * Drops an instance from the cluster by removing its instance and instance-config properties.
     *
     * <p>The drop is refused while the instance is live or still referenced by any resource's
     * ideal state.
     *
     * @param instanceName instance to drop
     * @return a success response, or a failure response describing why the drop was refused or
     *         which removal step failed
     */
    public PinotResourceManagerResponse dropInstance(String instanceName) {
        if (this._helixDataAccessor.getProperty(this._keyBuilder.liveInstance(instanceName)) != null) {
            return PinotResourceManagerResponse.failure("Instance " + instanceName + " is still live");
        }
        for (String resource : this.getAllResources()) {
            IdealState idealState = this._helixAdmin.getResourceIdealState(this._helixClusterName, resource);
            if (idealState == null) {
                // The resource was removed between listing and fetching; it cannot reference the instance.
                continue;
            }
            for (String partition : idealState.getPartitionSet()) {
                if (idealState.getInstanceSet(partition).contains(instanceName)) {
                    return PinotResourceManagerResponse.failure("Instance " + instanceName + " exists in ideal state for " + resource);
                }
            }
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instance(instanceName)));
        }
        catch (Exception e) {
            // Keep the cause in the log; the response only carries a summary message.
            LOGGER.error("Failed to remove /INSTANCES/{}", instanceName, e);
            return PinotResourceManagerResponse.failure("Failed to remove /INSTANCES/" + instanceName);
        }
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> this._helixDataAccessor.removeProperty(this._keyBuilder.instanceConfig(instanceName)));
        }
        catch (Exception e) {
            LOGGER.error("Failed to remove /CONFIGS/PARTICIPANT/{}", instanceName, e);
            return PinotResourceManagerResponse.failure("Failed to remove /CONFIGS/PARTICIPANT/" + instanceName + ". Make sure to remove /CONFIGS/PARTICIPANT/" + instanceName + " manually since /INSTANCES/" + instanceName + " has already been removed");
        }
        return PinotResourceManagerResponse.success("Instance " + instanceName + " dropped");
    }

    /**
     * Enables or disables an instance in Helix, then polls every 500 ms until the instance's
     * current states reflect the change or the timeout elapses.
     *
     * <p>The toggle counts as done when the instance is enabled and none of its partitions is
     * OFFLINE, or disabled and all of its partitions are OFFLINE. The previous condition was
     * inverted: it reported "enabled" only when every partition was OFFLINE and "disabled"
     * only when none was.
     *
     * @param instanceName instance to toggle
     * @param enableInstance true to enable, false to disable
     * @param timeOutMs maximum time to wait for the current states to match
     * @return success when the target state is reached, when a disabled instance is not live, or
     *         when the live instance has no current states; failure when the instance does not
     *         exist, the wait is interrupted, or the timeout elapses
     */
    private PinotResourceManagerResponse enableInstance(String instanceName, boolean enableInstance, long timeOutMs) {
        if (!this.instanceExists(instanceName)) {
            return PinotResourceManagerResponse.failure("Instance " + instanceName + " not found");
        }
        this._helixAdmin.enableInstance(this._helixClusterName, instanceName, enableInstance);
        long intervalWaitTimeMs = 500L;
        long deadline = System.currentTimeMillis() + timeOutMs;
        while (System.currentTimeMillis() < deadline) {
            PropertyKey liveInstanceKey = this._keyBuilder.liveInstance(instanceName);
            LiveInstance liveInstance = this._helixDataAccessor.getProperty(liveInstanceKey);
            if (liveInstance == null) {
                // A disabled instance that is not live has nothing left to converge;
                // an enabled one is polled until it comes up or the timeout hits.
                if (!enableInstance) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
            } else {
                PropertyKey instanceCurrentStatesKey = this._keyBuilder.currentStates(instanceName, liveInstance.getSessionId());
                List<CurrentState> instanceCurrentStates = this._helixDataAccessor.getChildValues(instanceCurrentStatesKey, true);
                if (instanceCurrentStates.isEmpty()) {
                    return PinotResourceManagerResponse.SUCCESS;
                }
                if (this.currentStatesMatchToggle(instanceCurrentStates, enableInstance)) {
                    return enableInstance ? PinotResourceManagerResponse.success("Instance " + instanceName + " enabled") : PinotResourceManagerResponse.success("Instance " + instanceName + " disabled");
                }
            }
            try {
                Thread.sleep(intervalWaitTimeMs);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
                LOGGER.warn("Got interrupted when sleeping for {}ms to wait until the current state matched for instance: {}", intervalWaitTimeMs, instanceName);
                return PinotResourceManagerResponse.failure("Got interrupted when waiting for instance to be " + (enableInstance ? "enabled" : "disabled"));
            }
        }
        return PinotResourceManagerResponse.failure("Instance " + (enableInstance ? "enable" : "disable") + " failed, timeout");
    }

    /** Returns true when no partition is OFFLINE (enable) or every partition is OFFLINE (disable). */
    private boolean currentStatesMatchToggle(List<CurrentState> instanceCurrentStates, boolean enableInstance) {
        for (CurrentState currentState : instanceCurrentStates) {
            for (String state : currentState.getPartitionStateMap().values()) {
                boolean partitionOffline = "OFFLINE".equals(state);
                // Mismatch: enabled but still OFFLINE, or disabled but not yet OFFLINE.
                if (enableInstance == partitionOffline) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Rebalances the given table using a new {@code TableRebalancer}.
     *
     * @param tableNameWithType table name with type suffix
     * @param rebalanceConfig configuration passed through to the rebalancer
     * @return the rebalancer's result
     * @throws TableNotFoundException if no table config exists for the table
     */
    public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig) throws TableNotFoundException {
        TableConfig tableConfig = this.getTableConfig(tableNameWithType);
        if (tableConfig != null) {
            TableRebalancer rebalancer = new TableRebalancer(this._helixZkManager);
            return rebalancer.rebalance(tableConfig, rebalanceConfig);
        }
        throw new TableNotFoundException("Failed to find table config for table: " + tableNameWithType);
    }

    /**
     * Checks whether a Helix instance config exists for the given instance.
     *
     * @param instanceName instance to look up
     * @return true if {@code getHelixInstanceConfig} returns a non-null config
     */
    public boolean instanceExists(String instanceName) {
        boolean configMissing = this.getHelixInstanceConfig(instanceName) == null;
        return !configMissing;
    }

    /**
     * Returns the instances tagged "broker_untagged" that are currently live.
     *
     * @return the tagged-instance list from {@code HelixHelper}, narrowed in place to live instances
     */
    public List<String> getOnlineUnTaggedBrokerInstanceList() {
        List<String> untaggedBrokers = HelixHelper.getInstancesWithTag(this._helixZkManager, "broker_untagged");
        List<String> liveInstanceNames = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedBrokers.retainAll(liveInstanceNames);
        return untaggedBrokers;
    }

    /**
     * Returns the instances tagged "server_untagged" that are currently live.
     *
     * @return the tagged-instance list from {@code HelixHelper}, narrowed in place to live instances
     */
    public List<String> getOnlineUnTaggedServerInstanceList() {
        List<String> untaggedServers = HelixHelper.getInstancesWithTag(this._helixZkManager, "server_untagged");
        List<String> liveInstanceNames = this._helixDataAccessor.getChildNames(this._keyBuilder.liveInstances());
        untaggedServers.retainAll(liveInstanceNames);
        return untaggedServers;
    }

    /**
     * Returns the names of all live instances known to the Helix data accessor.
     *
     * @return child names under the live-instances key
     */
    public List<String> getOnlineInstanceList() {
        PropertyKey liveInstancesKey = this._keyBuilder.liveInstances();
        return this._helixDataAccessor.getChildNames(liveInstancesKey);
    }

    /**
     * Resolves the admin endpoint of each given instance through the endpoint cache.
     *
     * @param instances instance names to resolve
     * @return bi-directional map whose keys are instance names and whose values are their admin endpoints
     * @throws InvalidConfigException if the cache lookup for any instance fails
     */
    public BiMap<String, String> getDataInstanceAdminEndpoints(Set<String> instances) throws InvalidConfigException {
        BiMap<String, String> instanceToEndpoint = HashBiMap.create(instances.size());
        for (String instanceName : instances) {
            try {
                String adminEndpoint = this._instanceAdminEndpointCache.get(instanceName);
                instanceToEndpoint.put(instanceName, adminEndpoint);
            }
            catch (ExecutionException e) {
                String errorMessage = String.format("ExecutionException when getting instance admin endpoint for instance: %s. Error message: %s", instanceName, e.getMessage());
                LOGGER.error(errorMessage, (Throwable)e);
                throw new InvalidConfigException(errorMessage);
            }
        }
        return instanceToEndpoint;
    }

    /**
     * Resolves a table name (with or without type suffix) to the type-suffixed names of the
     * tables that actually have a table config.
     *
     * @param tableName table name, optionally already carrying a type suffix
     * @param tableType optional type filter; null accepts both types
     * @return one or two existing table names with type, offline before realtime
     * @throws IllegalArgumentException if the suffix in {@code tableName} contradicts {@code tableType}
     * @throws TableNotFoundException if no matching table exists
     */
    public List<String> getExistingTableNamesWithType(String tableName, @Nullable TableType tableType) throws TableNotFoundException {
        List<String> existingNames = new ArrayList<>(2);
        TableType typeFromName = TableNameBuilder.getTableTypeFromTableName(tableName);

        if (typeFromName == null) {
            // Raw table name: probe each type the caller allows.
            boolean anyType = tableType == null;
            if (anyType || tableType == TableType.OFFLINE) {
                String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
                if (this.getTableConfig(offlineTableName) != null) {
                    existingNames.add(offlineTableName);
                }
            }
            if (anyType || tableType == TableType.REALTIME) {
                String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
                if (this.getTableConfig(realtimeTableName) != null) {
                    existingNames.add(realtimeTableName);
                }
            }
        } else {
            // Name already carries a type: it must agree with the requested type, if any.
            if (tableType != null && tableType != typeFromName) {
                throw new IllegalArgumentException("Table name: " + tableName + " does not match table type: " + tableType);
            }
            if (this.getTableConfig(tableName) != null) {
                existingNames.add(tableName);
            }
        }

        if (existingNames.isEmpty()) {
            String typeSuffix = tableType != null ? "_" + tableType.toString() : "";
            throw new TableNotFoundException("Table '" + tableName + typeSuffix + "' not found.");
        }
        return existingNames;
    }

    /**
     * Starts a segment replacement by adding an IN_PROGRESS entry to the table's segment lineage.
     *
     * <p>Before touching the lineage, every segment in {@code segmentsFrom} must be present in
     * the table and no segment in {@code segmentsTo} may be present. The lineage update itself
     * runs under {@code DEFAULT_RETRY_POLICY} and is written with the version of the lineage
     * record that was read.
     *
     * <p>Existing lineage entries are handled as follows:
     * <ul>
     *   <li>REVERTED entries: with {@code forceCleanup}, their {@code segmentsTo} are scheduled
     *       for deletion; otherwise they are ignored.</li>
     *   <li>With {@code forceCleanup}: IN_PROGRESS entries whose {@code segmentsFrom} overlap the
     *       requested {@code segmentsFrom} are reverted and their {@code segmentsTo} scheduled
     *       for deletion; for REFRESH tables, COMPLETED entries whose {@code segmentsTo} equal
     *       the requested {@code segmentsFrom} have their {@code segmentsFrom} scheduled for
     *       deletion. The overlap checks below are skipped in this mode.</li>
     *   <li>Without {@code forceCleanup}: the requested {@code segmentsFrom} and
     *       {@code segmentsTo} must each be disjoint from those of every non-REVERTED entry.</li>
     * </ul>
     * Scheduled segments are deleted only after the lineage write succeeds.
     *
     * @param tableNameWithType table name with type suffix
     * @param segmentsFrom segments to be replaced
     * @param segmentsTo segments that will replace them
     * @param forceCleanup whether to revert/clean up conflicting entries instead of rejecting
     * @return the id of the newly added lineage entry
     * @throws IllegalArgumentException if the up-front segment presence checks fail
     * @throws RuntimeException if the lineage update fails (wraps the underlying exception)
     */
    public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo, boolean forceCleanup) {
        String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
        HashSet<String> segmentsForTable = new HashSet<String>(this.getSegmentsFor(tableNameWithType, false));
        Preconditions.checkArgument((boolean)segmentsForTable.containsAll(segmentsFrom), (Object)String.format("Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo, segmentsForTable));
        Preconditions.checkArgument((boolean)Collections.disjoint(segmentsForTable, segmentsTo), (Object)String.format("Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo, segmentsForTable));
        try {
            // The lambda returns true once the lineage is written and false when the write is
            // rejected; presumably false makes the retry policy run it again - TODO confirm.
            DEFAULT_RETRY_POLICY.attempt(() -> {
                SegmentLineage segmentLineage;
                TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)tableNameWithType);
                Preconditions.checkNotNull((Object)tableConfig, (String)"Table config is not available for table '%s'", (Object)tableNameWithType);
                ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, (String)tableNameWithType);
                // -1 is used when no lineage record exists yet; otherwise the version of the record read.
                int expectedVersion = -1;
                if (segmentLineageZNRecord == null) {
                    segmentLineage = new SegmentLineage(tableNameWithType);
                } else {
                    segmentLineage = SegmentLineage.fromZNRecord((ZNRecord)segmentLineageZNRecord);
                    expectedVersion = segmentLineageZNRecord.getVersion();
                }
                Preconditions.checkArgument((segmentLineage.getLineageEntry(segmentLineageEntryId) == null ? 1 : 0) != 0, (Object)String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", segmentLineageEntryId));
                // Segments collected here are deleted only after the lineage write succeeds.
                ArrayList<String> segmentsToCleanUp = new ArrayList<String>();
                for (String entryId : segmentLineage.getLineageEntryIds()) {
                    LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
                    if (lineageEntry.getState() == LineageEntryState.REVERTED) {
                        // Reverted entries never block a new replacement; optionally clean their leftovers.
                        if (!forceCleanup) continue;
                        segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
                        continue;
                    }
                    if (forceCleanup) {
                        if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) {
                            LOGGER.info("Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, segmentsTo={}", new Object[]{tableNameWithType, entryId, lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()});
                            this.updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, entryId, lineageEntry);
                            segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
                            continue;
                        }
                        // Only a COMPLETED entry of a REFRESH table whose segmentsTo equal the requested
                        // segmentsFrom gets its already-replaced segmentsFrom cleaned up.
                        if (lineageEntry.getState() != LineageEntryState.COMPLETED || !IngestionConfigUtils.getBatchSegmentIngestionType((TableConfig)tableConfig).equalsIgnoreCase("REFRESH") || !CollectionUtils.isEqualCollection((Collection)segmentsFrom, (Collection)lineageEntry.getSegmentsTo())) continue;
                        LOGGER.info("Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. tableNameWithType={}, segmentsToCleanUp={}", (Object)tableNameWithType, (Object)lineageEntry.getSegmentsFrom());
                        segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
                        continue;
                    }
                    // Without forceCleanup, any overlap with a non-reverted entry rejects the request.
                    Preconditions.checkArgument((boolean)Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), (Object)String.format("It is not allowed to replace segments that are already replaced. (tableName = %s, segmentsFrom from the existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType, lineageEntry.getSegmentsFrom(), segmentsFrom));
                    Preconditions.checkArgument((boolean)Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), (Object)String.format("It is not allowed to have the same segment name for segments in 'segmentsTo'. (tableName = %s, segmentsTo from the existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsTo));
                }
                segmentLineage.addLineageEntry(segmentLineageEntryId, new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
                if (SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, (SegmentLineage)segmentLineage, (int)expectedVersion)) {
                    if (!segmentsToCleanUp.isEmpty()) {
                        LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
                        this.deleteSegments(tableNameWithType, segmentsToCleanUp);
                    }
                    return true;
                }
                return false;
            });
        }
        catch (Exception e) {
            // Any failure inside the retried update, including its precondition checks, is wrapped here.
            String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo);
            LOGGER.error(errorMsg, (Throwable)e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("startReplaceSegments is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, segmentsTo = {}, segmentLineageEntryId = {})", new Object[]{tableNameWithType, segmentsFrom, segmentsTo, segmentLineageEntryId});
        return segmentLineageEntryId;
    }

    /**
     * Finishes a segment replacement by moving the given lineage entry from
     * {@code IN_PROGRESS} to {@code COMPLETED}.
     *
     * <p>Each attempt run by {@code DEFAULT_RETRY_POLICY} re-reads the lineage record, validates that the
     * entry exists and is still {@code IN_PROGRESS}, checks that every segment in 'segmentsTo' is known to
     * the table, waits for those segments to show up as online in the external view, and then writes the
     * updated lineage using the version read at the start of the attempt. On a successful write a routing
     * table rebuild message is sent.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentLineageEntryId id of the lineage entry to complete
     * @throws RuntimeException if the retry policy call fails for any reason; the original exception is
     *     attached as the cause
     */
    public void endReplaceSegments(String tableNameWithType, String segmentLineageEntryId) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                // Read the current lineage and remember its version for the conditional write below.
                ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, tableNameWithType);
                Preconditions.checkArgument(lineageRecord != null, (Object)String.format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')", tableNameWithType, segmentLineageEntryId));
                SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(lineageRecord);
                int expectedVersion = lineageRecord.getVersion();

                LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
                Preconditions.checkArgument(lineageEntry != null, (Object)String.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType, segmentLineageEntryId));

                // Only an IN_PROGRESS entry may be completed.
                if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
                    String errorMsg = String.format("The target lineage entry state is not 'IN_PROGRESS'. Cannot update to 'COMPLETED' state. (tableNameWithType=%s, segmentLineageEntryId=%s, state=%s)", tableNameWithType, segmentLineageEntryId, lineageEntry.getState());
                    LOGGER.error(errorMsg);
                    throw new RuntimeException(errorMsg);
                }

                // Every replacement segment must already be registered for the table.
                HashSet<String> segmentsForTable = new HashSet<String>(this.getSegmentsFor(tableNameWithType, false));
                Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), (Object)String.format("Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));

                // Block until the replacement segments are visible as online; a timeout fails this attempt.
                try {
                    this.waitForSegmentsBecomeOnline(tableNameWithType, new HashSet<String>(lineageEntry.getSegmentsTo()));
                }
                catch (TimeoutException e) {
                    LOGGER.warn(String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", tableNameWithType, lineageEntry.getSegmentsTo()), (Throwable)e);
                    return false;
                }

                LineageEntry completedEntry = new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED, System.currentTimeMillis());
                segmentLineage.updateLineageEntry(segmentLineageEntryId, completedEntry);

                // Conditional write against the version read above; an unsuccessful write fails this attempt.
                if (!SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, segmentLineage, expectedVersion)) {
                    return false;
                }
                this.sendRoutingTableRebuildMessage(tableNameWithType);
                return true;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to update the segment lineage during endReplaceSegments. (tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
            LOGGER.error(errorMsg, (Throwable)e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("endReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", (Object)tableNameWithType, (Object)segmentLineageEntryId);
    }

    /**
     * Reverts a segment replacement by moving the given lineage entry to {@code REVERTED}.
     *
     * <p>Each attempt run by {@code DEFAULT_RETRY_POLICY} re-reads the lineage record and rejects the
     * request when the entry is already {@code REVERTED}, or is {@code IN_PROGRESS} and
     * {@code forceRevert} is not set. It also refuses to revert when any 'segmentsTo' of this entry is
     * used as 'segmentsFrom' by another {@code IN_PROGRESS} or {@code COMPLETED} entry. After a
     * successful write, a routing table rebuild message is sent and the 'segmentsTo' segments are deleted.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentLineageEntryId id of the lineage entry to revert
     * @param forceRevert when {@code true}, an {@code IN_PROGRESS} entry may be reverted as well
     * @throws RuntimeException if the retry policy call fails for any reason; the original exception is
     *     attached as the cause
     */
    public void revertReplaceSegments(String tableNameWithType, String segmentLineageEntryId, boolean forceRevert) {
        try {
            DEFAULT_RETRY_POLICY.attempt(() -> {
                ZNRecord lineageRecord = SegmentLineageAccessHelper.getSegmentLineageZNRecord(this._propertyStore, tableNameWithType);
                Preconditions.checkArgument(lineageRecord != null, (Object)String.format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')", tableNameWithType, segmentLineageEntryId));
                SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(lineageRecord);
                int expectedVersion = lineageRecord.getVersion();

                LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId);
                Preconditions.checkArgument(lineageEntry != null, (Object)String.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType, segmentLineageEntryId));

                // Already-reverted entries are never valid targets; in-progress ones only with forceRevert.
                LineageEntryState targetState = lineageEntry.getState();
                boolean alreadyReverted = targetState == LineageEntryState.REVERTED;
                boolean inProgressWithoutForce = targetState == LineageEntryState.IN_PROGRESS && !forceRevert;
                if (alreadyReverted || inProgressWithoutForce) {
                    String errorMsg = String.format("Lineage state is not valid. Cannot update the lineage entry to be 'REVERTED'. (tableNameWithType=%s, segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)", tableNameWithType, segmentLineageEntryId, lineageEntry.getState(), forceRevert);
                    throw new RuntimeException(errorMsg);
                }

                // The segments produced by this entry must not feed any active (IN_PROGRESS/COMPLETED) entry.
                for (String currentEntryId : segmentLineage.getLineageEntryIds()) {
                    LineageEntry currentLineageEntry = segmentLineage.getLineageEntry(currentEntryId);
                    LineageEntryState currentState = currentLineageEntry.getState();
                    if (currentState == LineageEntryState.IN_PROGRESS || currentState == LineageEntryState.COMPLETED) {
                        Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), currentLineageEntry.getSegmentsFrom()), (Object)String.format("Cannot revert lineage entry, found segments from 'segmentsTo' appear in 'segmentsFrom' of another lineage entry. (tableNameWithType='%s', segmentLineageEntryId='%s', segmentsTo = '%s', segmentLineageEntryId='%s' segmentsFrom = '%s')", tableNameWithType, segmentLineageEntryId, lineageEntry.getSegmentsTo(), currentEntryId, currentLineageEntry.getSegmentsFrom()));
                    }
                }

                this.updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry);

                // Conditional write against the version read above; an unsuccessful write fails this attempt.
                if (!SegmentLineageAccessHelper.writeSegmentLineage(this._propertyStore, segmentLineage, expectedVersion)) {
                    return false;
                }
                this.sendRoutingTableRebuildMessage(tableNameWithType);
                this.deleteSegments(tableNameWithType, lineageEntry.getSegmentsTo());
                return true;
            });
        }
        catch (Exception e) {
            String errorMsg = String.format("Failed to update the segment lineage during revertReplaceSegments. (tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
            LOGGER.error(errorMsg, (Throwable)e);
            throw new RuntimeException(errorMsg, e);
        }
        LOGGER.info("revertReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", (Object)tableNameWithType, (Object)segmentLineageEntryId);
    }

    /**
     * Replaces the given lineage entry, in the in-memory {@code segmentLineage} only, with a copy whose
     * state is {@code REVERTED} and whose timestamp is the current time.
     *
     * <p>The update is refused unless every segment in the entry's 'segmentsFrom' is currently reported
     * by {@code getOnlineSegmentsFromExternalView}. This method does not persist the lineage.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentLineage lineage object to mutate
     * @param segmentLineageEntryId id of the entry being reverted
     * @param lineageEntry current content of that entry
     * @throws IllegalArgumentException if some 'segmentsFrom' segment is not online
     */
    private void updateSegmentLineageEntryToReverted(String tableNameWithType, SegmentLineage segmentLineage, String segmentLineageEntryId, LineageEntry lineageEntry) {
        Set<String> onlineSegments = this.getOnlineSegmentsFromExternalView(tableNameWithType);
        boolean sourceSegmentsOnline = onlineSegments.containsAll(lineageEntry.getSegmentsFrom());
        String failureMessage = String.format("Failed to update the lineage to be 'REVERTED'. Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(), onlineSegments);
        Preconditions.checkArgument(sourceSegmentsOnline, (Object)failureMessage);

        LineageEntry revertedEntry = new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED, System.currentTimeMillis());
        segmentLineage.updateLineageEntry(segmentLineageEntryId, revertedEntry);
    }

    /**
     * Polls the external view until all of {@code segmentsToCheck} are reported online.
     *
     * <p>The check is performed at least once, then repeated after a 1000 ms sleep for as long as the
     * 600000 ms deadline computed on entry has not passed.
     *
     * @param tableNameWithType table name including the type suffix
     * @param segmentsToCheck segment names that must all be online
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws TimeoutException if the deadline passes before all segments are online
     */
    private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck) throws InterruptedException, TimeoutException {
        long deadlineMs = System.currentTimeMillis() + 600000L;
        while (true) {
            if (this.getOnlineSegmentsFromExternalView(tableNameWithType).containsAll(segmentsToCheck)) {
                return;
            }
            Thread.sleep(1000L);
            // The deadline is evaluated only after sleeping, so the first poll always happens.
            if (System.currentTimeMillis() >= deadlineMs) {
                break;
            }
        }
        throw new TimeoutException(String.format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", tableNameWithType, segmentsToCheck));
    }

    /**
     * Collects the names of segments that the table's external view reports as serving.
     *
     * <p>A segment is included when at least one entry of its instance-to-state map has the value
     * {@code "ONLINE"} or {@code "CONSUMING"}.
     *
     * @param tableNameWithType table name including the type suffix
     * @return a new mutable set of qualifying segment names
     * @throws IllegalStateException if the table has no external view
     */
    private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
        ExternalView externalView = this.getTableExternalView(tableNameWithType);
        Preconditions.checkState(externalView != null, (Object)String.format("External view is null for table (%s)", tableNameWithType));

        // Map fields of the external view record: segment name -> (instance -> state).
        Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
        Set<String> onlineSegments = new HashSet<String>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
        for (Map.Entry<String, Map<String, String>> assignment : segmentAssignment.entrySet()) {
            Map<String, String> instanceStateMap = assignment.getValue();
            if (instanceStateMap.containsValue("ONLINE") || instanceStateMap.containsValue("CONSUMING")) {
                onlineSegments.add(assignment.getKey());
            }
        }
        return onlineSegments;
    }

    /**
     * Builds the {@code TableStats} for a table from the ZooKeeper stat of its resource-config node in
     * the property store.
     *
     * @param tableNameWithType table name including the type suffix
     * @return stats carrying the node's creation time, formatted with {@code SIMPLE_DATE_FORMAT}
     * @throws IllegalStateException if no stat can be read for the table's config path
     */
    public TableStats getTableStats(String tableNameWithType) {
        String zkPath = ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType);
        Stat stat = this._propertyStore.getStat(zkPath, AccessOption.PERSISTENT);
        Preconditions.checkState(stat != null, (String)"Failed to read ZK stats for table: %s", (Object)tableNameWithType);
        String creationTime;
        // SIMPLE_DATE_FORMAT is declared outside this method; presumably a shared static
        // java.text.SimpleDateFormat (the file imports it) -- confirm. SimpleDateFormat keeps mutable
        // internal state and is not safe for concurrent use, so the format call is serialized on the
        // formatter instance. NOTE(review): any other users of SIMPLE_DATE_FORMAT in this class need the
        // same lock for this to be fully effective.
        synchronized (SIMPLE_DATE_FORMAT) {
            creationTime = SIMPLE_DATE_FORMAT.format(stat.getCtime());
        }
        return new TableStats(creationTime);
    }

    /**
     * Lists the brokers that are ONLINE for a table according to the external view of the
     * {@code brokerResource} resource.
     *
     * <p>When {@code tableName} carries a type suffix, only that table is looked up. Otherwise both the
     * OFFLINE and REALTIME variants are considered: if both exist, only brokers that are live for both
     * are returned (in the order reported for the REALTIME table); if one exists, its brokers are
     * returned.
     *
     * @param tableName table name, with or without a type suffix
     * @return the live broker instance names
     * @throws TableNotFoundException if no matching table exists
     * @throws IllegalStateException if the broker resource has no external view
     */
    public List<String> getLiveBrokersForTable(String tableName) throws TableNotFoundException {
        ExternalView brokerResourceView = (ExternalView)this._helixDataAccessor.getProperty(this._keyBuilder.externalView("brokerResource"));
        if (brokerResourceView == null) {
            throw new IllegalStateException("Failed to find external view for brokerResource");
        }

        // Name already includes a table type: look up exactly that table.
        if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
            if (this.hasTable(tableName)) {
                return this.getLiveBrokersForTable(brokerResourceView, tableName);
            }
            throw new TableNotFoundException(String.format("Table=%s not found", tableName));
        }

        // Raw table name: resolve against both typed variants.
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        boolean offlineExists = this.hasTable(offlineTableName);
        boolean realtimeExists = this.hasTable(realtimeTableName);

        if (offlineExists && realtimeExists) {
            // Hybrid table: keep only the brokers that serve both variants.
            Set<String> offlineBrokers = new HashSet<String>(this.getLiveBrokersForTable(brokerResourceView, offlineTableName));
            List<String> commonBrokers = new ArrayList<String>();
            for (String realtimeBroker : this.getLiveBrokersForTable(brokerResourceView, realtimeTableName)) {
                if (offlineBrokers.contains(realtimeBroker)) {
                    commonBrokers.add(realtimeBroker);
                }
            }
            return commonBrokers;
        }
        if (offlineExists) {
            return this.getLiveBrokersForTable(brokerResourceView, offlineTableName);
        }
        if (realtimeExists) {
            return this.getLiveBrokersForTable(brokerResourceView, realtimeTableName);
        }
        throw new TableNotFoundException(String.format("Table=%s not found", tableName));
    }

    /**
     * Extracts from the given external view the instances whose state for {@code tableNameWithType}
     * equals {@code "ONLINE"}, ignoring case.
     *
     * @param ev external view to read the state map from
     * @param tableNameWithType table name including the type suffix, used as the state-map key
     * @return a new mutable list of matching instance names; empty when the view has no state map for
     *     the table
     */
    private List<String> getLiveBrokersForTable(ExternalView ev, String tableNameWithType) {
        List<String> liveBrokers = new ArrayList<String>();
        Map<String, String> brokerToStateMap = ev.getStateMap(tableNameWithType);
        if (brokerToStateMap == null) {
            return liveBrokers;
        }
        for (Map.Entry<String, String> brokerState : brokerToStateMap.entrySet()) {
            if ("ONLINE".equalsIgnoreCase(brokerState.getValue())) {
                liveBrokers.add(brokerState.getKey());
            }
        }
        return liveBrokers;
    }
}

