/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStateReporter;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.state.internal.OpaqueMemoryResource;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source task that pulls records out of file objects through a
 * {@link DefaultFileRecordsPollingConsumer} and converts them into {@link SourceRecord}s.
 *
 * <p>The task lifecycle is tracked by {@link #state} and transitions are serialized with
 * {@link #stateLock}. {@link #commit()} only uses {@code tryLock()} so that it never blocks
 * behind a concurrent {@link #start(Map)} / {@link #stop()}.
 *
 * <p>Once every assigned file has been consumed the task switches to an idle mode
 * ({@link #isIdle}) in which {@link #poll()} only sleeps and returns {@code null}.
 */
public class FilePulseSourceTask
extends SourceTask {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceTask.class);
    /** Number of empty-poll waits tolerated inside a single {@link #poll()} call. */
    private static final int CONSECUTIVE_WAITS_BEFORE_RETURN = 3;
    /** Property key from which the connector name is read in {@link #start(Map)}. */
    private static final String CONNECT_NAME_CONFIG = "name";
    public SourceTaskConfig taskConfig;
    private String defaultTopic;
    private DefaultFileRecordsPollingConsumer consumer;
    private SourceOffsetPolicy offsetPolicy;
    private FileObjectStateReporter reporter;
    /** Context of the file currently being read; written by poll(), read by commit(). */
    private volatile FileObjectContext contextToBeCommitted;
    private StateBackingStoreAccess sharedStore;
    private TaskFileURIProvider fileURIProvider;
    private String connectorGroupName;
    /** Flipped to {@code true} exactly once, by the first call to {@link #closeResources()}. */
    private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
    /** Set once all files are completed and the URI provider has nothing more to offer. */
    private final AtomicBoolean isIdle = new AtomicBoolean(false);
    /** Files reported as completed by the reporter and not yet marked COMMITTED. */
    private final ConcurrentLinkedQueue<FileObjectContext> completedToCommit = new ConcurrentLinkedQueue<>();
    /** Last known value schema per topic; seeded with the configured schema for the default topic. */
    private final Map<String, Schema> valueSchemas = new HashMap<>();
    /** Id of the thread that ran {@link #start(Map)}; see {@link #doStop()}. */
    private final AtomicLong taskThreadId = new AtomicLong(0L);
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();

    /**
     * Returns the version reported by {@link FilePulseSourceConnector}.
     *
     * @return the connector version string.
     */
    @Override
    public String version() {
        return new FilePulseSourceConnector().version();
    }

    /**
     * Starts the task: parses the configuration, opens the shared state store, and wires the
     * records consumer with a reporter that queues completed files for {@link #commit()}.
     *
     * <p>If the task is not in the {@code STOPPED} state the call is logged and ignored.
     * If any initialization step throws, resources opened so far are closed and the
     * throwable is rethrown.
     *
     * @param props the task configuration properties.
     */
    @Override
    public void start(Map<String, String> props) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOG.info("Connector has already been started");
                return;
            }
            LOG.info("Starting FilePulse source task");
            final Map<String, String> configProperties = new HashMap<>(props);
            this.taskConfig = new SourceTaskConfig(configProperties);
            this.connectorGroupName = props.get(CONNECT_NAME_CONFIG);
            this.offsetPolicy = this.taskConfig.getSourceOffsetPolicy();
            this.defaultTopic = this.taskConfig.topic();
            this.valueSchemas.put(this.defaultTopic, this.taskConfig.getValueConnectSchema());
            this.sharedStore = new StateBackingStoreAccess(this.connectorGroupName, this.taskConfig::getStateBackingStore, true);
            this.reporter = new FileObjectStateReporter((StateBackingStore)((OpaqueMemoryResource)this.sharedStore.get()).getResource()){

                @Override
                public void onCompleted(FileObjectContext context) {
                    super.onCompleted(context);
                    // Defer the COMMITTED notification until the next commit() call.
                    FilePulseSourceTask.this.completedToCommit.add(context);
                }
            };
            this.consumer = this.newFileRecordsPollingConsumer();
            this.consumer.setStateListener(this.reporter);
            this.fileURIProvider = this.taskConfig.getFileURIProvider();
            this.taskThreadId.set(Thread.currentThread().getId());
            LOG.info("Started FilePulse source task");
        }
        catch (Throwable t) {
            this.closeResources();
            throw t;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Builds the records consumer from the configured reader, filter chain and offset policy.
     *
     * @return a new, not yet started, polling consumer.
     */
    private DefaultFileRecordsPollingConsumer newFileRecordsPollingConsumer() {
        DefaultRecordFilterPipeline filter = new DefaultRecordFilterPipeline(this.taskConfig.filters());
        return new DefaultFileRecordsPollingConsumer(this.context, this.taskConfig.reader(), (RecordFilterPipeline<FileRecord<TypedStruct>>)filter, this.offsetPolicy, this.taskConfig.isReadCommittedFile());
    }

    /**
     * Polls the next batch of records.
     *
     * <p>Returns {@code null} when the task is idle, when no records are available after the
     * allowed number of waits, when a non-fatal {@link ConnectFilePulseException} was ignored,
     * or when the task was stopped while polling (in which case resources are closed first).
     *
     * @return the converted records, or {@code null} if there is nothing to return.
     * @throws InterruptedException if the thread is interrupted while waiting.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (this.isIdle.get()) {
            LOG.trace("Cannot poll new data. FilePulse source task is in IDLE state");
            this.busyWait();
            return null;
        }
        LOG.trace("Polling for new data");
        try {
            final MaxConsecutiveAttempts consecutiveWaits = new MaxConsecutiveAttempts(CONSECUTIVE_WAITS_BEFORE_RETURN);
            this.contextToBeCommitted = this.consumer.context();
            while (this.isTaskRunning()) {
                List<SourceRecord> results = null;
                if (!this.consumer.hasNext()) {
                    this.contextToBeCommitted = null;
                    if (!this.fileURIProvider.hasMore()) {
                        LOG.info("Completed all object files. FilePulse source task is transiting to IDLE state while waiting for new reconfiguration request from source connector.");
                        this.isIdle.set(true);
                        return null;
                    }
                    this.consumer.addAll(this.fileURIProvider.nextURIs());
                    if (!this.consumer.hasNext() && consecutiveWaits.checkAndDecrement()) {
                        // Skip the sleep when the task was stopped meanwhile; the loop
                        // condition will then end the poll.
                        if (this.isTaskRunning()) {
                            this.busyWait();
                        }
                        continue;
                    }
                } else {
                    try {
                        final RecordsIterable<FileRecord<TypedStruct>> records = this.consumer.next();
                        if (!records.isEmpty()) {
                            final FileObjectContext context = this.consumer.context();
                            LOG.debug("Returning {} records for {}", records.size(), context.metadata());
                            results = records.stream()
                                .map(r -> this.buildSourceRecord(context, (FileRecord<?>)r))
                                .collect(Collectors.toList());
                        } else if (this.isTaskRunning() && this.consumer.hasNext() && consecutiveWaits.checkAndDecrement()) {
                            this.busyWait();
                            continue;
                        }
                    }
                    catch (ConnectFilePulseException e) {
                        if (this.taskConfig.isTaskHaltOnError()) {
                            throw e;
                        }
                        LOG.error("Caught unexpected error while processing file. Ignore and continue", e);
                    }
                }
                // If the task was stopped while this iteration ran, fall out of the loop
                // so that resources get closed below instead of returning records.
                if (this.isTaskRunning()) {
                    return results;
                }
            }
        }
        catch (Throwable t) {
            // Pass the throwable so the cause is visible in the task log.
            LOG.error("This task has failed due to uncaught error and will be stopped.", t);
            this.closeResources();
            throw t;
        }
        this.closeResources();
        LOG.info("Stopped FilePulse source task.");
        return null;
    }

    /** Returns {@code true} while the lifecycle state is {@code RUNNING}. */
    private boolean isTaskRunning() {
        return this.state.get() == State.RUNNING;
    }

    /**
     * Sleeps for the configured empty-poll wait duration.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping.
     */
    private void busyWait() throws InterruptedException {
        LOG.trace("Waiting {} ms to execute next poll iteration", this.taskConfig.getTaskEmptyPollWaitMs());
        Thread.sleep(this.taskConfig.getTaskEmptyPollWaitMs());
    }

    /**
     * Reports progress for the file being read and marks queued completed files as COMMITTED.
     *
     * <p>The method gives up immediately (with a warning) if {@link #stateLock} is held by
     * another thread. When the task is idle and all completed files were flushed, the task
     * resources are closed.
     */
    @Override
    public void commit() {
        if (!this.stateLock.tryLock()) {
            LOG.warn("Couldn't commit due to a concurrent connector shutdown or restart");
            return;
        }
        try {
            // Read each mutable field once: poll() and closeResources() do not take the
            // lock and may null these out between a check and the subsequent use.
            final FileObjectContext inProgress = this.contextToBeCommitted;
            final FileObjectStateReporter currentReporter = this.reporter;
            if (this.isTaskRunning() && inProgress != null && currentReporter != null) {
                currentReporter.notify(inProgress, FileObjectStatus.READING);
            }
            if (this.isResourceClosed.get()) {
                return;
            }
            FileObjectContext file;
            while ((file = this.completedToCommit.poll()) != null) {
                LOG.info("Committed offset for file: {}", file.metadata());
                this.safelyCommit(file);
            }
            if (this.isIdle.get()) {
                this.closeResources();
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Notifies the reporter that the given file is COMMITTED, logging (not propagating)
     * any failure.
     *
     * @param committed the context of the completed file.
     */
    private void safelyCommit(FileObjectContext committed) {
        try {
            this.reporter.notify(committed, FileObjectStatus.COMMITTED);
        }
        catch (Exception e) {
            // Log the file that failed, not the task's own context.
            LOG.warn("Failed to notify committed file: {}", committed.metadata(), e);
        }
    }

    /**
     * Converts a file record into a Kafka Connect {@link SourceRecord}.
     *
     * <p>When schema merging is enabled, the resulting value schema is remembered per topic
     * and supplied to subsequent conversions. On failure the consumer's current iterator is
     * closed with the wrapping exception before it is thrown.
     *
     * @param context the context of the file the record comes from.
     * @param record  the record to convert.
     * @return the converted source record.
     * @throws ConnectFilePulseException if the conversion fails for any reason.
     */
    private SourceRecord buildSourceRecord(FileObjectContext context, FileRecord<?> record) {
        final FileObjectMeta metadata = context.metadata();
        final Map<String, ?> sourcePartition = this.offsetPolicy.toPartitionMap(metadata);
        final Map<String, ?> sourceOffsets = this.offsetPolicy.toOffsetMap(record.offset().toSourceOffset());
        try {
            final boolean mergeSchemas = this.taskConfig.isValueConnectSchemaMergeEnabled();
            final FileRecord.ConnectSchemaMapperOptions options = new FileRecord.ConnectSchemaMapperOptions(
                mergeSchemas,
                this.taskConfig.isSchemaKeepLeadingUnderscoreOnFieldName());
            final SourceRecord result = record.toSourceRecord(sourcePartition, sourceOffsets, metadata, this.defaultTopic, null, this.valueSchemas::get, options);
            if (mergeSchemas) {
                this.valueSchemas.put(result.topic(), result.valueSchema());
            }
            return result;
        }
        catch (Throwable t) {
            final ConnectFilePulseException exception = new ConnectFilePulseException(String.format("Failed to convert data into Kafka Connect record at offset %s from object-file: %s'", context.offset(), context.metadata()), t);
            this.consumer.closeCurrentIterator(exception);
            throw exception;
        }
    }

    /**
     * Moves the task to the {@code STOPPED} state. If the task is not running the call is
     * logged and ignored.
     */
    @Override
    public void stop() {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOG.info("Task has already been stopped");
                return;
            }
            LOG.info("Stopping FilePulse source task");
            this.doStop();
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Closes resources only when called from the thread that ran {@link #start(Map)}.
     * From any other thread nothing is closed here; {@link #poll()} closes the resources
     * itself once it observes that the task is no longer running.
     */
    private void doStop() {
        if (this.taskThreadId.get() == Thread.currentThread().getId()) {
            this.closeResources();
            LOG.info("Stopped FilePulse source task.");
        }
    }

    /**
     * Closes the consumer, the URI provider and the shared state store.
     *
     * <p>Only the first invocation does any work; later calls just log. Failures while
     * closing an individual resource are logged and do not prevent the remaining ones from
     * being closed.
     */
    private void closeResources() {
        if (!this.isResourceClosed.compareAndSet(false, true)) {
            LOG.info("Task's resources have already been closed");
            return;
        }
        LOG.info("Closing resources FilePulse source task");
        try {
            if (this.consumer != null) {
                try {
                    this.consumer.close();
                }
                catch (Throwable t) {
                    LOG.warn("Failed to close FileRecordsPollingConsumer. Error: {}", t.getMessage(), t);
                }
            }
            if (this.fileURIProvider != null) {
                try {
                    this.fileURIProvider.close();
                }
                catch (Exception e) {
                    LOG.warn("Failed to close FileURIProvider. Error: {}", e.getMessage(), e);
                }
            }
        }
        finally {
            this.contextToBeCommitted = null;
            this.consumer = null;
            this.reporter = null;
            this.closeSharedStateBackingStore();
            LOG.info("Closed resources FilePulse source task");
        }
    }

    /** Closes the shared state store if it was opened, logging any failure. */
    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        }
        catch (Exception e) {
            LOG.error("Failed to close shared StateBackingStore '{}'", this.connectorGroupName, e);
        }
    }

    /**
     * Countdown limiting how many consecutive attempts (here: waits) may be made.
     *
     * <p>For a budget of {@code n}, {@link #checkAndDecrement()} returns {@code true} for the
     * first {@code n} calls and {@code false} for call {@code n + 1}; any further call throws.
     */
    static final class MaxConsecutiveAttempts {
        final AtomicInteger consecutiveAttempts;

        /**
         * @param maxConsecutiveAttempts the attempt budget; must be strictly positive.
         * @throws IllegalArgumentException if the budget is zero or negative.
         */
        MaxConsecutiveAttempts(int maxConsecutiveAttempts) {
            if (maxConsecutiveAttempts <= 0) {
                throw new IllegalArgumentException("'maxConsecutiveAttempts' must be superior to 0");
            }
            this.consecutiveAttempts = new AtomicInteger(maxConsecutiveAttempts);
        }

        /**
         * Consumes one attempt.
         *
         * @return {@code true} if an attempt was still available before this call.
         * @throws IllegalStateException if the counter is already negative, i.e. this method
         *                               had previously returned {@code false}.
         */
        public boolean checkAndDecrement() {
            if (this.getRemaining() < 0) {
                throw new IllegalStateException("cannot make a new consecutive attempt (remaining=0)");
            }
            return this.consecutiveAttempts.getAndDecrement() > 0;
        }

        /** Returns the raw counter value; it is negative once the budget was overdrawn. */
        int getRemaining() {
            return this.consecutiveAttempts.get();
        }
    }

    /** Lifecycle states of the task. */
    protected enum State {
        RUNNING,
        STOPPED
    }
}

