/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.common;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSourceTask
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
    protected final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
    private ChangeEventSourceCoordinator coordinator;
    private volatile Map<String, ?> lastOffset;

    public final void start(Map<String, String> props) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }
        if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
            LOGGER.info("Connector has already been started");
            return;
        }
        Configuration config = Configuration.from(props);
        if (!config.validateAndRecord(this.getAllConfigurationFields(), arg_0 -> ((Logger)LOGGER).error(arg_0))) {
            throw new ConnectException("Error configuring an instance of " + ((Object)((Object)this)).getClass().getSimpleName() + "; check the logs for details");
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Starting {} with configuration:", (Object)((Object)((Object)this)).getClass().getSimpleName());
            config.withMaskedPasswords().forEach((propName, propValue) -> LOGGER.info("   {} = {}", propName, propValue));
        }
        this.coordinator = this.start(config);
    }

    protected abstract ChangeEventSourceCoordinator start(Configuration var1);

    public void commitRecord(SourceRecord record) throws InterruptedException {
        Map currentOffset = record.sourceOffset();
        if (currentOffset != null) {
            this.lastOffset = currentOffset;
        }
    }

    public void commit() throws InterruptedException {
        if (this.coordinator != null && this.lastOffset != null) {
            this.coordinator.commitOffset(this.lastOffset);
        }
    }

    protected abstract Iterable<Field> getAllConfigurationFields();

    protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
        Map<String, ?> partition = loader.getPartition();
        Map previousOffset = (Map)this.context.offsetStorageReader().offsets(Collections.singleton(partition)).get(partition);
        if (previousOffset != null) {
            OffsetContext offsetContext = loader.load(previousOffset);
            LOGGER.info("Found previous offset {}", (Object)offsetContext);
            return offsetContext;
        }
        return null;
    }

    protected static enum State {
        RUNNING,
        STOPPED;

    }
}

