/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
import org.apache.druid.segment.metadata.SegmentSchemaBackFillQueue;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

@ManageLifecycle
public class CoordinatorSegmentMetadataCache
extends AbstractSegmentMetadataCache<DataSourceInformation> {
    private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);

    /** Factor applied to the segments-metadata poll duration to obtain the cold-schema refresh period. */
    private static final long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;

    /** A cold-schema refresh that takes longer than this many milliseconds is summarized at info instead of debug level. */
    private static final long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50L);

    private final SegmentMetadataCacheConfig config;
    /** Policy used whenever two column types for the same column name have to be reconciled. */
    private final AbstractSegmentMetadataCache.ColumnTypeMergePolicy columnTypeMergePolicy;
    private final SegmentSchemaCache segmentSchemaCache;
    private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;

    /** Latest status handed to {@link #updateSegmentReplicationStatus}; {@code null} until the first update. */
    private volatile SegmentReplicationStatus segmentReplicationStatus = null;

    /** Per-datasource schema built by {@link #refreshColdSegmentSchemas()} from segments with no (or zero) replication factor. */
    private final ConcurrentHashMap<String, DataSourceInformation> coldSchemaTable = new ConcurrentHashMap<>();

    /** Single-threaded scheduler that periodically runs {@link #refreshColdSegmentSchemas()} while leader. */
    private final ScheduledExecutorService coldSchemaExec;

    /** Handle of the cache loop submitted in {@link #onLeaderStart()}; {@code null} before the first submission. */
    @Nullable
    private Future<?> cacheExecFuture = null;

    /** Handle of the scheduled cold-schema refresh; {@code null} before the first scheduling. */
    @Nullable
    private Future<?> coldSchemaExecFuture = null;
    /**
     * Wires up the cache: stores the collaborators, creates the single-threaded scheduler used for
     * the cold-schema refresh, and finally registers the timeline callback on {@code serverView}.
     */
    @Inject
    public CoordinatorSegmentMetadataCache(
        QueryLifecycleFactory queryLifecycleFactory,
        CoordinatorServerView serverView,
        SegmentMetadataCacheConfig config,
        Escalator escalator,
        InternalQueryConfig internalQueryConfig,
        ServiceEmitter emitter,
        SegmentSchemaCache segmentSchemaCache,
        SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
        SegmentsMetadataManager segmentsMetadataManager,
        Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier
    ) {
        super(queryLifecycleFactory, config, escalator, internalQueryConfig, emitter);

        this.config = config;
        this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();

        this.segmentSchemaCache = segmentSchemaCache;
        this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;

        this.segmentsMetadataManager = segmentsMetadataManager;
        this.segmentsMetadataManagerConfigSupplier = segmentsMetadataManagerConfigSupplier;

        this.coldSchemaExec = Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");

        // Registered last so that every field above is assigned before a callback can fire.
        initServerViewTimelineCallback(serverView);
    }

    /**
     * Computes the delay, in milliseconds, between cold-schema refreshes: the configured
     * segments-metadata poll duration multiplied by {@code COLD_SCHEMA_PERIOD_MULTIPLIER}.
     */
    private long getColdSchemaExecPeriodMillis() {
        final SegmentsMetadataManagerConfig managerConfig = segmentsMetadataManagerConfigSupplier.get();
        final long pollMillis = managerConfig.getPollDuration().toStandardDuration().getMillis();
        return pollMillis * COLD_SCHEMA_PERIOD_MULTIPLIER;
    }

    /**
     * Registers, on {@code callbackExec}, a timeline callback that forwards server-view events to
     * this cache. Every handler returns {@code CONTINUE}.
     */
    private void initServerViewTimelineCallback(CoordinatorServerView serverView) {
        final TimelineServerView.TimelineCallback timelineCallback = new TimelineServerView.TimelineCallback() {
            @Override
            public ServerView.CallbackAction timelineInitialized() {
                // Flip the flag under the shared lock and wake up anyone waiting on it.
                synchronized (CoordinatorSegmentMetadataCache.this.lock) {
                    CoordinatorSegmentMetadataCache.this.isServerViewInitialized = true;
                    CoordinatorSegmentMetadataCache.this.lock.notifyAll();
                }
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                CoordinatorSegmentMetadataCache.this.addSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                CoordinatorSegmentMetadataCache.this.removeSegment(segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                CoordinatorSegmentMetadataCache.this.removeServerSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                CoordinatorSegmentMetadataCache.this.updateSchemaForRealtimeSegments(segmentSchemas);
                return ServerView.CallbackAction.CONTINUE;
            }
        };

        serverView.registerTimelineCallback(callbackExec, timelineCallback);
    }

    /**
     * Lifecycle start hook. The body is empty: the background work of this class is launched by
     * {@link #onLeaderStart()} instead.
     */
    @Override
    @LifecycleStart
    public void start() {
    }

    /**
     * Lifecycle stop hook: shuts down all three executors, notifies the schema cache and the
     * backfill queue that leadership ended, and cancels any outstanding task handles.
     */
    @Override
    @LifecycleStop
    public void stop() {
        // Executors first, so no new work starts while the rest is torn down.
        callbackExec.shutdownNow();
        cacheExec.shutdownNow();
        coldSchemaExec.shutdownNow();

        segmentSchemaCache.onLeaderStop();
        segmentSchemaBackfillQueue.onLeaderStop();

        // The handles are null when this node never became leader.
        if (cacheExecFuture != null) {
            cacheExecFuture.cancel(true);
        }
        if (coldSchemaExecFuture != null) {
            coldSchemaExecFuture.cancel(true);
        }
    }

    /**
     * Starts the leader-only work: notifies the backfill queue, submits the cache loop and
     * schedules the periodic cold-schema refresh. When the configuration asks for it, this call
     * also waits for initialization via {@code awaitInitialization()}.
     *
     * @throws RuntimeException wrapping any exception raised while starting. If the cause is an
     *                          {@link InterruptedException}, the calling thread's interrupt flag is
     *                          restored before rethrowing so callers further up can still observe it.
     */
    public void onLeaderStart() {
        log.info("Initializing cache on leader node.");
        try {
            segmentSchemaBackfillQueue.onLeaderStart();
            cacheExecFuture = cacheExec.submit(this::cacheExecLoop);

            // Initial delay and subsequent delay are the same period.
            final long coldSchemaPeriodMillis = getColdSchemaExecPeriodMillis();
            coldSchemaExecFuture = coldSchemaExec.scheduleWithFixedDelay(
                this::refreshColdSegmentSchemas,
                coldSchemaPeriodMillis,
                coldSchemaPeriodMillis,
                TimeUnit.MILLISECONDS
            );

            if (config.isAwaitInitializationOnStart()) {
                awaitInitialization();
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise swallow the interruption request.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the leader-only work: cancels the cache loop and the scheduled cold-schema refresh
     * (if they were ever started) and notifies the schema cache and the backfill queue.
     * The executors themselves are left running.
     */
    public void onLeaderStop() {
        log.info("No longer leader, stopping cache.");

        if (cacheExecFuture != null) {
            cacheExecFuture.cancel(true);
        }
        if (coldSchemaExecFuture != null) {
            coldSchemaExecFuture.cancel(true);
        }

        segmentSchemaCache.onLeaderStop();
        segmentSchemaBackfillQueue.onLeaderStop();
    }

    /**
     * Delegates to {@code segmentSchemaCache.awaitInitialization()} while holding this
     * instance's monitor.
     *
     * @throws InterruptedException propagated from the delegate
     */
    @Override
    public synchronized void refreshWaitCondition() throws InterruptedException {
        segmentSchemaCache.awaitInitialization();
    }

    /**
     * Replaces the replication status consulted by {@code getReplicationFactor}. The field is
     * volatile, so the new value is visible to other threads on their next read.
     *
     * @param segmentReplicationStatus the status to store; stored as-is, no null check is done here
     */
    public void updateSegmentReplicationStatus(SegmentReplicationStatus segmentReplicationStatus) {
        this.segmentReplicationStatus = segmentReplicationStatus;
    }

    /**
     * Under the shared lock, drops {@code segmentId} from the set of mutable segments and tells
     * the schema cache that the realtime segment is gone.
     */
    @Override
    protected void unmarkSegmentAsMutable(SegmentId segmentId) {
        synchronized (lock) {
            log.debug("SegmentId [%s] is marked as finalized.", segmentId);
            mutableSegments.remove(segmentId);
            segmentSchemaCache.realtimeSegmentRemoved(segmentId);
        }
    }

    /**
     * Logs the removal at debug level and forwards it to the schema cache.
     */
    @Override
    protected void removeSegmentAction(SegmentId segmentId) {
        log.debug("SegmentId [%s] is removed.", segmentId);
        segmentSchemaCache.segmentRemoved(segmentId);
    }

    /**
     * Always returns {@code true}.
     * NOTE(review): presumably this makes the superclass request aggregators in its segment
     * metadata queries (they are read in {@code updateSegmentMetadata}) - confirm against the
     * superclass.
     */
    @Override
    protected boolean fetchAggregatorsInSegmentMetadataQuery() {
        return true;
    }

    /**
     * Publishes the schema derived from {@code analysis} for a segment that is already tracked.
     * <p>
     * The schema is handed to the schema cache (as pending backfill) and to the backfill queue,
     * but only when both the datasource entry and the segment entry exist in
     * {@code segmentMetadataInfo}; otherwise a warning is logged and nothing is published.
     *
     * @return {@code true} if the schema was published, {@code false} if the segment was unknown
     */
    @Override
    protected boolean updateSegmentMetadata(SegmentId segmentId, SegmentAnalysis analysis) {
        final RowSignature rowSignature = analysisToRowSignature(analysis);
        log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);

        final AtomicBoolean published = new AtomicBoolean(false);

        segmentMetadataInfo.compute(segmentId.getDataSource(), (datasourceKey, segmentsOfDataSource) -> {
            if (segmentsOfDataSource == null) {
                log.warn("No segment map found with datasource [%s], skipping refresh of segment [%s]", datasourceKey, segmentId);
                return null;
            }

            segmentsOfDataSource.compute(segmentId, (ignoredKey, existingMetadata) -> {
                if (existingMetadata == null) {
                    log.warn("No segment [%s] found, skipping refresh", segmentId);
                    return null;
                }

                final long numRows = analysis.getNumRows();
                log.debug("Publishing segment schema. SegmentId [%s], RowSignature [%s], numRows [%d]", segmentId, rowSignature, numRows);

                final SchemaPayload payload = new SchemaPayload(rowSignature, analysis.getAggregators());
                final SchemaPayloadPlus schema = new SchemaPayloadPlus(payload, numRows);
                segmentSchemaCache.addSchemaPendingBackfill(segmentId, schema);
                segmentSchemaBackfillQueue.add(segmentId, schema);
                published.set(true);

                // The metadata entry itself is left untouched.
                return existingMetadata;
            });

            // Returning null removes a datasource entry whose segment map is empty.
            return segmentsOfDataSource.isEmpty() ? null : segmentsOfDataSource;
        });

        return published.get();
    }

    /**
     * Iterates over the metadata of every tracked segment. Each element is enriched with the row
     * signature and row count from the schema cache when a schema is cached; otherwise the segment
     * is considered for refresh and the stored metadata is returned unchanged.
     */
    @Override
    public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata() {
        return FluentIterable
            .from(segmentMetadataInfo.values())
            .transformAndConcat(Map::values)
            .transform(stored -> {
                final DataSegment segment = stored.getSegment();
                final Optional<SchemaPayloadPlus> cachedSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
                if (!cachedSchema.isPresent()) {
                    markSegmentForRefreshIfNeeded(segment);
                    return stored;
                }
                final SchemaPayloadPlus schema = cachedSchema.get();
                return AvailableSegmentMetadata
                    .from(stored)
                    .withRowSignature(schema.getSchemaPayload().getRowSignature())
                    .withNumRows(schema.getNumRows())
                    .build();
            })
            .iterator();
    }

    /**
     * Looks up the metadata of a single segment.
     *
     * @return {@code null} when the datasource or the segment is not tracked; otherwise the stored
     *         metadata, enriched with the cached row signature and row count when a schema is
     *         cached. When no schema is cached the segment is considered for refresh.
     */
    @Override
    @Nullable
    public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId) {
        final Map<SegmentId, AvailableSegmentMetadata> segmentsOfDataSource = segmentMetadataInfo.get(datasource);
        if (segmentsOfDataSource == null) {
            return null;
        }

        final AvailableSegmentMetadata stored = segmentsOfDataSource.get(segmentId);
        if (stored == null) {
            return null;
        }

        final Optional<SchemaPayloadPlus> cachedSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
        if (!cachedSchema.isPresent()) {
            markSegmentForRefreshIfNeeded(stored.getSegment());
            return stored;
        }

        final SchemaPayloadPlus schema = cachedSchema.get();
        return AvailableSegmentMetadata
            .from(stored)
            .withRowSignature(schema.getSchemaPayload().getRowSignature())
            .withNumRows(schema.getNumRows())
            .build();
    }

    /**
     * Returns the schema of {@code name}, merged from the hot table and the cold-schema table.
     *
     * @return the merged information, or {@code null} when neither table knows the datasource
     */
    @Override
    public DataSourceInformation getDatasource(String name) {
        final DataSourceInformation hot = tables.get(name);
        final DataSourceInformation cold = coldSchemaTable.get(name);
        return getMergedDatasourceInformation(hot, cold).orElse(null);
    }

    /**
     * Builds an immutable map of every datasource known to either the hot table or the
     * cold-schema table, with the two schemas merged where both exist.
     * <p>
     * Both tables are copied first, so the key set and the values looked up for each key come
     * from the same copy of each table.
     *
     * @return datasource name to merged {@link DataSourceInformation}
     */
    @Override
    public Map<String, DataSourceInformation> getDataSourceInformationMap() {
        final Map<String, DataSourceInformation> hot = new HashMap<>(tables);
        final Map<String, DataSourceInformation> cold = new HashMap<>(coldSchemaTable);

        final Set<String> combinedDatasources = new HashSet<>(hot.keySet());
        combinedDatasources.addAll(cold.keySet());

        final ImmutableMap.Builder<String, DataSourceInformation> combined = ImmutableMap.builder();
        for (String dataSource : combinedDatasources) {
            getMergedDatasourceInformation(hot.get(dataSource), cold.get(dataSource))
                .ifPresent(merged -> combined.put(dataSource, merged));
        }
        return combined.build();
    }

    /**
     * Combines the hot and cold schema of one datasource.
     * <p>
     * When only one side is present it is returned as-is; when both are absent the result is
     * empty. When both are present, their columns are merged (hot columns first, conflicting
     * types reconciled through {@code columnTypeMergePolicy}) under the hot datasource name.
     */
    private Optional<DataSourceInformation> getMergedDatasourceInformation(DataSourceInformation hot, DataSourceInformation cold) {
        if (hot == null) {
            return Optional.ofNullable(cold);
        }
        if (cold == null) {
            return Optional.of(hot);
        }

        final Map<String, ColumnType> mergedTypes = new LinkedHashMap<>();
        extractColumnTypes(mergedTypes, hot.getRowSignature());
        extractColumnTypes(mergedTypes, cold.getRowSignature());

        final RowSignature.Builder mergedSignature = RowSignature.builder();
        mergedTypes.forEach(mergedSignature::add);
        return Optional.of(new DataSourceInformation(hot.getDataSource(), mergedSignature.build()));
    }

    /**
     * Refreshes segment schemas and rebuilds the affected datasource tables.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>realtime (mutable) segments and segments whose schema is already cached are removed
     *       from {@code segmentsToRefresh};</li>
     *   <li>unless segment metadata queries are disabled, the remaining segments are refreshed;</li>
     *   <li>under the shared lock, segments that were not refreshed are queued again and the set
     *       of datasources to rebuild is extended with the pending ones and with the datasources
     *       of all refreshed or cached segments;</li>
     *   <li>every datasource in that set gets its row signature rebuilt; a {@code null} signature
     *       removes the datasource from {@code tables}.</li>
     * </ol>
     *
     * @param segmentsToRefresh    segments to refresh; modified in place by the filtering steps
     * @param dataSourcesToRebuild datasources to rebuild; extended in place
     * @throws IOException propagated from the segment refresh
     */
    @Override
    public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild) throws IOException {
        log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]", segmentsToRefresh, dataSourcesToRebuild);

        filterRealtimeSegments(segmentsToRefresh);
        log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]", segmentsToRefresh);

        final Set<SegmentId> cachedSegments = filterSegmentWithCachedSchema(segmentsToRefresh);
        log.debug("SegmentsToRefreshMinusCachedSegments [%s], cachedSegments [%s]", segmentsToRefresh, cachedSegments);

        Set<SegmentId> refreshed = Collections.emptySet();
        if (!config.isDisableSegmentMetadataQueries()) {
            refreshed = refreshSegments(segmentsToRefresh);
            log.debug("Refreshed segments are [%s]", refreshed);
        }

        synchronized (lock) {
            // Anything that could not be refreshed goes back on the refresh list.
            segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));

            // Collect every datasource whose table has to be rebuilt.
            dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
            for (SegmentId segment : refreshed) {
                dataSourcesToRebuild.add(segment.getDataSource());
            }
            for (SegmentId segment : cachedSegments) {
                dataSourcesToRebuild.add(segment.getDataSource());
            }
            dataSourcesNeedingRebuild.clear();
        }

        log.debug("Re-building schema for datasources[%s].", dataSourcesToRebuild);
        for (String dataSource : dataSourcesToRebuild) {
            final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
            if (rowSignature == null) {
                log.info("Datasource[%s] no longer exists as its row signature is [null]. Removing all cached metadata.", dataSource);
                tables.remove(dataSource);
                continue;
            }

            final DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
            final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
            if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
                log.info("Datasource[%s] has a new row signature[%s].", dataSource, druidTable.getRowSignature());
            } else {
                log.debug("Datasource[%s] row signature is unchanged.", dataSource);
            }
        }
    }

    /**
     * Logs, at info level, how many segments of {@code dataSource} are about to be refreshed,
     * together with a sample of at most five of their ids.
     */
    @Override
    void logSegmentsToRefresh(String dataSource, Set<SegmentId> ids) {
        log.info(
            "Refreshing schema of [%d] segment IDs (sample=[%s]) for datasource[%s].",
            ids.size(),
            Iterables.limit(ids, 5),
            dataSource
        );
    }

    /**
     * Removes, in place and under the shared lock, every id that is currently in
     * {@code mutableSegments} from {@code segmentIds}.
     */
    private void filterRealtimeSegments(Set<SegmentId> segmentIds) {
        synchronized (lock) {
            segmentIds.removeAll(mutableSegments);
        }
    }

    /**
     * Splits off the segments whose schema is already cached.
     *
     * @param segmentIds candidate ids; the cached ones are removed from this set in place
     * @return the ids that were removed because the schema cache already has their schema
     */
    private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds) {
        final Set<SegmentId> alreadyCached = new HashSet<>();
        for (SegmentId candidate : segmentIds) {
            if (segmentSchemaCache.isSchemaCached(candidate)) {
                alreadyCached.add(candidate);
            }
        }
        // Removal happens after the scan so the input is never modified while being iterated.
        segmentIds.removeAll(alreadyCached);
        return alreadyCached;
    }

    /**
     * Returns the required replica count of {@code segmentId} according to the most recently
     * supplied replication status.
     *
     * @return the required replica count, or {@code null} when no replication status has been
     *         supplied yet or the status has no entry for the segment
     */
    @Nullable
    private Integer getReplicationFactor(SegmentId segmentId) {
        // Read the volatile field exactly once: a concurrent update between the null check and
        // the lookup must not be able to turn a passed check into a NullPointerException.
        final SegmentReplicationStatus replicationStatus = segmentReplicationStatus;
        if (replicationStatus == null) {
            return null;
        }

        final SegmentReplicaCount replicaCount = replicationStatus.getReplicaCountsInCluster(segmentId);
        if (replicaCount == null) {
            return null;
        }
        return Integer.valueOf(replicaCount.required());
    }

    /**
     * Rebuilds the cold-schema table from the most recent datasources snapshot.
     * <p>
     * A segment counts as cold when its replication factor is unknown ({@code null}) or zero.
     * For every datasource with at least one cold segment, the cached schemas of its cold segments
     * are merged into one row signature, which replaces the datasource's entry in
     * {@code coldSchemaTable}. A datasource whose cold segments have no cached schema at all
     * keeps whatever entry it already had. Entries of datasources without any cold segment are
     * dropped at the end.
     * <p>
     * Emits per-datasource counts of cold segments and of cold segments with a schema, plus the
     * total run time, and logs a run summary (at info level when the run was slow).
     */
    @VisibleForTesting
    protected void refreshColdSegmentSchemas() {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        int totalColdSegments = 0;
        final Set<String> dataSourcesWithColdSegments = new HashSet<>();
        final DataSourcesSnapshot snapshot = segmentsMetadataManager.getRecentDataSourcesSnapshot();

        for (ImmutableDruidDataSource dataSource : snapshot.getDataSourcesWithAllUsedSegments()) {
            final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
            int coldSegments = 0;
            int coldSegmentsWithSchema = 0;

            for (DataSegment segment : dataSource.getSegments()) {
                final Integer replicationFactor = getReplicationFactor(segment.getId());
                if (replicationFactor != null && replicationFactor != 0) {
                    // Segment is expected to be loaded somewhere, so it is not cold.
                    continue;
                }
                final Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segment.getId());
                if (optionalSchema.isPresent()) {
                    final RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
                    extractColumnTypes(columnTypes, rowSignature);
                    ++coldSegmentsWithSchema;
                }
                ++coldSegments;
            }

            if (coldSegments == 0) {
                continue;
            }

            totalColdSegments += coldSegments;
            final String dataSourceName = dataSource.getName();
            dataSourcesWithColdSegments.add(dataSourceName);

            final ServiceMetricEvent.Builder metricBuilder =
                new ServiceMetricEvent.Builder().setDimension("dataSource", dataSourceName);
            emitMetric("segment/used/deepStorageOnly/count", coldSegments, metricBuilder);

            if (columnTypes.isEmpty()) {
                // No schema known for any cold segment: leave the existing entry (if any) alone.
                continue;
            }

            final RowSignature.Builder builder = RowSignature.builder();
            columnTypes.forEach(builder::add);
            final RowSignature coldSignature = builder.build();

            final DataSourceInformation druidTable = new DataSourceInformation(dataSourceName, coldSignature);
            final DataSourceInformation oldTable = coldSchemaTable.put(dataSourceName, druidTable);
            if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
                log.info("Datasource[%s] has new cold row signature[%s].", dataSourceName, druidTable.getRowSignature());
            } else {
                log.debug("Row signature for datasource[%s] is unchanged.", dataSourceName);
            }

            emitMetric("segment/schemaCache/deepStorageOnly/count", coldSegmentsWithSchema, metricBuilder);
            log.debug("Built row signature[%s] from cold segments for datasource[%s].", coldSignature, dataSourceName);
        }

        // Forget datasources that no longer have any cold segment.
        coldSchemaTable.keySet().retainAll(dataSourcesWithColdSegments);

        emitMetric("segment/schemaCache/deepStorageOnly/refresh/time", stopwatch.millisElapsed());

        final int numDatasources = snapshot.getDataSourcesMap().size();
        final String executionStatsLog = StringUtils.format(
            "Cold schema processing took [%d] millis. Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segment schema.",
            stopwatch.millisElapsed(),
            numDatasources,
            totalColdSegments,
            dataSourcesWithColdSegments.size()
        );
        if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
            log.info(executionStatsLog);
        } else {
            log.debug(executionStatsLog);
        }
    }

    /**
     * Folds every column of {@code signature} into {@code columnTypes}, reconciling a column that
     * is already present through {@code columnTypeMergePolicy}.
     *
     * @throws ISE if the signature reports no type for one of its own column names
     */
    private void extractColumnTypes(Map<String, ColumnType> columnTypes, RowSignature signature) {
        for (String columnName : signature.getColumnNames()) {
            final ColumnType incomingType = signature
                .getColumnType(columnName)
                .orElseThrow(() -> new ISE("Encountered null type for column[%s]", columnName));

            columnTypes.compute(
                columnName,
                (ignoredKey, knownType) -> columnTypeMergePolicy.merge(knownType, incomingType)
            );
        }
    }

    /**
     * Builds the row signature of {@code dataSource} by merging the cached schemas of all of its
     * tracked segments. Segments without a cached schema are considered for refresh and do not
     * contribute columns.
     *
     * @return the merged signature, or {@code null} when the datasource has no tracked segments
     */
    @Override
    @Nullable
    @VisibleForTesting
    public RowSignature buildDataSourceRowSignature(String dataSource) {
        final Map<SegmentId, AvailableSegmentMetadata> segmentsOfDataSource = segmentMetadataInfo.get(dataSource);
        if (segmentsOfDataSource == null || segmentsOfDataSource.isEmpty()) {
            return null;
        }

        final Map<String, ColumnType> mergedTypes = new LinkedHashMap<>();
        for (Map.Entry<SegmentId, AvailableSegmentMetadata> tracked : segmentsOfDataSource.entrySet()) {
            final Optional<SchemaPayloadPlus> cachedSchema = segmentSchemaCache.getSchemaForSegment(tracked.getKey());
            if (cachedSchema.isPresent()) {
                extractColumnTypes(mergedTypes, cachedSchema.get().getSchemaPayload().getRowSignature());
            } else {
                markSegmentForRefreshIfNeeded(tracked.getValue().getSegment());
            }
        }

        final RowSignature.Builder signature = RowSignature.builder();
        mergedTypes.forEach(signature::add);
        return signature.build();
    }

    /**
     * Applies schema announcements for realtime segments.
     * <p>
     * For each announced schema whose segment id parses and whose segment is tracked, the
     * announced columns are merged with the cached signature (or replace it for a non-delta
     * announcement), the result is stored in the schema cache as realtime schema, and the
     * datasource is marked for rebuild. Unparseable ids are logged as errors, unknown datasources
     * as warnings, and announcements for unknown segments raise an alert. The tracked metadata
     * entries themselves are never replaced.
     */
    @VisibleForTesting
    void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas) {
        log.debug("SchemaUpdate for realtime segments [%s].", segmentSchemas);

        for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemas.getSegmentSchemaList()) {
            final String dataSource = segmentSchema.getDataSource();
            final SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
            if (segmentId == null) {
                log.error("Could not apply schema update. Failed parsing segmentId [%s]", segmentSchema.getSegmentId());
                continue;
            }

            log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);

            segmentMetadataInfo.compute(dataSource, (dataSourceKey, segmentsOfDataSource) -> {
                if (segmentsOfDataSource == null) {
                    log.warn("No segment map found with datasource [%s], skipping refresh of segment [%s]", dataSourceKey, segmentId);
                    return null;
                }

                segmentsOfDataSource.compute(segmentId, (ignoredKey, trackedMetadata) -> {
                    if (trackedMetadata == null) {
                        log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
                        return trackedMetadata;
                    }

                    final RowSignature cachedSignature = segmentSchemaCache
                        .getSchemaForSegment(segmentId)
                        .map(cached -> cached.getSchemaPayload().getRowSignature())
                        .orElse(null);
                    final Optional<RowSignature> updatedSignature =
                        mergeOrCreateRowSignature(segmentId, cachedSignature, segmentSchema);

                    if (updatedSignature.isPresent()) {
                        log.debug("Segment [%s] signature [%s] after applying schema update.", segmentId, updatedSignature.get());
                        final Long numRows = segmentSchema.getNumRows() == null
                                             ? null
                                             : Long.valueOf(segmentSchema.getNumRows().longValue());
                        segmentSchemaCache.addRealtimeSegmentSchema(
                            segmentId,
                            new SchemaPayloadPlus(new SchemaPayload(updatedSignature.get()), numRows)
                        );
                        markDataSourceAsNeedRebuild(dataSource);
                    }
                    return trackedMetadata;
                });

                return segmentsOfDataSource;
            });
        }
    }

    /**
     * Produces the row signature of a realtime segment after a schema announcement.
     * <ul>
     *   <li>Non-delta announcement: the signature is built solely from the announced new columns.</li>
     *   <li>Delta announcement with an existing signature: existing columns come first, then
     *       updated and new columns are merged in. A column already present is reconciled through
     *       {@code columnTypeMergePolicy}. Updated columns that were not present are still added,
     *       and an alert listing them is emitted.</li>
     *   <li>Delta announcement without an existing signature: an alert is emitted and the result
     *       is empty.</li>
     * </ul>
     * Returned signatures are interned through {@code ROW_SIGNATURE_INTERNER}.
     */
    @VisibleForTesting
    Optional<RowSignature> mergeOrCreateRowSignature(SegmentId segmentId, @Nullable RowSignature existingSignature, SegmentSchemas.SegmentSchema segmentSchema) {
        final Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();

        if (!segmentSchema.isDelta()) {
            // Absolute schema: whatever was known before is discarded.
            final RowSignature.Builder absolute = RowSignature.builder();
            for (String column : segmentSchema.getNewColumns()) {
                absolute.add(column, columnMapping.get(column));
            }
            return Optional.of(ROW_SIGNATURE_INTERNER.intern(absolute.build()));
        }

        if (existingSignature == null) {
            log.makeAlert("Received delta schema update [%s] for a segment [%s] with no previous schema. ", segmentSchema, segmentId).emit();
            return Optional.empty();
        }

        // Seed with the existing columns so they keep their position in the result.
        final Map<String, ColumnType> mergedColumnTypes = new LinkedHashMap<>();
        for (String column : existingSignature.getColumnNames()) {
            final ColumnType existingType = existingSignature
                .getColumnType(column)
                .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
            mergedColumnTypes.put(column, existingType);
        }

        final Set<String> missingUpdateColumns = new HashSet<>();
        for (String column : segmentSchema.getUpdatedColumns()) {
            final ColumnType announcedType = columnMapping.get(column);
            if (mergedColumnTypes.containsKey(column)) {
                mergedColumnTypes.compute(column, (c, knownType) -> columnTypeMergePolicy.merge(knownType, announcedType));
            } else {
                // An "updated" column that was never seen before: accept it, but remember it for the alert.
                missingUpdateColumns.add(column);
                mergedColumnTypes.put(column, announcedType);
            }
        }

        for (String column : segmentSchema.getNewColumns()) {
            final ColumnType announcedType = columnMapping.get(column);
            if (mergedColumnTypes.containsKey(column)) {
                mergedColumnTypes.compute(column, (c, knownType) -> columnTypeMergePolicy.merge(knownType, announcedType));
            } else {
                mergedColumnTypes.put(column, announcedType);
            }
        }

        if (!missingUpdateColumns.isEmpty()) {
            log.makeAlert("Datasource schema mismatch detected. The delta realtime segment schema contains columns that are not defined in the datasource schema. This indicates a potential issue with schema updates on the Coordinator. Please review relevant Coordinator metrics and logs for task communication to identify any issues.")
               .addData("datasource", segmentId.getDataSource())
               .addData("existingSignature", existingSignature)
               .addData("deltaSchema", segmentSchema)
               .addData("missingUpdateColumns", missingUpdateColumns)
               .emit();
        }

        final RowSignature.Builder merged = RowSignature.builder();
        mergedColumnTypes.forEach(merged::add);
        return Optional.of(ROW_SIGNATURE_INTERNER.intern(merged.build()));
    }

    /**
     * Queues {@code segment} for a schema refresh unless it is a tombstone or is not present in
     * the most recent datasources snapshot (i.e. it is treated as unused).
     */
    private void markSegmentForRefreshIfNeeded(DataSegment segment) {
        final SegmentId id = segment.getId();
        log.debug("SchemaMetadata for segmentId [%s] is absent.", id);

        if (segment.isTombstone()) {
            log.debug("Skipping refresh for tombstone segment [%s].", id);
            return;
        }

        final ImmutableDruidDataSource snapshotDataSource =
            segmentsMetadataManager.getRecentDataSourcesSnapshot().getDataSource(segment.getDataSource());
        final boolean presentInSnapshot = snapshotDataSource != null && snapshotDataSource.getSegment(id) != null;

        if (!presentInSnapshot) {
            log.debug("Skipping refresh for unused segment [%s].", id);
            return;
        }
        markSegmentAsNeedRefresh(id);
    }
}

