/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.extractor.realtime;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeTimePartitionListener;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionExtractor.class);

    /** Name of the owning pipe; assigned in {@code customize} from the runtime environment. */
    protected String pipeName;

    /** String form of the region id reported by the runtime environment; assigned in {@code customize}. */
    protected String dataRegionId;

    /** Task meta handed over by the runtime environment; assigned in {@code customize}. */
    protected PipeTaskMeta pipeTaskMeta;

    /** Value of {@code extractor.pattern} / {@code source.pattern}; {@code "root"} when neither is configured. */
    protected String pattern;

    /**
     * Set to {@code true} in {@code customize} when the data region's database name starts with
     * {@link #pattern}; {@code extract} then calls {@code skipParsingPattern()} on every event.
     */
    private boolean isDbNameCoveredByPattern = false;

    /** Lower bound of the extraction time range; parsed from {@code source.start-time} in {@code validate}. */
    protected long realtimeDataExtractionStartTime = Long.MIN_VALUE;

    /** Upper bound of the extraction time range; parsed from {@code source.end-time} in {@code validate}. */
    protected long realtimeDataExtractionEndTime = Long.MAX_VALUE;

    /**
     * Turned on by {@code setDataRegionTimePartitionIdBound}; turned off in {@code customize} when no
     * partition id bound could be obtained for the data region.
     */
    private final AtomicBoolean enableSkippingTimeParseByTimePartition = new AtomicBoolean(false);

    /**
     * One-way latch: set in {@code extract} the first time the recorded partition id bound is not
     * covered by the configured range. Nothing in this class resets it.
     */
    private boolean disableSkippingTimeParse = false;

    /** Lowest time partition id treated as lying inside the configured range; computed in {@code customize}. */
    private long startTimePartitionIdLowerBound;

    /** Highest time partition id treated as lying inside the configured range; computed in {@code customize}. */
    private long endTimePartitionIdUpperBound;

    /** Most recently recorded (left, right) time partition id bound of the data region; may hold {@code null}. */
    private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound = new AtomicReference<>();

    /** Set to {@code true} at the end of {@code customize}. */
    protected boolean isForwardingPipeRequests;

    /** Queue of pending events; drained and reference-cleared by {@code clearPendingQueue}. */
    protected final UnboundedBlockingPendingQueue<Event> pendingQueue = new UnboundedBlockingPendingQueue<>((PipeEventCounter)new PipeDataRegionEventCounter());

    /** Closed flag; the same object is also used as the monitor guarding queue clean-up in {@code close} and {@code extract}. */
    protected final AtomicBoolean isClosed = new AtomicBoolean(false);

    /** {@code pipeName + "_" + dataRegionId + "_" + creationTime}; assigned in {@code customize}. */
    private String taskID;

    /**
     * Creates an extractor holding only the field defaults declared on this class; the
     * pipe-specific fields are assigned later by {@code validate} and {@code customize}.
     */
    protected PipeRealtimeDataRegionExtractor() {
    }

    /**
     * Reads the optional {@code source.start-time} and {@code source.end-time} attributes and
     * stores them as the realtime extraction time range.
     *
     * <p>An attribute that is present is converted with
     * {@code DateTimeUtils.convertDatetimeStrToLong} using the system default zone. A missing start
     * time falls back to {@link Long#MIN_VALUE}, a missing end time to {@link Long#MAX_VALUE}.
     *
     * @param validator supplies the pipe parameters to inspect
     * @throws PipeParameterNotValidException if an attribute cannot be read or converted (the
     *     exception carries only the message of the underlying failure), or if the start time is
     *     greater than the end time
     */
    public void validate(PipeParameterValidator validator) throws Exception {
        final PipeParameters parameters = validator.getParameters();
        final String[] startTimeKeys = new String[]{"source.start-time"};
        final String[] endTimeKeys = new String[]{"source.end-time"};
        try {
            this.realtimeDataExtractionStartTime = parameters.hasAnyAttributes(startTimeKeys)
                    ? DateTimeUtils.convertDatetimeStrToLong(parameters.getStringByKeys(startTimeKeys), ZoneId.systemDefault())
                    : Long.MIN_VALUE;
            this.realtimeDataExtractionEndTime = parameters.hasAnyAttributes(endTimeKeys)
                    ? DateTimeUtils.convertDatetimeStrToLong(parameters.getStringByKeys(endTimeKeys), ZoneId.systemDefault())
                    : Long.MAX_VALUE;
        }
        catch (Exception e) {
            // Any read/conversion failure is reported as an invalid-parameter error.
            throw new PipeParameterNotValidException(e.getMessage());
        }
        // Checked outside the try so this exception is thrown directly instead of being
        // caught and re-wrapped by the handler above; type and message are unchanged.
        if (this.realtimeDataExtractionStartTime > this.realtimeDataExtractionEndTime) {
            throw new PipeParameterNotValidException(String.format("%s should be less than or equal to %s.", "source.start-time", "source.end-time"));
        }
    }

    /**
     * Binds this extractor to its pipe task and precomputes the shortcuts used by {@code extract}.
     *
     * <p>Steps, in order: copy pipe name, region id and task meta from the runtime environment and
     * build the task id; read the path pattern (default {@code "root"}); flag the pattern as
     * covering the database when the region's database name starts with it; derive the time
     * partition id window from the configured time range; fetch the region's current partition id
     * bound and record it, or switch partition-based time-parse skipping off when none is
     * available; finally mark pipe requests as forwarded.
     *
     * @param parameters pipe parameters; only the pattern keys are read here
     * @param configuration runtime configuration whose environment must be a
     *     {@code PipeTaskExtractorRuntimeEnvironment}
     */
    public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration) throws Exception {
        final PipeTaskExtractorRuntimeEnvironment runtimeEnv = (PipeTaskExtractorRuntimeEnvironment)configuration.getRuntimeEnvironment();

        // Identity of the task this extractor belongs to.
        pipeName = runtimeEnv.getPipeName();
        dataRegionId = String.valueOf(runtimeEnv.getRegionId());
        pipeTaskMeta = runtimeEnv.getPipeTaskMeta();
        final long creationTime = runtimeEnv.getCreationTime();
        taskID = pipeName + "_" + dataRegionId + "_" + creationTime;

        pattern = parameters.getStringOrDefault(Arrays.asList("extractor.pattern", "source.pattern"), "root");

        // The flag is only ever raised here; it is left untouched when the check fails.
        final DataRegion region = StorageEngine.getInstance().getDataRegion(new DataRegionId(runtimeEnv.getRegionId()));
        if (region != null) {
            final String dbName = region.getDatabaseName();
            if (dbName != null && pattern.length() <= dbName.length() && dbName.startsWith(pattern)) {
                isDbNameCoveredByPattern = true;
            }
        }

        // A start time that is not partition-aligned pushes the lower bound to the next partition.
        final boolean startAligned = realtimeDataExtractionStartTime % TimePartitionUtils.getTimePartitionInterval() == 0L;
        final long startPartitionId = TimePartitionUtils.getTimePartitionId(realtimeDataExtractionStartTime);
        if (startAligned) {
            startTimePartitionIdLowerBound = startPartitionId;
        } else {
            startTimePartitionIdLowerBound = startPartitionId + 1L;
        }

        // An end time that is not partition-aligned pulls the upper bound back to the previous partition.
        final boolean endAligned = realtimeDataExtractionEndTime % TimePartitionUtils.getTimePartitionInterval() == 0L;
        final long endPartitionId = TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime);
        if (endAligned) {
            endTimePartitionIdUpperBound = endPartitionId;
        } else {
            endTimePartitionIdUpperBound = endPartitionId - 1L;
        }

        final Pair<Long, Long> currentBound = TimePartitionManager.getInstance().getTimePartitionIdBound(new DataRegionId(Integer.parseInt(dataRegionId)));
        if (Objects.isNull(currentBound)) {
            LOGGER.warn("Something unexpected happened when PipeRealtimeDataRegionExtractor({}) obtaining time partition id bound on data region {}, set enableTimeParseSkipByTimePartition to false.", (Object)taskID, (Object)dataRegionId);
            enableSkippingTimeParseByTimePartition.set(false);
        } else {
            setDataRegionTimePartitionIdBound(currentBound);
        }

        isForwardingPipeRequests = true;
    }

    /**
     * Registers this extractor for its data region: first with the time partition listener, then
     * with the insertion listener.
     */
    public void start() throws Exception {
        final String regionId = dataRegionId;
        PipeTimePartitionListener.getInstance().startListen(regionId, this);
        PipeInsertionDataNodeListener.getInstance().startListenAndAssign(regionId, this);
    }

    /**
     * Unregisters this extractor from both listeners (when a data region id has been assigned),
     * then drops every pending event and marks the extractor closed.
     *
     * <p>The clean-up runs in {@code finally} blocks: the time partition listener is still
     * unregistered when the insertion listener fails, and the pending queue is still released and
     * the closed flag still set when either unregistration throws. The exception still propagates
     * to the caller.
     *
     * <p>Queue clearing and the flag update happen while holding the {@code isClosed} monitor, the
     * same monitor {@code extract} takes for its post-extraction closed check.
     *
     * @throws Exception if unregistering from a listener fails
     */
    public void close() throws Exception {
        try {
            if (Objects.nonNull(this.dataRegionId)) {
                try {
                    PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(this.dataRegionId, this);
                } finally {
                    PipeTimePartitionListener.getInstance().stopListen(this.dataRegionId, this);
                }
            }
        } finally {
            synchronized (this.isClosed) {
                this.clearPendingQueue();
                this.isClosed.set(true);
            }
        }
    }

    /**
     * Empties the pending queue and clears the reference count of every dropped event that is an
     * {@code EnrichedEvent}, using this class's name as the holder message.
     *
     * <p>The events are copied out before the queue is cleared so their references can be released
     * afterwards. NOTE(review): an event enqueued between the snapshot and {@code clear()} would be
     * removed without having its reference count cleared — confirm whether callers can enqueue
     * concurrently.
     */
    private void clearPendingQueue() {
        final ArrayList<Event> eventsToDrop = new ArrayList<>(this.pendingQueue.size());
        this.pendingQueue.forEach(eventsToDrop::add);
        this.pendingQueue.clear();
        for (final Event event : eventsToDrop) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent)event).clearReferenceCount(PipeRealtimeDataRegionExtractor.class.getName());
            }
        }
    }

    /**
     * Entry point for a realtime event: applies the parsing shortcuts, then either forwards the
     * event to {@link #doExtract} or releases it.
     *
     * <ol>
     *   <li>If the pattern covers the database name, pattern parsing is skipped for the event.
     *   <li>While partition-based skipping is enabled and has not been latched off, time parsing is
     *       skipped if the recorded partition id bound lies inside the configured window;
     *       otherwise the shortcut is latched off.
     *   <li>An event that must parse time and reports no possible overlap with the time range has
     *       its reference count decreased; every other event is passed to {@code doExtract}.
     *   <li>Finally, under the {@code isClosed} monitor, the pending queue is cleared if the
     *       extractor has been closed in the meantime.
     * </ol>
     *
     * @param event the realtime event to process
     */
    public final void extract(PipeRealtimeEvent event) {
        if (isDbNameCoveredByPattern) {
            event.skipParsingPattern();
        }

        final boolean skipStillPossible = !disableSkippingTimeParse && enableSkippingTimeParseByTimePartition.get();
        if (skipStillPossible) {
            if (!isDataRegionTimePartitionCoveredByTimeRange()) {
                disableSkippingTimeParse = true;
            } else {
                event.skipParsingTime();
            }
        }

        // The overlap check is evaluated only when the event still has to parse time.
        final boolean outsideTimeRange = event.shouldParseTime() && !event.getEvent().mayEventTimeOverlappedWithTimeRange();
        if (outsideTimeRange) {
            event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
        } else {
            doExtract(event);
        }

        // close() may have run while the event was being handled; drop whatever was enqueued.
        synchronized (isClosed) {
            if (isClosed.get()) {
                clearPendingQueue();
            }
        }
    }

    /**
     * Subclass hook invoked by {@code extract} for every event that either does not need time
     * parsing or may overlap the configured time range.
     *
     * @param var1 the realtime event to handle
     */
    protected abstract void doExtract(PipeRealtimeEvent var1);

    /**
     * NOTE(review): presumably tells the listeners whether TsFile events should be delivered to
     * this extractor — confirm against the listener implementation; this class never calls it.
     */
    public abstract boolean isNeedListenToTsFile();

    /**
     * NOTE(review): presumably tells the listeners whether insert-node events should be delivered
     * to this extractor — confirm against the listener implementation; this class never calls it.
     */
    public abstract boolean isNeedListenToInsertNode();

    /** @return the path pattern read in {@code customize}; {@code null} before that call */
    public final String getPattern() {
        return pattern;
    }

    /** @return the start of the extraction time range; {@link Long#MIN_VALUE} when none was configured */
    public final long getRealtimeDataExtractionStartTime() {
        return realtimeDataExtractionStartTime;
    }

    /** @return the end of the extraction time range; {@link Long#MAX_VALUE} when none was configured */
    public final long getRealtimeDataExtractionEndTime() {
        return realtimeDataExtractionEndTime;
    }

    /**
     * Records the given time partition id bound for the data region and switches partition-based
     * time-parse skipping on. The new bound is logged at INFO level before it is stored.
     *
     * @param timePartitionIdBound the (left, right) partition id bound to record; stored as given
     */
    public void setDataRegionTimePartitionIdBound(Pair<Long, Long> timePartitionIdBound) {
        LOGGER.info(
                "PipeRealtimeDataRegionExtractor({}) observed data region {} time partition growth, recording partition id bound: {}, set enableTimeParseSkipByTimePartition to true.",
                taskID,
                dataRegionId,
                timePartitionIdBound);
        dataRegionTimePartitionIdBound.set(timePartitionIdBound);
        enableSkippingTimeParseByTimePartition.set(true);
    }

    /**
     * Tells whether the recorded partition id bound of the data region lies entirely inside the
     * partition id window derived from the configured time range.
     *
     * @return {@code false} when no bound is recorded or when either end falls outside the window
     */
    private boolean isDataRegionTimePartitionCoveredByTimeRange() {
        final Pair<Long, Long> recordedBound = dataRegionTimePartitionIdBound.get();
        if (recordedBound == null) {
            return false;
        }
        // The right end is inspected only after the left end has passed, as in a short-circuit &&.
        if (startTimePartitionIdLowerBound > recordedBound.left) {
            return false;
        }
        return recordedBound.right <= endTimePartitionIdUpperBound;
    }

    /** @return the forwarding flag; {@code customize} sets it to {@code true} */
    public final boolean isForwardingPipeRequests() {
        return isForwardingPipeRequests;
    }

    /** @return the pipe name taken from the runtime environment; {@code null} before {@code customize} */
    public final String getPipeName() {
        return pipeName;
    }

    /** @return the task meta taken from the runtime environment; {@code null} before {@code customize} */
    public final PipeTaskMeta getPipeTaskMeta() {
        return pipeTaskMeta;
    }

    /** Renders the pattern and data region id, each wrapped in single quotes. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("PipeRealtimeDataRegionExtractor{pattern='");
        text.append(pattern).append('\'');
        text.append(", dataRegionId='").append(dataRegionId).append('\'');
        text.append('}');
        return text.toString();
    }

    /** @return the tablet insertion event count reported by the pending queue */
    public int getTabletInsertionEventCount() {
        final int tabletEventCount = pendingQueue.getTabletInsertionEventCount();
        return tabletEventCount;
    }

    /** @return the TsFile insertion event count reported by the pending queue */
    public int getTsFileInsertionEventCount() {
        final int tsFileEventCount = pendingQueue.getTsFileInsertionEventCount();
        return tsFileEventCount;
    }

    /** @return the pipe heartbeat event count reported by the pending queue */
    public int getPipeHeartbeatEventCount() {
        final int heartbeatEventCount = pendingQueue.getPipeHeartbeatEventCount();
        return heartbeatEventCount;
    }

    /** @return the task id built in {@code customize} as {@code pipeName_dataRegionId_creationTime}; {@code null} before that call */
    public String getTaskID() {
        return taskID;
    }
}

