/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.RecordContext;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkContainer<K, V>
implements Comparable<WorkContainer<K, V>> {
    private static final Logger log = LoggerFactory.getLogger(WorkContainer.class);
    static final String DEFAULT_TYPE = "DEFAULT";
    @NonNull
    private final PCModule<K, V> module;
    private final long epoch;
    private String workType;
    private final ConsumerRecord<K, V> cr;
    private int numberOfFailedAttempts = 0;
    private Optional<Instant> lastFailedAt = Optional.empty();
    private Optional<Instant> succeededAt = Optional.empty();
    private Optional<Throwable> lastFailureReason;
    private boolean inFlight = false;
    private Optional<Boolean> maybeUserFunctionSucceeded = Optional.empty();
    private Future<List<?>> future;
    private Optional<Long> timeTakenAsWorkMs = Optional.empty();
    private Optional<Instant> retryDueAt = Optional.empty();
    private Comparator<WorkContainer<?, ?>> comparator = Comparator.comparing(workContainer -> {
        TopicPartition tp = workContainer.getTopicPartition();
        return tp.topic() + tp.partition();
    }).thenComparing(WorkContainer::offset);

    public WorkContainer(long epoch, ConsumerRecord<K, V> cr, @NonNull PCModule<K, V> module, @NonNull String workType) {
        if (module == null) {
            throw new NullPointerException("module is marked non-null but is null");
        }
        if (workType == null) {
            throw new NullPointerException("workType is marked non-null but is null");
        }
        this.epoch = epoch;
        this.cr = cr;
        this.workType = workType;
        this.module = module;
    }

    public WorkContainer(long epoch, ConsumerRecord<K, V> cr, PCModule<K, V> module) {
        this(epoch, cr, module, DEFAULT_TYPE);
    }

    public void endFlight() {
        log.trace("Ending flight {}", (Object)this);
        this.inFlight = false;
    }

    public boolean isDelayPassed() {
        if (!this.hasPreviouslyFailed()) {
            return true;
        }
        Duration delay = this.getDelayUntilRetryDue();
        boolean negative = delay.isNegative() || delay.isZero();
        return negative;
    }

    public Duration getDelayUntilRetryDue() {
        Instant now = this.module.clock().instant();
        Instant nextAttemptAt = this.getRetryDueAt();
        return Duration.between(now, nextAttemptAt);
    }

    public Instant getRetryDueAt() {
        return this.retryDueAt.orElse(Instant.MIN);
    }

    public Duration getRetryDelayConfig() {
        ParallelConsumerOptions<K, V> options = this.module.options();
        Function<RecordContext<K, V>, Duration> retryDelayProvider = options.getRetryDelayProvider();
        if (retryDelayProvider != null) {
            return retryDelayProvider.apply(new RecordContext(this));
        }
        return options.getDefaultMessageRetryDelay();
    }

    @Override
    public int compareTo(WorkContainer o) {
        return this.comparator.compare(this, o);
    }

    public boolean equals(Object o) {
        long thatOffset;
        int thatPartition;
        String thatTopic;
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        WorkContainer that = (WorkContainer)o;
        String thisTopic = this.getTopicPartition().topic();
        if (!thisTopic.equals(thatTopic = that.getTopicPartition().topic())) {
            return false;
        }
        int thisPartition = this.getTopicPartition().partition();
        if (thisPartition != (thatPartition = that.getTopicPartition().partition())) {
            return false;
        }
        long thisOffset = this.getCr().offset();
        return thisOffset == (thatOffset = that.getCr().offset());
    }

    public int hashCode() {
        return Objects.hash(this.getTopicPartition().topic(), this.getTopicPartition().partition(), this.cr.offset());
    }

    public boolean isNotInFlight() {
        return !this.isInFlight();
    }

    public boolean isInFlight() {
        return this.inFlight;
    }

    public void onQueueingForExecution() {
        log.trace("Queueing for execution: {}", (Object)this);
        this.inFlight = true;
        this.timeTakenAsWorkMs = Optional.of(System.currentTimeMillis());
    }

    public TopicPartition getTopicPartition() {
        return KafkaUtils.toTopicPartition(this.getCr());
    }

    public void onUserFunctionSuccess() {
        this.succeededAt = Optional.of(this.module.clock().instant());
        this.maybeUserFunctionSucceeded = Optional.of(true);
    }

    public void onUserFunctionFailure(Throwable cause) {
        log.trace("Failing {}", (Object)this);
        this.updateFailureHistory(cause);
        this.maybeUserFunctionSucceeded = Optional.of(false);
    }

    private void updateFailureHistory(Throwable cause) {
        ++this.numberOfFailedAttempts;
        this.lastFailedAt = Optional.of(Instant.now(this.module.clock()));
        this.lastFailureReason = Optional.ofNullable(cause);
        Duration retryDelay = this.getRetryDelayConfig();
        this.retryDueAt = Optional.of(this.lastFailedAt.get().plus(retryDelay));
    }

    public boolean isUserFunctionComplete() {
        return this.getMaybeUserFunctionSucceeded().isPresent();
    }

    public boolean isUserFunctionSucceeded() {
        Optional<Boolean> userFunctionSucceeded = this.getMaybeUserFunctionSucceeded();
        return userFunctionSucceeded.orElse(false);
    }

    public String toString() {
        return "WorkContainer(tp:" + KafkaUtils.toTopicPartition(this.cr) + ":o:" + this.cr.offset() + ":k:" + this.cr.key() + ")";
    }

    public Duration getTimeInFlight() {
        if (!this.timeTakenAsWorkMs.isPresent()) {
            return Duration.ZERO;
        }
        long millis = System.currentTimeMillis() - this.timeTakenAsWorkMs.get();
        return Duration.ofMillis(millis);
    }

    public long offset() {
        return this.getCr().offset();
    }

    public boolean hasPreviouslyFailed() {
        return this.getNumberOfFailedAttempts() > 0;
    }

    public boolean isAvailableToTakeAsWork() {
        return this.isNotInFlight() && !this.isUserFunctionSucceeded() && this.isDelayPassed();
    }

    public void onPostAddToMailBox(PollContextInternal<K, V> context, Optional<ProducerManager<K, V>> producerManager) {
        producerManager.ifPresent(pm -> {
            Optional<ProducerManager.ProducingLock> producingLock = context.getProducingLock();
            producingLock.ifPresent(pm::finishProducing);
        });
    }

    public long getEpoch() {
        return this.epoch;
    }

    public String getWorkType() {
        return this.workType;
    }

    public void setWorkType(String workType) {
        this.workType = workType;
    }

    public ConsumerRecord<K, V> getCr() {
        return this.cr;
    }

    public int getNumberOfFailedAttempts() {
        return this.numberOfFailedAttempts;
    }

    public Optional<Instant> getLastFailedAt() {
        return this.lastFailedAt;
    }

    public Optional<Instant> getSucceededAt() {
        return this.succeededAt;
    }

    public Optional<Throwable> getLastFailureReason() {
        return this.lastFailureReason;
    }

    public Optional<Boolean> getMaybeUserFunctionSucceeded() {
        return this.maybeUserFunctionSucceeded;
    }

    public Future<List<?>> getFuture() {
        return this.future;
    }

    public void setFuture(Future<List<?>> future) {
        this.future = future;
    }
}

