/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.wal;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.wal.DefaultLogSupervisor;
import org.apache.camel.component.wal.EntryInfo;
import org.apache.camel.component.wal.LogEntry;
import org.apache.camel.component.wal.LogReader;
import org.apache.camel.component.wal.LogWriter;
import org.apache.camel.component.wal.PersistedLogEntry;
import org.apache.camel.component.wal.WriteAheadResumeStrategyConfiguration;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.resume.ResumeStrategyConfiguration;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.support.resume.OffsetKeys;
import org.apache.camel.support.resume.Offsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ResumeStrategy} that records every offset update in a local transaction log (a
 * write-ahead log) before forwarding it to a delegate resume strategy.
 *
 * <p>Each update is appended to the log as a {@code NEW} entry, handed to the delegate, and then
 * marked {@code PROCESSED} or {@code FAILED} depending on the outcome reported by the delegate.
 * {@link #loadCache()} re-submits the entries that are still {@code NEW} or {@code FAILED}.
 *
 * <p>The log file, the log writer and the delegate strategy are only resolved in {@link #start()};
 * methods that touch them must not be called before the strategy has been started.
 */
@JdkService(value="write-ahead-resume-strategy")
public class WriteAheadResumeStrategy
implements ResumeStrategy,
CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(WriteAheadResumeStrategy.class);
    private File logFile;
    private LogWriter logWriter;
    private ResumeStrategy resumeStrategy;
    private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
    private CamelContext camelContext;

    /**
     * Creates an unconfigured strategy. A configuration must be supplied through
     * {@link #setResumeStrategyConfiguration(ResumeStrategyConfiguration)} before {@link #start()}.
     */
    public WriteAheadResumeStrategy() {
    }

    /**
     * Creates a strategy bound to the given configuration.
     *
     * @param resumeStrategyConfiguration the configuration providing the log file, the supervisor
     *                                    interval and the delegate strategy
     */
    public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
    }

    /**
     * Sets the adapter on the delegate strategy. The delegate is resolved in {@link #start()}.
     *
     * @param adapter the adapter to forward to the delegate
     */
    public void setAdapter(ResumeAdapter adapter) {
        this.resumeStrategy.setAdapter(adapter);
    }

    /**
     * Returns the adapter held by the delegate strategy. The delegate is resolved in
     * {@link #start()}.
     *
     * @return the delegate's adapter
     */
    public ResumeAdapter getAdapter() {
        return this.resumeStrategy.getAdapter();
    }

    /**
     * Updates the last offset using the key and offset carried by the resumable, without a caller
     * callback.
     *
     * @param offset the resumable carrying the offset key and the last offset
     * @throws Exception if the record cannot be logged or the delegate update fails
     */
    public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
        this.updateLastOffset(offset, null);
    }

    /**
     * Updates the last offset using the key and offset carried by the resumable.
     *
     * @param offset         the resumable carrying the offset key and the last offset
     * @param updateCallBack optional callback notified with the outcome of the update (may be
     *                       {@code null})
     * @throws Exception if the record cannot be logged or the delegate update fails
     */
    public <T extends Resumable> void updateLastOffset(T offset, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        OffsetKey<?> offsetKey = offset.getOffsetKey();
        Offset<?> offsetValue = offset.getLastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating offset on the write-ahead log with key {} to {}", offsetKey.getValue(), offsetValue.getValue());
        }
        this.updateLastOffset(offsetKey, offsetValue, updateCallBack);
    }

    /**
     * Records the outcome of a delegate update on the log entry identified by the cached entry
     * information: {@code PROCESSED} when there was no error, {@code FAILED} otherwise. I/O errors
     * while updating the log are logged and not propagated.
     *
     * @param entryInfo the location information of the entry to update
     * @param t         the failure reported by the delegate, or {@code null} on success
     */
    private void handleResult(EntryInfo.CachedEntryInfo entryInfo, Throwable t) {
        LogEntry.EntryState newState = t == null ? LogEntry.EntryState.PROCESSED : LogEntry.EntryState.FAILED;
        try {
            this.logWriter.updateState(entryInfo, newState);
        }
        catch (IOException e) {
            WriteAheadResumeStrategy.logStateUpdateFailure(t, e);
        }
    }

    /**
     * Records the outcome of a delegate update on a persisted log entry: {@code PROCESSED} when
     * there was no error, {@code FAILED} otherwise. I/O errors while updating the log are logged
     * and not propagated.
     *
     * @param entry the persisted entry to update
     * @param t     the failure reported by the delegate, or {@code null} on success
     */
    private void handleResult(PersistedLogEntry entry, Throwable t) {
        LogEntry.EntryState newState = t == null ? LogEntry.EntryState.PROCESSED : LogEntry.EntryState.FAILED;
        try {
            this.logWriter.updateState(entry, newState);
        }
        catch (IOException e) {
            WriteAheadResumeStrategy.logStateUpdateFailure(t, e);
        }
    }

    /**
     * Logs exactly one message describing which state transition could not be written to the log.
     *
     * @param t the failure reported by the delegate, or {@code null} if the update had succeeded
     * @param e the I/O error raised while updating the entry state
     */
    private static void logStateUpdateFailure(Throwable t, IOException e) {
        if (t == null) {
            LOG.error("Unable to update state: {}", e.getMessage(), e);
        } else {
            LOG.error("Unable to mark the record as failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Logs the given key/offset pair and forwards it to the delegate, without a caller callback.
     *
     * @param offsetKey   the offset key
     * @param offsetValue the offset value
     * @throws Exception if the record cannot be logged or the delegate update fails
     */
    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue) throws Exception {
        this.updateLastOffset(offsetKey, offsetValue, null);
    }

    /**
     * Appends the key/offset pair to the log as a {@code NEW} entry and then forwards it to the
     * delegate strategy.
     *
     * <p>If the append fails, the delegate is still updated (so the offset is not lost) and the
     * append failure is rethrown afterwards.
     *
     * @param offsetKey      the offset key
     * @param offsetValue    the offset value
     * @param updateCallBack optional callback notified with the outcome of the update (may be
     *                       {@code null})
     * @throws Exception if the record cannot be logged or the delegate update fails
     */
    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offsetValue, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        ByteBuffer keyBuffer = offsetKey.serialize();
        ByteBuffer valueBuffer = offsetValue.serialize();
        EntryInfo.CachedEntryInfo entryInfo;
        try {
            // NOTE(review): ByteBuffer.array() exposes the whole backing array and fails for
            // read-only/direct buffers; this assumes serialize() returns a heap buffer sized
            // exactly to its content -- confirm against the serializers in use.
            LogEntry entry = new LogEntry(LogEntry.EntryState.NEW, 0, keyBuffer.array(), 0, valueBuffer.array());
            entryInfo = this.logWriter.append(entry);
        }
        catch (IOException e) {
            // Pass the exception to the logger so the cause is not lost if the delegate call
            // below fails with a different error.
            LOG.error("Unable to append a new record to the transaction log. The system will try to update the record on the delegate strategy before forcing the failure", e);
            this.tryUpdateDelegate(offsetKey, offsetValue, (EntryInfo.CachedEntryInfo)null, updateCallBack);
            throw e;
        }
        this.tryUpdateDelegate(offsetKey, offsetValue, entryInfo, updateCallBack);
    }

    /**
     * Forwards the update to the delegate for a freshly appended entry. If the delegate call
     * throws, the entry (when available) is marked {@code FAILED} and the delegate's failure is
     * rethrown.
     *
     * @param offsetKey      the offset key
     * @param offsetValue    the offset value
     * @param entryInfo      the location of the appended entry, or {@code null} if the append failed
     * @param updateCallBack optional caller callback (may be {@code null})
     * @throws Exception the failure raised by the delegate
     */
    private void tryUpdateDelegate(OffsetKey<?> offsetKey, Offset<?> offsetValue, EntryInfo.CachedEntryInfo entryInfo, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        try {
            ResumeStrategy.UpdateCallBack delegateCallback = this.resolveUpdateCallBack(entryInfo, updateCallBack);
            this.resumeStrategy.updateLastOffset(offsetKey, offsetValue, delegateCallback);
        }
        catch (Throwable throwable) {
            if (entryInfo != null) {
                try {
                    this.logWriter.updateState(entryInfo, LogEntry.EntryState.FAILED);
                }
                catch (Exception stateFailure) {
                    // Keep the delegate failure as the primary error; attach the log failure to it.
                    throwable.addSuppressed(stateFailure);
                }
            } else {
                LOG.warn("Not updating the state on the transaction log because there's no entry information: it's likely that a previous attempt to append the record has failed and the system is now in error");
            }
            throw throwable;
        }
    }

    /**
     * Forwards the update to the delegate for an entry read back from the log. If the delegate
     * call throws, the entry is marked {@code FAILED} and the delegate's failure is rethrown.
     *
     * @param offsetKey      the offset key
     * @param offsetValue    the offset value
     * @param entry          the persisted entry being replayed
     * @param updateCallBack optional caller callback (may be {@code null})
     * @throws Exception the failure raised by the delegate
     */
    private void tryUpdateDelegate(OffsetKey<?> offsetKey, Offset<?> offsetValue, PersistedLogEntry entry, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        try {
            ResumeStrategy.UpdateCallBack delegateCallback = this.resolveUpdateCallBack(entry, updateCallBack);
            this.resumeStrategy.updateLastOffset(offsetKey, offsetValue, delegateCallback);
        }
        catch (Throwable throwable) {
            try {
                this.logWriter.updateState(entry, LogEntry.EntryState.FAILED);
            }
            catch (Exception stateFailure) {
                // Keep the delegate failure as the primary error; attach the log failure to it.
                throwable.addSuppressed(stateFailure);
            }
            throw throwable;
        }
    }

    /**
     * Builds the callback handed to the delegate: it always updates the entry state in the log and,
     * when a caller callback is present, notifies it afterwards.
     *
     * @param entryInfo      the location of the entry whose state must be updated
     * @param updateCallBack optional caller callback (may be {@code null})
     * @return the callback to pass to the delegate
     */
    private ResumeStrategy.UpdateCallBack resolveUpdateCallBack(EntryInfo.CachedEntryInfo entryInfo, ResumeStrategy.UpdateCallBack updateCallBack) {
        ResumeStrategy.UpdateCallBack stateCallBack = t -> this.handleResult(entryInfo, t);
        if (updateCallBack == null) {
            return stateCallBack;
        }
        return new DelegateCallback(updateCallBack, stateCallBack);
    }

    /**
     * Builds the callback handed to the delegate: it always updates the entry state in the log and,
     * when a caller callback is present, notifies it afterwards.
     *
     * @param entry          the persisted entry whose state must be updated
     * @param updateCallBack optional caller callback (may be {@code null})
     * @return the callback to pass to the delegate
     */
    private ResumeStrategy.UpdateCallBack resolveUpdateCallBack(PersistedLogEntry entry, ResumeStrategy.UpdateCallBack updateCallBack) {
        ResumeStrategy.UpdateCallBack stateCallBack = t -> this.handleResult(entry, t);
        if (updateCallBack == null) {
            return stateCallBack;
        }
        return new DelegateCallback(updateCallBack, stateCallBack);
    }

    /**
     * Loads the delegate's cache and then replays the log: every entry still in the {@code NEW} or
     * {@code FAILED} state is deserialized through the delegate's adapter (when that adapter is
     * {@link Deserializable}) and re-submitted to the delegate. If nothing was re-submitted, the
     * log writer is reset.
     *
     * @throws Exception if the delegate fails to load its cache, the log cannot be read, or a
     *                   replayed update fails
     */
    public void loadCache() throws Exception {
        LOG.debug("Loading cache for the delegate strategy");
        this.resumeStrategy.loadCache();
        LOG.debug("Done loading cache for the delegate strategy");
        try (LogReader reader = new LogReader(this.logFile)) {
            int updatedCount = 0;
            LOG.trace("Starting to read log entries");
            PersistedLogEntry logEntry;
            // readEntry() returning null marks the end of the log.
            while ((logEntry = reader.readEntry()) != null) {
                LogEntry.EntryState entryState = logEntry.getEntryState();
                if (entryState != LogEntry.EntryState.NEW && entryState != LogEntry.EntryState.FAILED) {
                    continue;
                }
                ResumeAdapter adapter = this.resumeStrategy.getAdapter();
                if (!(adapter instanceof Deserializable)) {
                    // Without a deserializing adapter the raw bytes cannot be turned back into offsets.
                    continue;
                }
                Deserializable deserializable = (Deserializable)adapter;
                Object oKey = deserializable.deserializeKey(ByteBuffer.wrap(logEntry.getKey()));
                Object value = deserializable.deserializeValue(ByteBuffer.wrap(logEntry.getValue()));
                this.tryUpdateDelegate(OffsetKeys.of(oKey), Offsets.of(value), logEntry, null);
                ++updatedCount;
            }
            LOG.trace("Finished reading log entries");
            if (updatedCount == 0) {
                this.logWriter.reset();
            }
        }
    }

    /**
     * Resolves the log file and the delegate from the configuration, creates the log writer with a
     * scheduled supervisor, and starts the delegate.
     *
     * @throws RuntimeCamelException if the log writer or its supporting resources cannot be created
     */
    public void start() {
        try {
            this.logFile = this.resumeStrategyConfiguration.getLogFile();
            this.resumeStrategy = this.resumeStrategyConfiguration.getDelegateResumeStrategy();
            // NOTE(review): the thread pool name looks copied from another strategy; kept as-is
            // because thread names may be relied upon by monitoring -- consider renaming.
            ScheduledExecutorService executorService = this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "SingleNodeKafkaResumeStrategy", 1);
            DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(this.resumeStrategyConfiguration.getSupervisorInterval(), executorService);
            this.logWriter = new LogWriter(this.logFile, flushPolicy);
        }
        catch (Exception e) {
            throw new RuntimeCamelException(e);
        }
        this.resumeStrategy.start();
    }

    /**
     * Stops the delegate and closes the log writer. The writer is closed even if stopping the
     * delegate throws, and components that were never created (because {@link #start()} did not
     * run or failed early) are skipped.
     */
    public void stop() {
        try {
            if (this.resumeStrategy != null) {
                LOG.trace("Stopping the delegate strategy");
                this.resumeStrategy.stop();
                LOG.trace("Done stopping the delegate strategy");
            }
        }
        finally {
            if (this.logWriter != null) {
                LOG.trace("Closing the writer");
                this.logWriter.close();
                LOG.trace("Writer is closed");
            }
        }
    }

    /**
     * Sets the configuration for this strategy.
     *
     * @param resumeStrategyConfiguration the configuration; must be a
     *                                    {@link WriteAheadResumeStrategyConfiguration}
     * @throws ClassCastException if the configuration is of a different type
     */
    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = (WriteAheadResumeStrategyConfiguration)resumeStrategyConfiguration;
    }

    /**
     * Returns the configuration of this strategy.
     *
     * @return the configuration, or {@code null} if none has been set
     */
    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
        return this.resumeStrategyConfiguration;
    }

    /**
     * Returns the Camel context this strategy is bound to.
     *
     * @return the Camel context, or {@code null} if none has been set
     */
    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    /**
     * Binds this strategy to a Camel context; the context's executor service manager is used in
     * {@link #start()}.
     *
     * @param camelContext the Camel context
     */
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Callback that first runs the log-state callback and then the caller-supplied callback, both
     * with the same outcome.
     */
    private static class DelegateCallback
    implements ResumeStrategy.UpdateCallBack {
        private final ResumeStrategy.UpdateCallBack updateCallBack;
        private final ResumeStrategy.UpdateCallBack flushCallBack;

        /**
         * @param updateCallBack the caller-supplied callback, invoked second
         * @param flushCallBack  the callback that records the outcome in the log, invoked first
         */
        public DelegateCallback(ResumeStrategy.UpdateCallBack updateCallBack, ResumeStrategy.UpdateCallBack flushCallBack) {
            this.updateCallBack = updateCallBack;
            this.flushCallBack = flushCallBack;
        }

        /**
         * Notifies the log-state callback and then the caller callback.
         *
         * @param throwable the failure of the update, or {@code null} on success
         */
        public void onUpdate(Throwable throwable) {
            this.flushCallBack.onUpdate(throwable);
            this.updateCallBack.onUpdate(throwable);
        }
    }
}

