/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service.producer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
import org.apache.nifi.kafka.service.api.producer.ProducerRecordMetadata;
import org.apache.nifi.kafka.shared.util.Notifier;

public class ProducerCallback
implements Callback {
    private final AtomicLong sentCount;
    private final AtomicLong acknowledgedCount;
    private final AtomicLong failedCount;
    private final FlowFile flowFile;
    private final List<ProducerRecordMetadata> metadatas;
    private final List<Exception> exceptions;
    private final Notifier notifier;
    private final Map<String, Long> countsPerTopic = new HashMap<String, Long>();

    public List<Exception> getExceptions() {
        return this.exceptions;
    }

    public boolean isFailure() {
        return !this.exceptions.isEmpty();
    }

    public ProducerCallback(FlowFile flowFile) {
        this.sentCount = new AtomicLong(0L);
        this.acknowledgedCount = new AtomicLong(0L);
        this.failedCount = new AtomicLong(0L);
        this.flowFile = flowFile;
        this.metadatas = new ArrayList<ProducerRecordMetadata>();
        this.exceptions = new ArrayList<Exception>();
        this.notifier = new Notifier();
    }

    public long send(String topic) {
        this.countsPerTopic.put(topic, this.countsPerTopic.getOrDefault(topic, 0L) + 1L);
        return this.sentCount.incrementAndGet();
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            this.acknowledgedCount.addAndGet(1L);
            this.metadatas.add(ProducerCallback.toProducerRecordMetadata(metadata));
        } else {
            this.failedCount.addAndGet(1L);
            this.exceptions.add(exception);
        }
        this.notifier.notifyWaiter();
    }

    private static ProducerRecordMetadata toProducerRecordMetadata(RecordMetadata m) {
        return new ProducerRecordMetadata(m.topic(), m.partition(), m.offset(), m.timestamp());
    }

    public FlowFileResult waitComplete(long maxAckWaitMillis) {
        Supplier<Boolean> conditionComplete = () -> this.acknowledgedCount.get() + this.failedCount.get() == this.sentCount.get();
        this.notifier.waitForCondition(conditionComplete, maxAckWaitMillis);
        return new FlowFileResult(this.flowFile, this.sentCount.get(), this.countsPerTopic, this.metadatas, this.exceptions);
    }

    public FlowFileResult toFailureResult() {
        return new FlowFileResult(this.flowFile, this.sentCount.get(), this.countsPerTopic, this.metadatas, this.exceptions);
    }
}

