/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import com.google.auto.service.AutoService;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.CoderTranslator;
import org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateBackedIterable<@UnknownKeyFor T>
extends ElementByteSizeObservableIterable<T, ElementByteSizeObservableIterator<T>>
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(StateBackedIterable.class);
    @VisibleForTesting
    final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateRequest request;
    @VisibleForTesting
    final @UnknownKeyFor @NonNull @Initialized List<T> prefix;
    private final transient @UnknownKeyFor @NonNull @Initialized PrefetchableIterable<T> suffix;
    private final @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elemCoder;

    /**
     * Creates an iterable made of an already-decoded {@code prefix} followed by a suffix obtained
     * from {@link StateFetchingIterators#readAllAndDecodeStartingFrom}.
     *
     * <p>The state request is built from {@code instructionId} and {@code stateKey} and is retained
     * in {@link #request}.
     *
     * <p>NOTE(review): {@code cache} is accepted but never referenced in this constructor body;
     * confirm whether that is intentional.
     *
     * @param cache not used by this constructor
     * @param beamFnStateClient client handed to the suffix reader
     * @param instructionId instruction id placed into the state request
     * @param stateKey state key placed into the state request
     * @param elemCoder coder for individual elements; also handed to the suffix reader
     * @param prefix elements served before the suffix; stored by reference, not copied
     */
    public StateBackedIterable(
        @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> cache,
        @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
        @UnknownKeyFor @NonNull @Initialized String instructionId,
        @UnknownKeyFor @NonNull @Initialized BeamFnApi.StateKey stateKey,
        @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elemCoder,
        @UnknownKeyFor @NonNull @Initialized List<T> prefix) {
        BeamFnApi.StateRequest firstChunkRequest = BeamFnApi.StateRequest.newBuilder()
            .setInstructionId(instructionId)
            .setStateKey(stateKey)
            .build();
        this.request = firstChunkRequest;
        this.elemCoder = elemCoder;
        this.prefix = prefix;
        this.suffix = StateFetchingIterators.readAllAndDecodeStartingFrom(beamFnStateClient, firstChunkRequest, elemCoder);
    }

    /**
     * Creates a fresh size-observing iterator over this iterable.
     *
     * <p>An iterator over {@link #prefix} and an iterator over {@link #suffix} are passed, in that
     * order, to {@link PrefetchableIterators#concat}; the result is wrapped by
     * {@link WrappedObservingIterator#create} together with the element coder.
     *
     * @return a new iterator on every call
     */
    protected @UnknownKeyFor @NonNull @Initialized ElementByteSizeObservableIterator<T> createIterator() {
        Iterator<T> materialized = this.prefix.iterator();
        Iterator<T> remaining = this.suffix.iterator();
        return WrappedObservingIterator.create(
            PrefetchableIterators.concat((Iterator[])new Iterator[]{materialized, remaining}),
            this.elemCoder);
    }

    /**
     * Java serialization hook: instead of serializing this object, an {@link ImmutableList}
     * holding a copy of all of its elements is written.
     *
     * <p>Copying iterates this object, so every element is read while the replacement is built.
     *
     * @return an immutable list copy of this iterable
     * @throws ObjectStreamException declared by the serialization contract; not thrown directly here
     */
    protected @UnknownKeyFor @NonNull @Initialized Object writeReplace() throws @UnknownKeyFor @NonNull @Initialized ObjectStreamException {
        Iterable allElements = (Iterable)((Object)this);
        return ImmutableList.copyOf(allElements);
    }

    /**
     * {@link CoderTranslator} for {@link Coder}: exposes the element coder as the single component
     * and rebuilds a {@link Coder} from one component plus a
     * {@link StateBackedIterableTranslationContext}.
     */
    private static class Translator
    implements CoderTranslator<Coder<?>> {
        private Translator() {
        }

        /**
         * Returns a one-element list containing the element coder of {@code from}.
         */
        public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getComponents(
            @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> from) {
            org.apache.beam.sdk.coders.Coder<?> elementCoder = from.getElemCoder();
            return Collections.singletonList(elementCoder);
        }

        /**
         * Builds a {@link Coder} from exactly one component coder.
         *
         * <p>{@code payload} is not read. The cache supplier, state client and instruction id
         * supplier are taken from {@code context}.
         *
         * @param components must contain exactly one coder (enforced by
         *     {@code Iterables.getOnlyElement})
         * @param payload ignored
         * @param context must be a {@link StateBackedIterableTranslationContext}
         * @throws IllegalStateException if {@code context} is of any other type
         */
        public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> fromComponents(
            @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> components,
            @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] payload,
            @UnknownKeyFor @NonNull @Initialized CoderTranslation.TranslationContext context) {
            // Reject unsupported contexts up front so the happy path reads straight through.
            if (!(context instanceof StateBackedIterableTranslationContext)) {
                throw new IllegalStateException(String.format("Unable to construct coder %s. Expected translation context %s but received %s.", "beam:coder:state_backed_iterable:v1", StateBackedIterableTranslationContext.class.getName(), context.getClass().getName()));
            }
            StateBackedIterableTranslationContext stateContext = (StateBackedIterableTranslationContext)context;
            return new Coder(stateContext.getCache(), stateContext.getStateClient(), stateContext.getCurrentInstructionId(), (org.apache.beam.sdk.coders.Coder)Iterables.getOnlyElement(components));
        }
    }

    /**
     * Registers the URN and the {@link Translator} for {@link Coder}; exposed as a
     * {@link CoderTranslatorRegistrar} service via {@code @AutoService}.
     */
    @AutoService(value={CoderTranslatorRegistrar.class})
    public static class Registrar
    implements CoderTranslatorRegistrar {
        /**
         * Returns a single-entry map from {@link Coder} to its URN.
         */
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder>, @UnknownKeyFor @NonNull @Initialized String> getCoderURNs() {
            String urn = "beam:coder:state_backed_iterable:v1";
            return Collections.singletonMap(Coder.class, urn);
        }

        /**
         * Returns a single-entry map from {@link Coder} to a new {@link Translator}.
         */
        public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder>, @UnknownKeyFor @NonNull @Initialized CoderTranslator<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder>> getCoderTranslators() {
            Translator translator = new Translator();
            return ImmutableMap.of(Coder.class, (Object)translator);
        }
    }

    /**
     * Translation context required by {@link Translator#fromComponents} to construct a
     * {@link Coder}: it supplies the three non-component constructor arguments.
     */
    public interface StateBackedIterableTranslationContext
    extends CoderTranslation.TranslationContext {
        /**
         * @return supplier of the cache handed to the constructed {@link Coder}
         */
        @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> getCache();

        /**
         * @return state client handed to the constructed {@link Coder}
         */
        @UnknownKeyFor @NonNull @Initialized BeamFnStateClient getStateClient();

        /**
         * @return supplier of the instruction id handed to the constructed {@link Coder}
         */
        @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> getCurrentInstructionId();
    }

    /**
     * Iterable coder that can encode and decode a {@link StateBackedIterable}.
     *
     * <p>For a {@link StateBackedIterable}, the encoded form is the prefix elements written through
     * a {@link BufferedElementCountingOutputStream}, followed by the var-int length and the bytes of
     * the runner state key. Any other iterable is encoded by the superclass.
     */
    public static class Coder<@UnknownKeyFor T>
    extends IterableLikeCoder<T, Iterable<T>> {
        private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> cache;
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        private final @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> instructionId;

        /**
         * @param cache supplier evaluated each time a state-backed iterable is decoded
         * @param beamFnStateClient client passed to every decoded {@link StateBackedIterable}
         * @param instructionId supplier evaluated each time a state-backed iterable is decoded
         * @param elemCoder coder for individual elements
         */
        public Coder(
            @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> cache,
            @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
            @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized String> instructionId,
            @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elemCoder) {
            super(elemCoder, "StateBackedIterable");
            this.instructionId = instructionId;
            this.beamFnStateClient = beamFnStateClient;
            this.cache = cache;
        }

        /**
         * Returns the decoded list itself as the iterable.
         */
        protected @UnknownKeyFor @NonNull @Initialized Iterable<T> decodeToIterable(@UnknownKeyFor @NonNull @Initialized List<T> decodedElements) {
            return decodedElements;
        }

        /**
         * Decodes the trailer that follows a {@code -1} terminator: a var-int token length and
         * then that many token bytes. The token becomes the runner state key of the returned
         * {@link StateBackedIterable}, and {@code decodedElements} becomes its prefix.
         *
         * @param decodedElements elements already decoded from the stream
         * @param terminatorValue terminator read from the stream; only {@code -1} is accepted here
         * @param in stream positioned directly after the terminator
         * @throws IllegalStateException if {@code terminatorValue} is not {@code -1}
         * @throws IOException if reading the token fails
         */
        protected @UnknownKeyFor @NonNull @Initialized Iterable<T> decodeToIterable(@UnknownKeyFor @NonNull @Initialized List<T> decodedElements, @UnknownKeyFor @NonNull @Initialized long terminatorValue, @UnknownKeyFor @NonNull @Initialized InputStream in) throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (terminatorValue != -1L) {
                throw new IllegalStateException(String.format("StateBackedIterable expected terminator of 0 or -1 but received %s.", terminatorValue));
            }
            long tokenLength = VarInt.decodeLong((InputStream)in);
            // Bound the read so only the token bytes are consumed from the shared stream.
            InputStream tokenBytes = ByteStreams.limit((InputStream)in, (long)tokenLength);
            ByteString token = ByteString.readFrom(tokenBytes);
            Cache<?, ?> currentCache = this.cache.get();
            String currentInstructionId = this.instructionId.get();
            BeamFnApi.StateKey runnerKey = BeamFnApi.StateKey.newBuilder()
                .setRunner(BeamFnApi.StateKey.Runner.newBuilder().setKey(token))
                .build();
            return new StateBackedIterable<T>(currentCache, this.beamFnStateClient, currentInstructionId, runnerKey, this.getElemCoder(), decodedElements);
        }

        /**
         * Encodes {@code iterable}. A {@link StateBackedIterable} is written as: int {@code -1},
         * its prefix elements through a counting stream, then the var-int size and bytes of its
         * runner state key. Only the prefix is written; the suffix is not iterated. Every other
         * iterable is delegated to the superclass.
         *
         * @throws IOException if writing to {@code outStream} fails
         */
        public void encode(@UnknownKeyFor @NonNull @Initialized Iterable<T> iterable, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (iterable instanceof StateBackedIterable) {
                StateBackedIterable<T> stateBacked = (StateBackedIterable<T>)iterable;
                DataOutputStream header = new DataOutputStream(outStream);
                header.writeInt(-1);
                BufferedElementCountingOutputStream elementStream = new BufferedElementCountingOutputStream((OutputStream)header, -1L);
                for (T element : stateBacked.prefix) {
                    elementStream.markElementStart();
                    this.getElemCoder().encode(element, (OutputStream)elementStream);
                }
                elementStream.finish();
                // Push everything to outStream before the token is appended to it directly.
                header.flush();
                ByteString token = stateBacked.request.getStateKey().getRunner().getKey();
                VarInt.encode((int)token.size(), (OutputStream)outStream);
                token.writeTo(outStream);
            } else {
                super.encode(iterable, outStream);
            }
        }
    }

    private static class WrappedObservingIterator<@UnknownKeyFor T>
    extends ElementByteSizeObservableIterator<T> {
        private final @UnknownKeyFor @NonNull @Initialized Iterator<T> wrappedIterator;
        private final @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elementCoder;
        private @UnknownKeyFor @NonNull @Initialized ElementByteSizeObserver observerProxy = null;
        private @UnknownKeyFor @NonNull @Initialized boolean observerNeedsAdvance = false;
        private @UnknownKeyFor @NonNull @Initialized boolean exceptionLogged = false;
        private static final @UnknownKeyFor @NonNull @Initialized int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
        private static final @UnknownKeyFor @NonNull @Initialized int SAMPLING_CUTOFF = 10;
        private @UnknownKeyFor @NonNull @Initialized int samplingToken = 0;

        static <T> @UnknownKeyFor @NonNull @Initialized WrappedObservingIterator<T> create(@UnknownKeyFor @NonNull @Initialized Iterator<T> iterator, @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elementCoder) {
            final WrappedObservingIterator<T> result = new WrappedObservingIterator<T>(iterator, elementCoder);
            result.observerProxy = new ElementByteSizeObserver(){

                protected void reportElementSize(@UnknownKeyFor @NonNull @Initialized long elementByteSize) {
                    result.notifyValueReturned(elementByteSize);
                }
            };
            return result;
        }

        private WrappedObservingIterator(@UnknownKeyFor @NonNull @Initialized Iterator<T> iterator, @UnknownKeyFor @NonNull @Initialized org.apache.beam.sdk.coders.Coder<T> elementCoder) {
            this.wrappedIterator = iterator;
            this.elementCoder = elementCoder;
        }

        private @UnknownKeyFor @NonNull @Initialized boolean sampleElement() {
            this.samplingToken = Math.min(this.samplingToken + 1, 1000000);
            return ThreadLocalRandom.current().nextInt(this.samplingToken) < 10;
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
            if (this.observerNeedsAdvance) {
                this.observerProxy.advance();
                this.observerNeedsAdvance = false;
            }
            return this.wrappedIterator.hasNext();
        }

        public T next() {
            T value;
            block5: {
                value = this.wrappedIterator.next();
                try {
                    boolean cheap = this.elementCoder.isRegisterByteSizeObserverCheap(value);
                    if (cheap || this.sampleElement()) {
                        this.observerProxy.setScalingFactor(cheap ? 1.0 : (double)Math.max(this.samplingToken, 10) / 10.0);
                        this.elementCoder.registerByteSizeObserver(value, this.observerProxy);
                        if (this.observerProxy.getIsLazy()) {
                            this.observerNeedsAdvance = true;
                        } else {
                            this.observerNeedsAdvance = false;
                            this.observerProxy.advance();
                        }
                    }
                }
                catch (Exception e) {
                    if (this.exceptionLogged) break block5;
                    LOG.warn("Lazily observed byte size will be under reported due to exception", (Throwable)e);
                    this.exceptionLogged = true;
                }
            }
            return value;
        }

        public void remove() {
            super.remove();
        }
    }
}

