/*
 * Decompiled with CFR 0.152.
 */
package co.decodable.sdk.pipeline.testing;

import co.decodable.sdk.pipeline.EnvironmentAccess;
import co.decodable.sdk.pipeline.testing.DecodableStream;
import co.decodable.sdk.pipeline.testing.StreamRecord;
import co.decodable.sdk.pipeline.testing.TestEnvironment;
import co.decodable.sdk.pipeline.util.Incubating;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Incubating
public class PipelineTestContext
implements AutoCloseable {
    private static final System.Logger LOGGER = System.getLogger(PipelineTestContext.class.getName());
    private final TestEnvironment testEnvironment;
    private final KafkaProducer<String, String> producer;
    private final Map<String, DecodableStreamImpl> streams;
    private final ExecutorService executorService;

    public PipelineTestContext(TestEnvironment testEnvironment) {
        EnvironmentAccess.setEnvironment(testEnvironment);
        this.testEnvironment = testEnvironment;
        this.producer = new KafkaProducer(PipelineTestContext.producerProperties(testEnvironment.bootstrapServers()));
        this.streams = new HashMap<String, DecodableStreamImpl>();
        this.executorService = Executors.newCachedThreadPool();
    }

    private static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    private static Properties consumerProperties(String bootstrapServers) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("group.id", "my-group");
        return consumerProps;
    }

    public DecodableStream<String> stream(String name) {
        KafkaConsumer consumer = new KafkaConsumer(PipelineTestContext.consumerProperties(this.testEnvironment.bootstrapServers()));
        consumer.subscribe(Collections.singleton(this.testEnvironment.topicFor(name)));
        return this.streams.computeIfAbsent(name, n -> new DecodableStreamImpl((String)n, (KafkaConsumer<String, String>)consumer));
    }

    public void runJobAsync(ThrowingConsumer<String[]> jobMainMethod, String ... args) throws Exception {
        this.executorService.submit(() -> {
            try {
                jobMainMethod.accept(args);
            }
            catch (InterruptedException e) {
                LOGGER.log(System.Logger.Level.INFO, "Job aborted");
            }
            catch (Exception e) {
                LOGGER.log(System.Logger.Level.ERROR, "Job failed", (Throwable)e);
            }
        });
    }

    @Override
    public void close() throws Exception {
        try {
            this.producer.close();
            this.executorService.shutdownNow();
            this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
            for (DecodableStreamImpl stream : this.streams.values()) {
                stream.consumer.close();
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't close testing context", e);
        }
        finally {
            EnvironmentAccess.resetEnvironment();
        }
    }

    private class DecodableStreamImpl
    implements DecodableStream<String> {
        private final String streamName;
        private final KafkaConsumer<String, String> consumer;
        private final List<ConsumerRecord<String, String>> consumed;

        public DecodableStreamImpl(String streamName, KafkaConsumer<String, String> consumer) {
            this.streamName = streamName;
            this.consumer = consumer;
            this.consumed = new ArrayList<ConsumerRecord<String, String>>();
        }

        @Override
        public void add(StreamRecord<String> streamRecord) {
            Future sent = PipelineTestContext.this.producer.send(new ProducerRecord(PipelineTestContext.this.testEnvironment.topicFor(this.streamName), (Object)streamRecord.value()));
            try {
                sent.get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Couldn't send record", e);
            }
        }

        @Override
        public Future<StreamRecord<String>> takeOne() {
            return ((CompletableFuture)this.take(1)).thenApply(l -> (StreamRecord)l.get(0));
        }

        @Override
        public Future<List<StreamRecord<String>>> take(int n) {
            return CompletableFuture.supplyAsync(() -> {
                while (this.consumed.size() < n) {
                    ConsumerRecords records = this.consumer.poll(Duration.ofMillis(20L));
                    for (ConsumerRecord record : records) {
                        this.consumed.add((ConsumerRecord<String, String>)record);
                    }
                }
                List result = this.consumed.subList(0, n).stream().map(cr -> new StreamRecord<String>((String)cr.value())).collect(Collectors.toList());
                this.consumed.subList(0, n).clear();
                return result;
            }, PipelineTestContext.this.executorService);
        }
    }

    @FunctionalInterface
    public static interface ThrowingConsumer<T> {
        public void accept(T var1) throws Exception;
    }
}

