/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.thrift;

import com.google.auto.value.AutoValue;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.thrift.AutoValue_ThriftIO_ReadFiles;
import org.apache.beam.sdk.io.thrift.AutoValue_ThriftIO_Sink;
import org.apache.beam.sdk.io.thrift.ThriftCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TSimpleJSONProtocol;
import org.apache.thrift.transport.AutoExpandingBufferReadTransport;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class ThriftIO {
    private static final Logger LOG = LoggerFactory.getLogger(ThriftIO.class);

    // Private constructor: this class is not meant to be instantiated; it is used
    // through its static factory methods (readFiles, sink).
    private ThriftIO() {
    }

    /**
     * Creates a {@link ReadFiles} transform configured with the given record class.
     *
     * <p>No protocol factory is set here; callers supply one afterwards via
     * {@link ReadFiles#withProtocol(TProtocolFactory)}.
     *
     * @param recordClass class of the records the transform will produce
     * @param <T> record type
     * @return a new {@link ReadFiles} carrying {@code recordClass}
     */
    public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
        ReadFiles.Builder<T> emptyBuilder = new AutoValue_ThriftIO_ReadFiles.Builder<T>();
        ReadFiles.Builder<T> configuredBuilder = emptyBuilder.setRecordClass(recordClass);
        return configuredBuilder.build();
    }

    /**
     * Creates a {@link Sink} that serializes elements with protocols obtained from the
     * given factory.
     *
     * @param factory protocol factory used for serialization; the builder rejects
     *     {@code null} with an {@link IllegalArgumentException}
     * @param <T> Thrift record type written by the sink
     * @return a new {@link Sink} bound to {@code factory}
     */
    public static <T extends TBase<?, ?>> Sink<T> sink(TProtocolFactory factory) {
        // Parameterize the generated builder so the result is a checked Sink<T>
        // rather than a raw type that needs an unchecked conversion.
        return new AutoValue_ThriftIO_Sink.Builder<T>().setProtocolFactory(factory).build();
    }

    protected static class ThriftWriter<T extends TBase<?, ?>> {
        private OutputStream stream;
        private TProtocolFactory protocolFactory;

        ThriftWriter(OutputStream stream, TProtocolFactory protocolFactory) {
            this.stream = stream;
            this.protocolFactory = protocolFactory;
        }

        public void write(T element) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            TProtocol protocol = this.protocolFactory.getProtocol((TTransport)new TIOStreamTransport((OutputStream)baos));
            try {
                element.write(protocol);
            }
            catch (TException te) {
                LOG.error("Error in writing element to TProtocol: " + (Object)((Object)te));
                throw new RuntimeException(te);
            }
            this.stream.write(baos.toByteArray());
        }

        public void close() throws IOException {
            this.stream.flush();
            this.stream.close();
        }
    }

    @AutoValue
    public static abstract class Sink<T extends TBase<?, ?>>
    implements FileIO.Sink<T> {
        private transient ThriftWriter<T> writer;

        abstract Builder<T> toBuilder();

        @Nullable
        abstract TProtocolFactory getProtocolFactory();

        public void open(WritableByteChannel channel) throws IOException {
            this.writer = new ThriftWriter(Channels.newOutputStream(channel), this.getProtocolFactory());
        }

        public void write(T element) throws IOException {
            Preconditions.checkNotNull(this.writer, (Object)"Writer cannot be null");
            this.writer.write(element);
        }

        public void flush() throws IOException {
            this.writer.close();
        }

        @AutoValue.Builder
        static abstract class Builder<T extends TBase<?, ?>> {
            Builder() {
            }

            abstract Builder<T> setProtocolFactory(TProtocolFactory var1);

            abstract TProtocolFactory getProtocolFactory();

            abstract Sink<T> autoBuild();

            public Sink<T> build() {
                Preconditions.checkArgument((this.getProtocolFactory() != null ? 1 : 0) != 0, (Object)"TProtocol is required for sink.");
                return this.autoBuild();
            }
        }
    }

    @AutoValue
    public static abstract class ReadFiles<T>
    extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
        abstract Builder<T> toBuilder();

        @Nullable
        abstract Class<T> getRecordClass();

        @Nullable
        abstract TProtocolFactory getTProtocolFactory();

        public ReadFiles<T> withProtocol(TProtocolFactory protocol) {
            Preconditions.checkArgument((!(protocol instanceof TSimpleJSONProtocol.Factory) ? 1 : 0) != 0, (Object)"TSimpleJSONProtocol is a write only protocol");
            return this.toBuilder().setTProtocolFactory(protocol).build();
        }

        public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
            Preconditions.checkNotNull(this.getRecordClass(), (Object)"Record class cannot be null");
            Preconditions.checkNotNull((Object)this.getTProtocolFactory(), (Object)"Thrift protocol cannot be null");
            return ((PCollection)input.apply((PTransform)ParDo.of(new ReadFn<T>(this.getRecordClass(), this.getTProtocolFactory())))).setCoder(ThriftCoder.of(this.getRecordClass(), this.getTProtocolFactory()));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item((String)"Thrift class", (String)this.getRecordClass().toString()).withLabel("Thrift class"));
            builder.add(DisplayData.item((String)"Thrift Protocol", (String)this.getTProtocolFactory().toString()).withLabel("Protocol Type"));
        }

        static class ReadFn<T>
        extends DoFn<FileIO.ReadableFile, T> {
            final Class<T> tBaseType;
            final TProtocolFactory tProtocol;

            ReadFn(Class<T> tBaseType, TProtocolFactory tProtocol) {
                this.tBaseType = tBaseType;
                this.tProtocol = tProtocol;
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element FileIO.ReadableFile file, DoFn.OutputReceiver<T> out) {
                try {
                    InputStream inputStream = Channels.newInputStream(file.open());
                    TIOStreamTransport streamTransport = new TIOStreamTransport((InputStream)new BufferedInputStream(inputStream));
                    AutoExpandingBufferReadTransport readTransport = new AutoExpandingBufferReadTransport(0xFA00000);
                    readTransport.fill((TTransport)streamTransport, inputStream.available());
                    TProtocol protocol = this.tProtocol.getProtocol((TTransport)readTransport);
                    while (protocol.getTransport().getBytesRemainingInBuffer() > 0) {
                        TBase tb = (TBase)this.tBaseType.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                        tb.read(protocol);
                        out.output((Object)tb);
                    }
                }
                catch (Exception ioe) {
                    String filename = file.getMetadata().resourceId().toString();
                    LOG.error(String.format("Error in reading file: %1$s%n%2$s", filename, ioe));
                    throw new RuntimeException(ioe);
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<T> {
            Builder() {
            }

            abstract Builder<T> setRecordClass(Class<T> var1);

            abstract TProtocolFactory getTProtocolFactory();

            abstract Builder<T> setTProtocolFactory(TProtocolFactory var1);

            abstract ReadFiles<T> build();
        }
    }
}

