/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.UserFunctions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class ParallelEoSStreamProcessor<K, V>
extends AbstractParallelEoSStreamProcessor<K, V>
implements ParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessor.class);

    public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
        super(newOptions);
    }

    @Override
    public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
        Function wrappedUserFunc = context -> {
            log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);
            UserFunctions.carefullyRun(usersVoidConsumptionFunction, context.getPollContext());
            log.trace("asyncPoll - user function finished ok.");
            return UniLists.of();
        };
        Consumer<Object> voidCallBack = ignore -> log.trace("Void callback applied.");
        this.supervisorLoop(wrappedUserFunc, voidCallBack);
    }

    @Override
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        if (!this.getOptions().isProducerSupplied()) {
            throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
        }
        Function wrappedUserFunc = context -> {
            List recordListToProduce = (List)UserFunctions.carefullyRun(userFunction, context.getPollContext());
            if (recordListToProduce.isEmpty()) {
                log.debug("No result returned from function to send.");
            }
            log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, (Object)recordListToProduce);
            ArrayList results = new ArrayList();
            log.trace("Producing {} messages in result...", (Object)recordListToProduce.size());
            List futures = super.getProducerManager().get().produceMessages(recordListToProduce);
            try {
                for (ParallelConsumer.Tuple future : futures) {
                    RecordMetadata recordMetadata = TimeUtils.time(() -> (RecordMetadata)((Future)future.getRight()).get(this.options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS));
                    ParallelStreamProcessor.ConsumeProduceResult result = new ParallelStreamProcessor.ConsumeProduceResult(context.getPollContext(), future.getLeft(), recordMetadata);
                    results.add(result);
                }
            }
            catch (Exception e) {
                throw new InternalRuntimeError("Error while waiting for produce results", e);
            }
            return results;
        };
        this.supervisorLoop(wrappedUserFunc, callback);
    }

    @Override
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction) {
        this.pollAndProduceMany(userFunction, consumerRecord -> log.trace("No-op user callback"));
    }

    @Override
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction) {
        this.pollAndProduce(userFunction, consumerRecord -> log.trace("No-op user callback"));
    }

    @Override
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> callback) {
        this.pollAndProduceMany(consumerRecord -> UniLists.of((Object)((ProducerRecord)userFunction.apply((PollContext)consumerRecord))), callback);
    }
}

