/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoundedSourceSystem {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BoundedSourceSystem.class);

    private static <T> @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized BoundedSource<T>> split(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> source, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions) throws @UnknownKeyFor @NonNull @Initialized Exception {
        long estimatedSize;
        long bundleSize;
        List splits;
        int numSplits = pipelineOptions.getMaxSourceParallelism();
        if (numSplits > 1 && !(splits = source.split(bundleSize = ((estimatedSize = source.getEstimatedSizeBytes((PipelineOptions)pipelineOptions)) + (long)numSplits - 1L) / (long)numSplits, (PipelineOptions)pipelineOptions)).isEmpty()) {
            return splits;
        }
        return Collections.singletonList(source);
    }

    public static class Factory<@UnknownKeyFor T>
    implements SystemFactory {
        public @UnknownKeyFor @NonNull @Initialized SystemConsumer getConsumer(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized MetricsRegistry registry) {
            String streamPrefix = "systems." + systemName;
            Config scopedConfig = config.subset(streamPrefix + ".", true);
            return new Consumer<T>(Factory.getBoundedSource(scopedConfig), Factory.getPipelineOptions(config), new SamzaMetricsContainer((MetricsRegistryMap)registry), (String)scopedConfig.get((Object)"stepName"));
        }

        public @UnknownKeyFor @NonNull @Initialized SystemProducer getProducer(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config, @UnknownKeyFor @NonNull @Initialized MetricsRegistry registry) {
            LOG.info("System " + systemName + " does not have producer.");
            return null;
        }

        public @UnknownKeyFor @NonNull @Initialized SystemAdmin getAdmin(@UnknownKeyFor @NonNull @Initialized String systemName, @UnknownKeyFor @NonNull @Initialized Config config) {
            Config scopedConfig = config.subset("systems." + systemName + ".", true);
            return new Admin<T>(Factory.getBoundedSource(scopedConfig), Factory.getPipelineOptions(config));
        }

        private static <T> @UnknownKeyFor @NonNull @Initialized BoundedSource<T> getBoundedSource(@UnknownKeyFor @NonNull @Initialized Config config) {
            BoundedSource source = (BoundedSource)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"source")), BoundedSource.class);
            return source;
        }

        private static <T> @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>> getCoder(@UnknownKeyFor @NonNull @Initialized Config config) {
            return (Coder)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"coder")), Coder.class);
        }

        private static @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions getPipelineOptions(@UnknownKeyFor @NonNull @Initialized Config config) {
            return (SamzaPipelineOptions)((SerializablePipelineOptions)Base64Serializer.deserializeUnchecked((String)((String)config.get((Object)"beamPipelineOptions")), SerializablePipelineOptions.class)).get().as(SamzaPipelineOptions.class);
        }
    }

    public static class Consumer<@UnknownKeyFor T>
    implements SystemConsumer {
        private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(Consumer.class);
        private static final @UnknownKeyFor @NonNull @Initialized AtomicInteger NEXT_ID = new AtomicInteger();
        private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized BoundedSource<T>> splits;
        private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions;
        private final @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T>, @UnknownKeyFor @NonNull @Initialized SystemStreamPartition> readerToSsp = new HashMap<BoundedSource.BoundedReader<T>, SystemStreamPartition>();
        private final @UnknownKeyFor @NonNull @Initialized SamzaMetricsContainer metricsContainer;
        private final @UnknownKeyFor @NonNull @Initialized String stepName;
        private @UnknownKeyFor @NonNull @Initialized ReaderTask<T> readerTask;

        Consumer(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> source, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized SamzaMetricsContainer metricsContainer, @UnknownKeyFor @NonNull @Initialized String stepName) {
            try {
                this.splits = BoundedSourceSystem.split(source, pipelineOptions);
            }
            catch (Exception e) {
                throw new SamzaException("Fail to split source", (Throwable)e);
            }
            this.pipelineOptions = pipelineOptions;
            this.metricsContainer = metricsContainer;
            this.stepName = stepName;
        }

        public void start() {
            if (this.readerToSsp.isEmpty()) {
                throw new IllegalArgumentException("Attempted to call start without assigned system stream partitions");
            }
            int capacity = this.pipelineOptions.getSystemBufferSize();
            FnWithMetricsWrapper metricsWrapper = this.pipelineOptions.getEnableMetrics() != false ? new FnWithMetricsWrapper(this.metricsContainer, this.stepName) : null;
            this.readerTask = new ReaderTask(this.readerToSsp, capacity, metricsWrapper);
            Thread thread = new Thread(this.readerTask, "bounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
            thread.start();
        }

        public void stop() {
            if (this.readerTask != null) {
                ((ReaderTask)this.readerTask).stop();
            }
        }

        public void register(@UnknownKeyFor @NonNull @Initialized SystemStreamPartition ssp, @UnknownKeyFor @NonNull @Initialized String offset) {
            int partitionId = ssp.getPartition().getPartitionId();
            try {
                BoundedSource.BoundedReader reader = this.splits.get(partitionId).createReader((PipelineOptions)this.pipelineOptions);
                this.readerToSsp.put(reader, ssp);
            }
            catch (Exception e) {
                throw new SamzaException("Error while creating source reader for ssp: " + ssp, (Throwable)e);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized IncomingMessageEnvelope>> poll(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition> systemStreamPartitions, @UnknownKeyFor @NonNull @Initialized long timeout) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
            assert (!this.readerToSsp.isEmpty());
            HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
            for (SystemStreamPartition ssp : systemStreamPartitions) {
                envelopes.put(ssp, ((ReaderTask)this.readerTask).getNextMessages(ssp, timeout));
            }
            return envelopes;
        }

        private static class ReaderTask<@UnknownKeyFor T>
        implements Runnable {
            private final @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T>, @UnknownKeyFor @NonNull @Initialized SystemStreamPartition> readerToSsp;
            private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized LinkedBlockingQueue<@UnknownKeyFor @NonNull @Initialized IncomingMessageEnvelope>> queues;
            private final @UnknownKeyFor @NonNull @Initialized Semaphore available;
            private final @UnknownKeyFor @NonNull @Initialized FnWithMetricsWrapper metricsWrapper;
            private @UnknownKeyFor @NonNull @Initialized long offset;
            private volatile @UnknownKeyFor @NonNull @Initialized Thread readerThread;
            private volatile @UnknownKeyFor @NonNull @Initialized boolean stopInvoked = false;
            private volatile @UnknownKeyFor @NonNull @Initialized Exception lastException;

            private ReaderTask(@UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T>, @UnknownKeyFor @NonNull @Initialized SystemStreamPartition> readerToSsp, @UnknownKeyFor @NonNull @Initialized int capacity, @UnknownKeyFor @NonNull @Initialized FnWithMetricsWrapper metricsWrapper) {
                this.readerToSsp = readerToSsp;
                this.available = new Semaphore(capacity);
                this.metricsWrapper = metricsWrapper;
                HashMap qs = new HashMap();
                readerToSsp.values().forEach(ssp -> qs.put(ssp, new LinkedBlockingQueue()));
                this.queues = ImmutableMap.copyOf(qs);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                this.readerThread = Thread.currentThread();
                HashSet<BoundedSource.BoundedReader<BoundedSource.BoundedReader>> availableReaders = new HashSet<BoundedSource.BoundedReader<BoundedSource.BoundedReader>>(this.readerToSsp.keySet());
                try {
                    boolean hasData;
                    for (BoundedSource.BoundedReader<T> boundedReader : this.readerToSsp.keySet()) {
                        hasData = this.invoke(() -> boundedReader.start());
                        if (hasData) {
                            this.enqueueMessage(boundedReader);
                            continue;
                        }
                        this.enqueueMaxWatermarkAndEndOfStream(boundedReader);
                        boundedReader.close();
                        availableReaders.remove(boundedReader);
                    }
                    while (!this.stopInvoked && !availableReaders.isEmpty()) {
                        Iterator iter = availableReaders.iterator();
                        while (iter.hasNext()) {
                            BoundedSource.BoundedReader boundedReader = (BoundedSource.BoundedReader)iter.next();
                            hasData = this.invoke(() -> ((BoundedSource.BoundedReader)boundedReader).advance());
                            if (hasData) {
                                this.enqueueMessage(boundedReader);
                                continue;
                            }
                            this.enqueueMaxWatermarkAndEndOfStream(boundedReader);
                            boundedReader.close();
                            iter.remove();
                        }
                    }
                }
                catch (InterruptedException iter) {
                }
                catch (Exception e) {
                    this.setError(e);
                }
                finally {
                    availableReaders.forEach(reader -> {
                        try {
                            reader.close();
                        }
                        catch (IOException e) {
                            LOG.error("Reader task failed to close reader for ssp {}", (Object)this.readerToSsp.get(reader), (Object)e);
                        }
                    });
                }
            }

            private <X> X invoke(@UnknownKeyFor @NonNull @Initialized FnWithMetricsWrapper.SupplierWithException<X> fn) throws @UnknownKeyFor @NonNull @Initialized Exception {
                if (this.metricsWrapper != null) {
                    return this.metricsWrapper.wrap(fn, true);
                }
                return fn.get();
            }

            private void enqueueMessage(// Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> reader) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
                Object value = reader.getCurrent();
                WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)value, (Instant)reader.getCurrentTimestamp());
                SystemStreamPartition ssp = this.readerToSsp.get(reader);
                IncomingMessageEnvelope envelope = new IncomingMessageEnvelope(ssp, Long.toString(this.offset++), null, OpMessage.ofElement(windowedValue));
                this.available.acquire();
                this.queues.get(ssp).put(envelope);
            }

            private void enqueueMaxWatermarkAndEndOfStream(// Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> reader) {
                SystemStreamPartition ssp = this.readerToSsp.get(reader);
                IncomingMessageEnvelope watermarkEnvelope = IncomingMessageEnvelope.buildWatermarkEnvelope((SystemStreamPartition)ssp, (long)BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
                this.enqueueUninterruptibly(watermarkEnvelope);
                IncomingMessageEnvelope endOfStreamEnvelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope((SystemStreamPartition)ssp);
                this.enqueueUninterruptibly(endOfStreamEnvelope);
            }

            private void stop() {
                this.stopInvoked = true;
                Thread readerThread = this.readerThread;
                if (readerThread != null) {
                    readerThread.interrupt();
                }
            }

            private @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized IncomingMessageEnvelope> getNextMessages(@UnknownKeyFor @NonNull @Initialized SystemStreamPartition ssp, @UnknownKeyFor @NonNull @Initialized long timeoutMillis) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                ArrayList<IncomingMessageEnvelope> envelopes = new ArrayList<IncomingMessageEnvelope>();
                BlockingQueue queue = this.queues.get(ssp);
                IncomingMessageEnvelope envelope = (IncomingMessageEnvelope)queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (envelope != null) {
                    envelopes.add(envelope);
                    queue.drainTo(envelopes);
                }
                this.available.release(envelopes.size());
                if (this.lastException != null) {
                    throw new RuntimeException(this.lastException);
                }
                return envelopes;
            }

            private void setError(@UnknownKeyFor @NonNull @Initialized Exception exception) {
                this.lastException = exception;
                this.readerToSsp.values().forEach(ssp -> {
                    IncomingMessageEnvelope checkLastExceptionEvelope = new IncomingMessageEnvelope(ssp, null, null, null);
                    this.enqueueUninterruptibly(checkLastExceptionEvelope);
                });
            }

            private void enqueueUninterruptibly(@UnknownKeyFor @NonNull @Initialized IncomingMessageEnvelope envelope) {
                BlockingQueue queue = this.queues.get(envelope.getSystemStreamPartition());
                while (true) {
                    try {
                        queue.put(envelope);
                        return;
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
            }
        }
    }

    public static class Admin<@UnknownKeyFor T>
    implements SystemAdmin {
        private final @UnknownKeyFor @NonNull @Initialized BoundedSource<T> source;
        private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions;

        public Admin(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> source, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions) {
            this.source = source;
            this.pipelineOptions = pipelineOptions;
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized String> getOffsetsAfter(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized SystemStreamPartition, @UnknownKeyFor @NonNull @Initialized String> offsets) {
            return offsets;
        }

        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized SystemStreamMetadata> getSystemStreamMetadata(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> streamNames) {
            return streamNames.stream().collect(Collectors.toMap(Function.identity(), streamName -> {
                try {
                    List splits = BoundedSourceSystem.split(this.source, this.pipelineOptions);
                    HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetaData = new HashMap<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>();
                    for (int i = 0; i < splits.size(); ++i) {
                        partitionMetaData.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
                    }
                    return new SystemStreamMetadata(streamName, partitionMetaData);
                }
                catch (Exception e) {
                    throw new SamzaException("Fail to read stream metadata", (Throwable)e);
                }
            }));
        }

        public @UnknownKeyFor @NonNull @Initialized Integer offsetComparator(@UnknownKeyFor @NonNull @Initialized String offset1, @UnknownKeyFor @NonNull @Initialized String offset2) {
            if (offset1 == null) {
                return offset2 == null ? 0 : -1;
            }
            if (offset2 == null) {
                return 1;
            }
            return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
        }
    }
}

