/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SortOrder;
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.skife.jdbi.v2.BaseResultSetMapper;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

@ManageLifecycle
public class SqlSegmentsMetadataManager
implements SegmentsMetadataManager {
    private static final EmittingLogger log = new EmittingLogger(SqlSegmentsMetadataManager.class);
    // Guards the lifecycle/polling state below (exec, periodicPollTaskFuture, startPollingCount,
    // currentStartPollingOrder). Periodic poll tasks take it in read mode; start/stop and on-demand
    // polls take it in write mode.
    private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
    // Monitor held by poll() for the duration of a database poll.
    private final Object pollLock = new Object();
    private final ObjectMapper jsonMapper;
    // Reader pre-configured for DataSegment, derived from jsonMapper in the constructor.
    private final ObjectReader segmentReader;
    // Delay between periodic polls; also used as the freshness window for on-demand polls.
    private final Duration periodicPollDelay;
    private final Supplier<MetadataStorageTablesConfig> dbTables;
    private final SQLMetadataConnector connector;
    private final SegmentSchemaCache segmentSchemaCache;
    private final ServiceEmitter serviceEmitter;
    private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
    // Snapshot handed out by the read accessors of this class.
    // NOTE(review): presumably assigned by createDatasourcesSnapshot(), which is outside this view - confirm.
    private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null;
    // The most recent periodic or on-demand poll; reset to null when periodic polling is stopped.
    @Nullable
    private volatile DatabasePoll latestDatabasePoll = null;
    // Handle of the scheduled periodic poll task; cancelled by stopPollingDatabasePeriodically().
    @Nullable
    @GuardedBy(value="startStopPollLock")
    private Future<?> periodicPollTaskFuture = null;
    // Incremented each time periodic polling is started.
    @GuardedBy(value="startStopPollLock")
    private long startPollingCount = 0L;
    // Start order of the currently active periodic polling, or -1 when periodic polling is not active.
    @GuardedBy(value="startStopPollLock")
    private long currentStartPollingOrder = -1L;
    // Scheduler for periodic polls; non-null only between start() and stop().
    @Nullable
    @GuardedBy(value="startStopPollLock")
    private ScheduledExecutorService exec = null;
    // Future of the background 'used_status_last_updated' population task; null until
    // populateUsedFlagLastUpdatedAsync() has been called.
    private Future<?> usedFlagLastUpdatedPopulationFuture;

    @Inject
    public SqlSegmentsMetadataManager(ObjectMapper jsonMapper, Supplier<SegmentsMetadataManagerConfig> config, Supplier<MetadataStorageTablesConfig> dbTables, SQLMetadataConnector connector, SegmentSchemaCache segmentSchemaCache, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig, ServiceEmitter serviceEmitter) {
        this.jsonMapper = jsonMapper;
        this.segmentReader = jsonMapper.readerFor(DataSegment.class);
        this.periodicPollDelay = ((SegmentsMetadataManagerConfig)config.get()).getPollDuration().toStandardDuration();
        this.dbTables = dbTables;
        this.connector = connector;
        this.segmentSchemaCache = segmentSchemaCache;
        this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
        this.serviceEmitter = serviceEmitter;
    }

    /**
     * Lifecycle start hook: creates the single-threaded scheduled executor used for periodic polls.
     * Does nothing if the executor already exists.
     */
    @LifecycleStart
    public void start() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (exec == null) {
                // The class name is escaped so it is safe to embed in a thread-name format string.
                final String threadNameFormat = StringUtils.encodeForFormat(getClass().getName()) + "-Exec--%d";
                exec = Execs.scheduledSingleThreaded(threadNameFormat);
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Lifecycle stop hook: shuts down the polling executor and clears the reference to it.
     *
     * <p>Safe to call when the manager was never started or has already been stopped; in that case
     * there is no executor and the call is a no-op.
     */
    @LifecycleStop
    public void stop() {
        ReentrantReadWriteLock.WriteLock lock = this.startStopPollLock.writeLock();
        lock.lock();
        try {
            // exec is null before start() and after a previous stop(); guard against the NPE.
            if (this.exec != null) {
                this.exec.shutdownNow();
                this.exec = null;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Starts periodic polling of the database if it is not already running.
     *
     * <p>Registers a fresh {@code PeriodicDatabasePoll} as the latest poll, bumps the start order and
     * schedules the poll task with zero initial delay and {@code periodicPollDelay} between runs.
     *
     * @throws IllegalStateException if the manager has not been started (no executor exists)
     */
    @Override
    public void startPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (exec == null) {
                throw new IllegalStateException(getClass().getName() + " is not started");
            }
            if (!isPollingDatabasePeriodically()) {
                final PeriodicDatabasePoll newPeriodicPoll = new PeriodicDatabasePoll();
                latestDatabasePoll = newPeriodicPoll;

                // Each start gets its own order so that tasks from earlier starts can recognise
                // that they are stale.
                startPollingCount++;
                currentStartPollingOrder = startPollingCount;
                final long startOrder = currentStartPollingOrder;

                final Runnable pollTask = createPollTaskForStartOrder(startOrder, newPeriodicPoll);
                periodicPollTaskFuture = exec.scheduleWithFixedDelay(pollTask, 0L, periodicPollDelay.getMillis(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Cancels the background 'used_status_last_updated' population task if it is still running,
     * interrupting its thread.
     *
     * <p>A no-op when the task was never started (no future exists) or has already finished.
     */
    @Override
    public void stopAsyncUsedFlagLastUpdatedUpdate() {
        // Read the field once; it is null until populateUsedFlagLastUpdatedAsync() has been called.
        final Future<?> populationFuture = this.usedFlagLastUpdatedPopulationFuture;
        if (populationFuture == null) {
            return;
        }
        if (!populationFuture.isDone() && !populationFuture.isCancelled()) {
            populationFuture.cancel(true);
        }
    }

    /**
     * Runs {@link #populateUsedFlagLastUpdated()} on a dedicated background thread and remembers
     * the resulting future so that it can be cancelled later.
     */
    @Override
    public void populateUsedFlagLastUpdatedAsync() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            this.usedFlagLastUpdatedPopulationFuture = executorService.submit(this::populateUsedFlagLastUpdated);
        }
        finally {
            // Orderly shutdown: the already-submitted task still runs to completion (and can still be
            // cancelled through its future), but the worker thread exits afterwards instead of leaking.
            executorService.shutdown();
        }
    }

    /**
     * Fills in 'used_status_last_updated' for unused segments whose value is NULL.
     *
     * <p>Works in batches: selects up to {@code batchSize} ids of unused segments with a NULL
     * timestamp, stamps them with the current UTC time, then sleeps 10 seconds before the next batch.
     * Stops when no rows are left, when a short batch was fully updated, when a database call fails
     * (a warning is logged), or when the thread is interrupted.
     */
    @VisibleForTesting
    void populateUsedFlagLastUpdated() {
        final String segmentsTable = this.getSegmentsTable();
        log.info("Populating column 'used_status_last_updated' with non-NULL values for unused segments in table[%s].", segmentsTable);
        final int batchSize = 100;
        int totalUpdatedEntries = 0;
        while (true) {
            int numUpdatedRows;
            final List<String> segmentsToUpdate = new ArrayList<>(batchSize);
            try {
                // Step 1: fetch one batch of ids that still need a timestamp.
                this.connector.retryWithHandle(handle -> {
                    segmentsToUpdate.addAll(
                        handle.createQuery(StringUtils.format("SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s", segmentsTable, this.connector.limitClause(batchSize)))
                              .bind("used", false)
                              .mapTo(String.class)
                              .list()
                    );
                    return null;
                });
                if (segmentsToUpdate.isEmpty()) {
                    break;
                }
                // Step 2: stamp every id of the batch with the same "now".
                numUpdatedRows = this.connector.retryWithHandle(handle -> {
                    final Batch updateBatch = handle.createBatch();
                    final String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'";
                    final String now = DateTimes.nowUtc().toString();
                    for (String id : segmentsToUpdate) {
                        updateBatch.add(StringUtils.format(sql, segmentsTable, now, id));
                    }
                    final int[] results = updateBatch.execute();
                    return Arrays.stream(results).sum();
                });
                totalUpdatedEntries += numUpdatedRows;
            }
            catch (Exception e) {
                log.warn(e, "Populating column 'used_status_last_updated' in table[%s] has failed. There may be unused segments with NULL values for 'used_status_last_updated' that won't be killed!", segmentsTable);
                return;
            }
            log.debug("Updated a batch of [%d] rows in table[%s] with a valid used_status_last_updated date", segmentsToUpdate.size(), segmentsTable);
            // A fully-updated batch smaller than the limit means nothing is left to do.
            if (segmentsToUpdate.size() == numUpdatedRows && numUpdatedRows < batchSize) {
                break;
            }
            // Pause between batches.
            try {
                Thread.sleep(10000L);
            }
            catch (InterruptedException e) {
                log.info("Interrupted, exiting!");
                // Restore the flag for the caller and really exit; continuing the loop would only
                // re-run the queries with the interrupt flag set and never sleep again.
                Thread.currentThread().interrupt();
                return;
            }
        }
        log.info("Populated column 'used_status_last_updated' in table[%s] in [%d] rows.", segmentsTable, totalUpdatedEntries);
    }

    /**
     * Builds the task that the scheduler runs for one "start" of periodic polling.
     *
     * <p>Each run first waits until any recent on-demand poll is at least one poll period old, then,
     * under the read lock, polls the database if {@code startOrder} is still the current start order.
     * After a successful poll the first-poll future is completed and {@code periodicDatabasePoll}
     * becomes the latest poll.
     *
     * <p>Exceptions thrown by the poll are alerted and swallowed so that scheduled polling goes on;
     * in that case the first-poll future is left uncompleted. Throwables that are not Exceptions
     * complete the future exceptionally and are rethrown.
     *
     * @param startOrder           start order this task belongs to
     * @param periodicDatabasePoll poll record updated by this task
     */
    private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll) {
        return () -> {
            try {
                final long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(this.periodicPollDelay.getMillis());
                while (true) {
                    // Read the volatile field once per iteration so the instanceof check and the cast
                    // see the same object even if another thread replaces it concurrently.
                    final DatabasePoll latestPoll = this.latestDatabasePoll;
                    if (!(latestPoll instanceof OnDemandDatabasePoll)) {
                        break;
                    }
                    final long remainingNanos = periodicPollDelayNanos - ((OnDemandDatabasePoll)latestPoll).nanosElapsedFromInitiation();
                    if (remainingNanos <= 0L) {
                        break;
                    }
                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
                }
            }
            catch (Exception e) {
                // Includes InterruptedException; the wait is best-effort and the poll below still runs.
                log.debug((Throwable)e, "Exception found while waiting for next periodic poll", new Object[0]);
            }
            ReentrantReadWriteLock.ReadLock lock = this.startStopPollLock.readLock();
            lock.lock();
            try {
                if (startOrder == this.currentStartPollingOrder) {
                    periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis();
                    this.poll();
                    periodicDatabasePoll.firstPollCompletionFuture.complete(null);
                    this.latestDatabasePoll = periodicDatabasePoll;
                } else {
                    // This task belongs to an earlier start of periodic polling; do nothing.
                    log.debug("startOrder[%d] != currentStartPollingOrder[%d], skipping poll()", new Object[]{startOrder, this.currentStartPollingOrder});
                }
            }
            catch (Throwable t) {
                log.makeAlert(t, "Uncaught exception in %s's polling thread", new Object[]{SqlSegmentsMetadataManager.class}).emit();
                // Exceptions are swallowed here so that the scheduler keeps running this task.
                if (!(t instanceof Exception)) {
                    periodicDatabasePoll.firstPollCompletionFuture.completeExceptionally(t);
                    throw t;
                }
            }
            finally {
                lock.unlock();
            }
        };
    }

    /**
     * Tells whether periodic polling is currently active, i.e. whether the current start order is
     * non-negative. The check is made under the read lock.
     */
    @Override
    public boolean isPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.ReadLock readLock = startStopPollLock.readLock();
        readLock.lock();
        try {
            return currentStartPollingOrder >= 0L;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Stops periodic polling if it is active: cancels the scheduled task without interrupting it,
     * forgets the latest poll and resets the current start order to -1.
     */
    @Override
    public void stopPollingDatabasePeriodically() {
        final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
        writeLock.lock();
        try {
            if (isPollingDatabasePeriodically()) {
                periodicPollTaskFuture.cancel(false);
                latestDatabasePoll = null;
                // A negative order marks periodic polling as inactive.
                currentStartPollingOrder = -1L;
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Ensures a usable snapshot: returns immediately when the latest poll is still acceptable,
     * otherwise performs a new on-demand poll under the write lock.
     *
     * <p>The freshness check is repeated after the lock is acquired because another thread may
     * have polled while this one was waiting.
     */
    private void useLatestIfWithinDelayOrPerformNewDatabasePoll() {
        if (!useLatestSnapshotIfWithinDelay()) {
            final ReentrantReadWriteLock.WriteLock writeLock = startStopPollLock.writeLock();
            writeLock.lock();
            try {
                if (!useLatestSnapshotIfWithinDelay()) {
                    final OnDemandDatabasePoll freshPoll = new OnDemandDatabasePoll();
                    latestDatabasePoll = freshPoll;
                    doOnDemandPoll(freshPoll);
                }
            }
            finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * Decides whether the latest poll can be used as-is, waiting for it to complete if so.
     *
     * <ul>
     *   <li>Latest poll is periodic: waits for its first-poll future and returns {@code true}.</li>
     *   <li>Latest poll is on-demand and younger than one poll period: waits for its completion
     *       future and returns {@code true}.</li>
     *   <li>Otherwise returns {@code false}.</li>
     * </ul>
     *
     * <p>Waiting uses {@code Futures.getUnchecked}, so a poll that failed surfaces here as an
     * unchecked exception.
     */
    @VisibleForTesting
    boolean useLatestSnapshotIfWithinDelay() {
        // Single read of the volatile field; everything below works on this reference.
        final DatabasePoll currentPoll = this.latestDatabasePoll;

        if (currentPoll instanceof PeriodicDatabasePoll) {
            Futures.getUnchecked(((PeriodicDatabasePoll)currentPoll).firstPollCompletionFuture);
            return true;
        }

        if (!(currentPoll instanceof OnDemandDatabasePoll)) {
            assert (currentPoll == null);
            return false;
        }

        final long maxAgeNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
        final OnDemandDatabasePoll onDemandPoll = (OnDemandDatabasePoll)currentPoll;
        if (onDemandPoll.nanosElapsedFromInitiation() >= maxAgeNanos) {
            // Too old to be reused.
            return false;
        }
        Futures.getUnchecked(onDemandPoll.pollCompletionFuture);
        return true;
    }

    /**
     * Makes sure a poll that started after this call was made has been performed.
     *
     * <p>Takes the write lock, then returns without polling if the latest poll (periodic or
     * on-demand) started after this method was entered; otherwise forces a new on-demand poll.
     * The write lock is released on every path, including when the poll throws.
     */
    @VisibleForTesting
    void forceOrWaitOngoingDatabasePoll() {
        final long checkStartTime = System.currentTimeMillis();
        final ReentrantReadWriteLock.WriteLock lock = this.startStopPollLock.writeLock();
        lock.lock();
        try {
            final DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
            try {
                // Did a periodic poll start while we were waiting for the lock?
                if (latestDatabasePoll instanceof PeriodicDatabasePoll && ((PeriodicDatabasePoll)latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) {
                    return;
                }
                // Did an on-demand poll start while we were waiting for the lock?
                if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
                    // NOTE(review): this compares initiationTimeNanos with a wall-clock time converted
                    // to nanoseconds - confirm both values come from the same clock.
                    final long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime);
                    final OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll)latestDatabasePoll;
                    if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) {
                        return;
                    }
                }
            }
            catch (Exception e) {
                log.debug((Throwable)e, "Latest poll was unsuccessful. Starting a new poll...", new Object[0]);
            }
            // No sufficiently recent poll: force one now.
            final OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
            this.latestDatabasePoll = onDemandDatabasePoll;
            this.doOnDemandPoll(onDemandDatabasePoll);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Runs one poll on behalf of {@code onDemandPoll} and records the outcome on its completion
     * future: completed normally on success, exceptionally on failure. The failure is rethrown.
     *
     * @param onDemandPoll the on-demand poll record whose completion future is updated
     */
    private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) {
        try {
            poll();
            onDemandPoll.pollCompletionFuture.complete(null);
        }
        catch (Throwable failure) {
            // Propagate to anyone waiting on the future, then to the caller.
            onDemandPoll.pollCompletionFuture.completeExceptionally(failure);
            throw failure;
        }
    }

    /**
     * Sets {@code used=true} and refreshes 'used_status_last_updated' for the row with the given id.
     *
     * @param segmentId id of the segment row to update
     * @return {@code true} if at least one row was updated
     * @throws RuntimeException any failure from the database call, logged and rethrown
     */
    @Override
    public boolean markSegmentAsUsed(String segmentId) {
        try {
            final int updatedRowCount = this.connector.getDBI().withHandle(handle -> {
                final String updateSql = StringUtils.format("UPDATE %s SET used=true, used_status_last_updated = :used_status_last_updated WHERE id = :id", getSegmentsTable());
                return handle.createStatement(updateSql)
                             .bind("id", segmentId)
                             .bind("used_status_last_updated", DateTimes.nowUtc().toString())
                             .execute();
            });
            return updatedRowCount > 0;
        }
        catch (RuntimeException e) {
            log.error(e, "Exception marking segment %s as used", segmentId);
            throw e;
        }
    }

    /**
     * Marks as used every non-overshadowed unused segment of the datasource, without restricting
     * the interval or the versions.
     *
     * @return the value returned by {@code doMarkAsUsedNonOvershadowedSegments}
     */
    @Override
    public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource) {
        // null interval / null versions = no restriction.
        final Interval noIntervalFilter = null;
        final List<String> noVersionFilter = null;
        return doMarkAsUsedNonOvershadowedSegments(dataSource, noIntervalFilter, noVersionFilter);
    }

    /**
     * Marks as used the non-overshadowed unused segments of the datasource within {@code interval},
     * optionally restricted to the given versions.
     *
     * @param interval must not be null
     * @param versions versions to consider, or null for no restriction
     * @throws NullPointerException if {@code interval} is null
     */
    @Override
    public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions) {
        Preconditions.checkNotNull(interval);
        final int markedCount = doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions);
        return markedCount;
    }

    /**
     * Collects the used and unused segments of a datasource into one timeline and marks as used
     * every unused segment that the timeline does not report as overshadowed.
     *
     * @param dataSourceName datasource to operate on
     * @param interval       interval to restrict to, or null for eternity
     * @param versions       versions to restrict to, or null for no restriction
     * @return the value returned by {@code markNonOvershadowedSegmentsAsUsed}
     */
    private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval, @Nullable List<String> versions) {
        final List<DataSegment> unusedSegments = new ArrayList<>();
        final SegmentTimeline timeline = new SegmentTimeline();
        this.connector.inReadOnlyTransaction((handle, status) -> {
            final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
            final List<Interval> intervals = interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval);

            // Used segments only contribute to the timeline.
            try (CloseableIterator<DataSegment> usedIterator = queryTool.retrieveUsedSegments(dataSourceName, intervals, versions)) {
                timeline.addSegments(usedIterator);
            }

            // Unused segments are added to the timeline and remembered as candidates. Each iterator
            // lives in its own try-with-resources so it is closed even if iteration fails.
            try (CloseableIterator<DataSegment> unusedIterator = queryTool.retrieveUnusedSegments(dataSourceName, intervals, versions, null, null, null, null)) {
                while (unusedIterator.hasNext()) {
                    final DataSegment dataSegment = unusedIterator.next();
                    timeline.addSegments(Iterators.singletonIterator(dataSegment));
                    unusedSegments.add(dataSegment);
                }
            }
            // The callback works purely by side effects.
            return null;
        });
        return this.markNonOvershadowedSegmentsAsUsed(unusedSegments, timeline);
    }

    /**
     * Marks as used those of {@code unusedSegments} that {@code timeline} does not report as
     * overshadowed.
     *
     * @return the value returned by {@code markSegmentsAsUsed} for the selected ids
     */
    private int markNonOvershadowedSegmentsAsUsed(List<DataSegment> unusedSegments, SegmentTimeline timeline) {
        final List<SegmentId> idsToMark = unusedSegments
            .stream()
            .filter(candidate -> !timeline.isOvershadowed(candidate))
            .map(DataSegment::getId)
            .collect(Collectors.toList());
        return markSegmentsAsUsed(idsToMark);
    }

    /**
     * Marks as used those of the given segments that are currently unused and not overshadowed.
     *
     * <p>In one read-only transaction, loads the unused segments among {@code segmentIds} and the
     * used segments overlapping their condensed intervals, and builds a timeline from both. The
     * timeline then decides which unused segments are marked.
     *
     * @throws DruidException if the root cause of a failure is a DruidException
     */
    @Override
    public int markAsUsedNonOvershadowedSegments(String dataSource, Set<SegmentId> segmentIds) {
        try {
            final Pair<List<DataSegment>, SegmentTimeline> unusedAndTimeline = this.connector.inReadOnlyTransaction((handle, status) -> {
                final List<DataSegment> unused = retrieveUnusedSegments(dataSource, segmentIds, handle);
                final List<Interval> unusedIntervals = JodaUtils.condenseIntervals(
                    unused.stream().map(DataSegment::getInterval).collect(Collectors.toList())
                );
                try (CloseableIterator<DataSegment> overlappingUsed = retrieveUsedSegmentsOverlappingIntervals(dataSource, unusedIntervals, handle)) {
                    final SegmentTimeline combinedTimeline = SegmentTimeline.forSegments(Iterators.concat(overlappingUsed, unused.iterator()));
                    return new Pair<>(unused, combinedTimeline);
                }
            });
            return markNonOvershadowedSegmentsAsUsed(unusedAndTimeline.lhs, unusedAndTimeline.rhs);
        }
        catch (Exception e) {
            // Surface a DruidException buried in wrapper exceptions; otherwise rethrow unchanged.
            final Throwable rootCause = Throwables.getRootCause(e);
            if (rootCause instanceof DruidException) {
                throw (DruidException)rootCause;
            }
            throw e;
        }
    }

    /**
     * Looks up the given segment ids and returns those whose {@code used} flag is {@code false}.
     *
     * @throws DruidException (invalid input) if any of {@code segmentIds} is not found
     */
    private List<DataSegment> retrieveUnusedSegments(String dataSource, Set<SegmentId> segmentIds, Handle handle) {
        final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
        final List<DataSegmentPlus> foundSegments = queryTool.retrieveSegmentsById(dataSource, segmentIds);

        // Start from all requested ids and strike out each one that was found.
        final Set<SegmentId> missingIds = new HashSet<>(segmentIds);
        final List<DataSegment> unused = new ArrayList<>();
        for (DataSegmentPlus found : foundSegments) {
            final DataSegment segment = found.getDataSegment();
            missingIds.remove(segment.getId());
            if (Boolean.FALSE.equals(found.getUsed())) {
                unused.add(segment);
            }
        }

        if (missingIds.isEmpty()) {
            return unused;
        }
        throw InvalidInput.exception("Could not find segment IDs[%s] for datasource[%s]", missingIds, dataSource);
    }

    /**
     * Returns an iterator over the used segments of {@code dataSource} for the given intervals,
     * as provided by {@code SqlSegmentsMetadataQuery.retrieveUsedSegments}. The caller must close it.
     */
    private CloseableIterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(String dataSource, Collection<Interval> intervals, Handle handle) {
        final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
        return queryTool.retrieveUsedSegments(dataSource, intervals);
    }

    /**
     * Marks the given segments as used, stamped with the current UTC time.
     *
     * @return 0 when {@code segmentIds} is empty, otherwise the value returned by
     *         {@code SqlSegmentsMetadataQuery.markSegments}
     */
    private int markSegmentsAsUsed(List<SegmentId> segmentIds) {
        if (!segmentIds.isEmpty()) {
            return this.connector.getDBI().withHandle(handle -> {
                final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
                return queryTool.markSegments(segmentIds, true, DateTimes.nowUtc());
            });
        }
        log.info("No segments found to mark as used.");
        return 0;
    }

    /**
     * Marks all segments of the datasource as unused (interval = eternity), stamped with the
     * current UTC time.
     *
     * @return the value returned by {@code SqlSegmentsMetadataQuery.markSegmentsUnused}
     * @throws RuntimeException any failure from the database call, logged and rethrown
     */
    @Override
    public int markAsUnusedAllSegmentsInDataSource(String dataSource) {
        try {
            return this.connector.getDBI().withHandle(handle -> {
                final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
                return queryTool.markSegmentsUnused(dataSource, Intervals.ETERNITY, DateTimes.nowUtc());
            });
        }
        catch (RuntimeException e) {
            log.error(e, "Exception marking all segments as unused in data source [%s]", dataSource);
            throw e;
        }
    }

    /**
     * Marks a single segment as unused, stamped with the current UTC time.
     *
     * @return {@code true} if the underlying call reports more than zero segments
     * @throws RuntimeException any failure from the database call, logged and rethrown
     */
    @Override
    public boolean markSegmentAsUnused(SegmentId segmentId) {
        try {
            final int markedCount = this.connector.getDBI().withHandle(handle -> {
                final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
                return queryTool.markSegments(List.of(segmentId), false, DateTimes.nowUtc());
            });
            return markedCount > 0;
        }
        catch (RuntimeException e) {
            log.error(e, "Exception marking segment [%s] as unused", segmentId);
            throw e;
        }
    }

    /**
     * Marks the given segments as unused, stamped with the current UTC time.
     *
     * @return the value returned by {@code SqlSegmentsMetadataQuery.markSegments}
     */
    @Override
    public int markSegmentsAsUnused(Set<SegmentId> segmentIds) {
        return this.connector.getDBI().withHandle(handle -> {
            final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
            return queryTool.markSegments(segmentIds, false, DateTimes.nowUtc());
        });
    }

    /**
     * Marks as unused the segments of the datasource within {@code interval}, optionally restricted
     * to the given versions, stamped with the current UTC time.
     *
     * @param interval must not be null
     * @param versions versions to restrict to, or null for no restriction
     * @throws NullPointerException if {@code interval} is null
     * @throws RuntimeException     wrapping any exception raised by the database call
     */
    @Override
    public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions) {
        Preconditions.checkNotNull(interval);
        try {
            return this.connector.getDBI().withHandle(handle -> {
                final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
                return queryTool.markSegmentsUnused(dataSource, interval, versions, DateTimes.nowUtc());
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up one datasource in the current snapshot of used segments (polling first if needed).
     *
     * @return whatever {@code DataSourcesSnapshot.getDataSource} returns for the name; may be null
     */
    @Override
    @Nullable
    public ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSourceName) {
        final DataSourcesSnapshot snapshot = getSnapshotOfDataSourcesWithAllUsedSegments();
        return snapshot.getDataSource(dataSourceName);
    }

    /**
     * Returns all datasources of the current snapshot of used segments (polling first if needed).
     */
    @Override
    public Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments() {
        final DataSourcesSnapshot snapshot = getSnapshotOfDataSourcesWithAllUsedSegments();
        return snapshot.getDataSourcesWithAllUsedSegments();
    }

    /**
     * Returns the current snapshot, first making sure that the latest poll is still acceptable
     * or performing a new on-demand poll.
     */
    @Override
    public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() {
        useLatestIfWithinDelayOrPerformNewDatabasePoll();
        // Read the volatile field only after the freshness check above.
        final DataSourcesSnapshot currentSnapshot = dataSourcesSnapshot;
        return currentSnapshot;
    }

    /**
     * Test accessor: returns the snapshot field as-is, without triggering or waiting for a poll.
     */
    @VisibleForTesting
    DataSourcesSnapshot getDataSourcesSnapshot() {
        final DataSourcesSnapshot currentSnapshot = dataSourcesSnapshot;
        return currentSnapshot;
    }

    /**
     * Test accessor: returns the latest poll record, or null if there is none.
     */
    @VisibleForTesting
    DatabasePoll getLatestDatabasePoll() {
        final DatabasePoll currentPoll = latestDatabasePoll;
        return currentPoll;
    }

    /**
     * Returns an iterable over all used segments of the current snapshot (polling first if needed).
     */
    @Override
    public Iterable<DataSegment> iterateAllUsedSegments() {
        useLatestIfWithinDelayOrPerformNewDatabasePoll();
        final DataSourcesSnapshot snapshot = dataSourcesSnapshot;
        return snapshot.iterateAllUsedSegmentsInSnapshot();
    }

    /**
     * Returns the used, non-overshadowed segments of a datasource within an interval, considering
     * only complete partitions.
     *
     * @param requiresLatest if true, a poll started after this call is forced or awaited; otherwise
     *                       the latest poll is reused when still acceptable
     * @return absent if the snapshot has no timeline for {@code datasource}
     */
    @Override
    public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, Interval interval, boolean requiresLatest) {
        if (!requiresLatest) {
            useLatestIfWithinDelayOrPerformNewDatabasePoll();
        } else {
            forceOrWaitOngoingDatabasePoll();
        }

        final SegmentTimeline datasourceTimeline = this.dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
        if (datasourceTimeline == null) {
            return Optional.absent();
        }
        return Optional.of(datasourceTimeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
    }

    /**
     * Reads unused segments of a datasource straight from the database, in a read-only
     * transaction, and returns them as an immutable list.
     *
     * @param interval      interval to restrict to, or null for eternity
     * @param limit         passed through to the query; may be null
     * @param lastSegmentId passed through to the query; may be null
     * @param sortOrder     passed through to the query; may be null
     */
    @Override
    public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(String datasource, @Nullable Interval interval, @Nullable Integer limit, @Nullable String lastSegmentId, @Nullable SortOrder sortOrder) {
        return this.connector.inReadOnlyTransaction((handle, status) -> {
            final SqlSegmentsMetadataQuery queryTool = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.dbTables.get(), this.jsonMapper);
            final List<Interval> searchIntervals = interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval);
            // Materialise the results before the iterator (and the transaction) is closed.
            try (CloseableIterator<DataSegmentPlus> unusedSegments = queryTool.retrieveUnusedSegmentsPlus(datasource, searchIntervals, null, limit, lastSegmentId, sortOrder, null)) {
                return ImmutableList.copyOf(unusedSegments);
            }
        });
    }

    /**
     * Returns the distinct values of the {@code datasource} column of the segments table.
     */
    @Override
    public Set<String> retrieveAllDataSourceNames() {
        return this.connector.getDBI().withHandle(handle -> {
            final String sql = StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable());
            // Fold every result row into one set of names.
            return handle.createQuery(sql).fold(new HashSet<String>(), (names, row, foldController, statementContext) -> {
                names.add(MapUtils.getString(row, "datasource"));
                return names;
            });
        });
    }

    /**
     * Runs one database poll while holding {@code pollLock}.
     */
    @VisibleForTesting
    void poll() {
        synchronized (this.pollLock) {
            doPoll();
        }
    }

    /**
     * Picks the poll flavour: segments together with schemas when the centralized datasource
     * schema feature is enabled, segments only otherwise.
     */
    @GuardedBy(value="pollLock")
    private void doPoll() {
        final boolean schemaPollingEnabled = centralizedDatasourceSchemaConfig.isEnabled();
        if (!schemaPollingEnabled) {
            doPollSegments();
            return;
        }
        doPollSegmentAndSchema();
    }

    /**
     * Reads the payload of every used segment from the database, emits the poll duration as the
     * {@code segment/poll/time} metric, and hands the segments to {@code createDatasourcesSnapshot}.
     *
     * <p>A payload that cannot be deserialized raises an alert and contributes a null entry to the
     * list rather than failing the whole poll.
     */
    private void doPollSegments() {
        final DateTime pollStartTime = DateTimes.nowUtc();
        final Stopwatch pollTimer = Stopwatch.createStarted();

        final List<DataSegment> polledSegments = this.connector.inReadOnlyTransaction((handle, status) -> {
            final String sql = StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable());
            return handle.createQuery(sql)
                         .setFetchSize(this.connector.getStreamingFetchSize())
                         .map((index, r, ctx) -> {
                             try {
                                 final DataSegment segment = this.segmentReader.readValue(r.getBytes("payload"));
                                 return replaceWithExistingSegmentIfPresent(segment);
                             }
                             catch (IOException e) {
                                 log.makeAlert(e, "Failed to read segment from db.").emit();
                                 return null;
                             }
                         })
                         .list();
        });
        Preconditions.checkNotNull(polledSegments, "Unexpected 'null' when polling segments from the db, aborting snapshot update.");

        pollTimer.stop();
        emitMetric("segment/poll/time", pollTimer.millisElapsed());
        log.info("Polled and found [%,d] segments in the database in [%,d]ms.", polledSegments.size(), pollTimer.millisElapsed());

        createDatasourcesSnapshot(pollStartTime, polledSegments);
    }

    /**
     * Polls all used segments together with their schema fingerprint and row count, polls the segment schemas
     * table, publishes both to {@code segmentSchemaCache}, emits the {@code segment/pollWithSchema/time} metric
     * and finally rebuilds the datasources snapshot.
     *
     * <p>A segment row whose payload cannot be deserialized raises an alert and is mapped to {@code null}. A
     * schema row whose payload cannot be deserialized raises an alert and is left out of the schema map.
     *
     * @throws NullPointerException if the segment query unexpectedly yields a {@code null} list; the check runs
     *         before the schema cache is updated so that an aborted poll leaves both the cache and the snapshot
     *         untouched
     */
    private void doPollSegmentAndSchema() {
        final DateTime startTime = DateTimes.nowUtc();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = new ImmutableMap.Builder<>();

        // Stats are emitted before anything is polled or written to the cache in this method.
        this.segmentSchemaCache.emitStats();

        final List<DataSegment> segments = this.connector.inReadOnlyTransaction(new TransactionCallback<List<DataSegment>>(){

            @Override
            public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) {
                return handle
                    .createQuery(StringUtils.format("SELECT payload, schema_fingerprint, num_rows FROM %s WHERE used=true", SqlSegmentsMetadataManager.this.getSegmentsTable()))
                    .setFetchSize(SqlSegmentsMetadataManager.this.connector.getStreamingFetchSize())
                    .map((index, r, ctx) -> {
                        try {
                            DataSegment segment = SqlSegmentsMetadataManager.this.jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
                            Long numRows = (Long)r.getObject("num_rows");
                            String schemaFingerprint = r.getString("schema_fingerprint");
                            // Only segments that have both a fingerprint and a row count get a metadata entry.
                            if (schemaFingerprint != null && numRows != null) {
                                segmentMetadataBuilder.put(segment.getId(), new SegmentMetadata(numRows, schemaFingerprint));
                            }
                            return SqlSegmentsMetadataManager.this.replaceWithExistingSegmentIfPresent(segment);
                        }
                        catch (IOException e) {
                            // One unreadable row must not fail the whole poll: alert and yield null for this row.
                            log.makeAlert(e, "Failed to read segment from db.").emit();
                            return null;
                        }
                    })
                    .list();
            }
        });

        // Fail fast, before the schema table is read and before the schema cache is mutated.
        Preconditions.checkNotNull(segments, "Unexpected 'null' when polling segments from the db, aborting snapshot update.");

        final ImmutableMap.Builder<String, SchemaPayload> schemaMapBuilder = new ImmutableMap.Builder<>();
        // NOTE(review): the literal 1 is the schema version filter; presumably it mirrors a version constant
        // defined elsewhere - confirm before changing.
        final String schemaPollQuery = StringUtils.format("SELECT fingerprint, payload FROM %s WHERE version = %s", this.getSegmentSchemaTable(), 1);
        this.connector.inReadOnlyTransaction((handle, status) -> {
            handle.createQuery(schemaPollQuery)
                .setFetchSize(this.connector.getStreamingFetchSize())
                .map((index, r, ctx) -> {
                    try {
                        schemaMapBuilder.put(r.getString("fingerprint"), this.jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class));
                    }
                    catch (IOException e) {
                        log.makeAlert(e, "Failed to read schema from db.").emit();
                    }
                    // Rows are consumed only for their side effect on schemaMapBuilder.
                    return null;
                })
                .list();
            return null;
        });

        final ImmutableMap<String, SchemaPayload> schemaMap = schemaMapBuilder.build();
        this.segmentSchemaCache.updateFinalizedSegmentSchema(
            new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentMetadataBuilder.build(), schemaMap)
        );

        stopwatch.stop();
        this.emitMetric("segment/pollWithSchema/time", stopwatch.millisElapsed());
        log.info("Polled and found [%,d] segments and [%,d] schemas in the database in [%,d]ms.", segments.size(), schemaMap.size(), stopwatch.millisElapsed());
        this.createDatasourcesSnapshot(startTime, segments);
    }

    /**
     * Emits a single metric event through {@code serviceEmitter}.
     *
     * @param metricName name of the metric to emit
     * @param value      value reported for the metric
     */
    private void emitMetric(String metricName, long value) {
        final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
        this.serviceEmitter.emit((ServiceEventBuilder)metricBuilder.setMetric(metricName, (Number)value));
    }

    /**
     * Replaces {@code dataSourcesSnapshot} with a snapshot built from the given segments and emits the
     * {@code segment/buildSnapshot/time} metric.
     *
     * @param snapshotTime time passed to the snapshot factory
     * @param segments     polled segments; {@code null} entries are filtered out before the snapshot is built
     */
    private void createDatasourcesSnapshot(DateTime snapshotTime, List<DataSegment> segments) {
        final Stopwatch buildTimer = Stopwatch.createStarted();

        final Iterable<DataSegment> nonNullSegments = Iterables.filter(segments, Objects::nonNull);
        this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(nonNullSegments, snapshotTime);

        this.emitMetric("segment/buildSnapshot/time", buildTimer.millisElapsed());
        log.debug(
            "Created snapshot from polled segments in [%d]ms. Found [%d] overshadowed segments.",
            buildTimer.millisElapsed(),
            this.dataSourcesSnapshot.getOvershadowedSegments().size()
        );
    }

    /**
     * Looks up the given segment's id in the current {@code dataSourcesSnapshot} and returns the instance held
     * there when one exists.
     *
     * @param segment freshly deserialized segment
     * @return the segment with the same id from the current snapshot, or {@code segment} itself when there is no
     *         snapshot yet, the snapshot has no such datasource, or the datasource has no such segment
     */
    private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) {
        // Read the field once so that all lookups below work against the same snapshot.
        final DataSourcesSnapshot currentSnapshot = this.dataSourcesSnapshot;
        final ImmutableDruidDataSource knownDataSource =
            currentSnapshot == null ? null : currentSnapshot.getDataSource(segment.getDataSource());
        if (knownDataSource == null) {
            return segment;
        }

        final DataSegment knownSegment = knownDataSource.getSegment(segment.getId());
        if (knownSegment == null) {
            return segment;
        }
        return knownSegment;
    }

    /**
     * @return the segments table name from the tables config supplied by {@code dbTables}
     */
    private String getSegmentsTable() {
        final MetadataStorageTablesConfig tablesConfig = (MetadataStorageTablesConfig)this.dbTables.get();
        return tablesConfig.getSegmentsTable();
    }

    /**
     * @return the segment schemas table name from the tables config supplied by {@code dbTables}
     */
    private String getSegmentSchemaTable() {
        final MetadataStorageTablesConfig tablesConfig = (MetadataStorageTablesConfig)this.dbTables.get();
        return tablesConfig.getSegmentSchemasTable();
    }

    /**
     * Returns the intervals of unused segments of a datasource, ordered by start and then end.
     *
     * <p>A row qualifies when {@code used = false}, its {@code end} is not after {@code maxEndTime}, its
     * {@code used_status_last_updated} is non-null and not after {@code maxUsedStatusLastUpdatedTime}, and, when
     * {@code minStartTime} is given, its {@code start} is not before it. The time bounds are bound to the query
     * as their {@code toString()} representation.
     *
     * @param dataSource                   datasource whose segments are examined
     * @param minStartTime                 optional lower bound on the segment start; no lower bound when
     *                                     {@code null}
     * @param maxEndTime                   upper bound on the segment end
     * @param limit                        maximum number of intervals to return
     * @param maxUsedStatusLastUpdatedTime upper bound on {@code used_status_last_updated}
     * @return at most {@code limit} intervals, in query order
     */
    @Override
    public List<Interval> getUnusedSegmentIntervals(final String dataSource, final @Nullable DateTime minStartTime, final DateTime maxEndTime, final int limit, final DateTime maxUsedStatusLastUpdatedTime) {
        return this.connector.inReadOnlyTransaction(new TransactionCallback<List<Interval>>(){

            @Override
            public List<Interval> inTransaction(Handle handle, TransactionStatus status) {
                final Query<Interval> sql = handle
                    .createQuery(StringUtils.format(
                        "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND %2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated %3$s ORDER BY start, %2$send%2$s",
                        SqlSegmentsMetadataManager.this.getSegmentsTable(),
                        SqlSegmentsMetadataManager.this.connector.getQuoteString(),
                        null != minStartTime ? "AND start >= :start" : ""
                    ))
                    .setFetchSize(SqlSegmentsMetadataManager.this.connector.getStreamingFetchSize())
                    .setMaxRows(limit)
                    .bind("dataSource", dataSource)
                    .bind("end", maxEndTime.toString())
                    .bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString())
                    .map(new BaseResultSetMapper<Interval>(){

                        @Override
                        protected Interval mapInternal(int index, Map<String, Object> row) {
                            return new Interval(DateTimes.of((String)row.get("start")), DateTimes.of((String)row.get("end")));
                        }
                    });
                // The :start placeholder exists in the statement only when a lower bound was requested.
                if (null != minStartTime) {
                    sql.bind("start", minStartTime.toString());
                }
                // Close the iterator explicitly: when the loop ends because the limit was reached, hasNext() is
                // not called again, so the iterator would otherwise be left open with its underlying resources.
                try (ResultIterator<Interval> iter = sql.iterator()) {
                    final List<Interval> result = Lists.newArrayListWithCapacity(limit);
                    for (int i = 0; i < limit && iter.hasNext(); ++i) {
                        try {
                            result.add(iter.next());
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    return result;
                }
            }
        });
    }

    private static interface DatabasePoll {
    }

    /**
     * {@link DatabasePoll} holder with a future named for completion of the first poll and a timestamp named for
     * the start of the last poll. Nothing in this class completes the future or updates the timestamp itself.
     */
    @VisibleForTesting
    static class PeriodicDatabasePoll implements DatabasePoll {
        final CompletableFuture<Void> firstPollCompletionFuture;
        /** Starts at {@code -1}. NOTE(review): presumably means "no poll started yet" - confirm against usages. */
        long lastPollStartTimestampInMs;

        PeriodicDatabasePoll() {
            this.firstPollCompletionFuture = new CompletableFuture<>();
            this.lastPollStartTimestampInMs = -1L;
        }
    }

    /**
     * {@link DatabasePoll} holder that records the {@link System#nanoTime()} reading taken at construction and
     * carries a future named for completion of the poll. Nothing in this class completes the future itself.
     */
    @VisibleForTesting
    static class OnDemandDatabasePoll implements DatabasePoll {
        final long initiationTimeNanos;
        final CompletableFuture<Void> pollCompletionFuture;

        OnDemandDatabasePoll() {
            this.initiationTimeNanos = System.nanoTime();
            this.pollCompletionFuture = new CompletableFuture<>();
        }

        /**
         * @return nanoseconds between construction of this object and now, per {@link System#nanoTime()}
         */
        long nanosElapsedFromInitiation() {
            final long nowNanos = System.nanoTime();
            return nowNanos - this.initiationTimeNanos;
        }
    }
}

