/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.converter;

import com.google.common.base.Optional;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.NoopConverterInitializer;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
import org.reactivestreams.Publisher;

public abstract class Converter<SI, SO, DI, DO>
implements Closeable,
FinalState,
RecordStreamProcessor<SI, SO, DI, DO> {
    private GlobalMetadata<SO> outputGlobalMetadata;

    public Converter<SI, SO, DI, DO> init(WorkUnitState workUnit) {
        return this;
    }

    @Override
    public void close() throws IOException {
    }

    public abstract SO convertSchema(SI var1, WorkUnitState var2) throws SchemaConversionException;

    public abstract Iterable<DO> convertRecord(SO var1, DI var2, WorkUnitState var3) throws DataConversionException;

    protected Flowable<RecordEnvelope<DO>> convertRecordEnvelope(SO outputSchema, RecordEnvelope<DI> inputRecordEnvelope, WorkUnitState workUnitState) throws DataConversionException {
        Iterator convertedIterable = this.convertRecord(outputSchema, inputRecordEnvelope.getRecord(), workUnitState).iterator();
        if (!convertedIterable.hasNext()) {
            inputRecordEnvelope.ack();
            return Flowable.empty();
        }
        DO firstRecord = convertedIterable.next();
        if (!convertedIterable.hasNext()) {
            return Flowable.just(inputRecordEnvelope.withRecord(firstRecord));
        }
        RecordEnvelope.ForkRecordBuilder forkRecordBuilder = inputRecordEnvelope.forkRecordBuilder();
        return Flowable.just(firstRecord).concatWith((Publisher)Flowable.fromIterable(() -> convertedIterable)).map(forkRecordBuilder::childRecord).doOnComplete(forkRecordBuilder::close);
    }

    @Override
    public State getFinalState() {
        return new State();
    }

    @Override
    public RecordStreamWithMetadata<DO, SO> processStream(RecordStreamWithMetadata<DI, SI> inputStream, WorkUnitState workUnitState) throws SchemaConversionException {
        this.init(workUnitState);
        this.outputGlobalMetadata = GlobalMetadata.builderWithInput(inputStream.getGlobalMetadata(), Optional.fromNullable(this.convertSchema(inputStream.getGlobalMetadata().getSchema(), workUnitState))).build();
        Flowable outputStream = inputStream.getRecordStream().flatMap(in -> {
            if (in instanceof ControlMessage) {
                MetadataUpdateControlMessage out = (MetadataUpdateControlMessage)in;
                this.getMessageHandler().handleMessage((ControlMessage)in);
                if (in instanceof MetadataUpdateControlMessage) {
                    this.outputGlobalMetadata = GlobalMetadata.builderWithInput(((MetadataUpdateControlMessage)in).getGlobalMetadata(), Optional.fromNullable(this.convertSchema(((MetadataUpdateControlMessage)in).getGlobalMetadata().getSchema(), workUnitState))).build();
                    out = new MetadataUpdateControlMessage(this.outputGlobalMetadata);
                }
                return Flowable.just((Object)out);
            }
            if (in instanceof RecordEnvelope) {
                RecordEnvelope recordEnvelope = (RecordEnvelope)in;
                return this.convertRecordEnvelope(this.outputGlobalMetadata.getSchema(), recordEnvelope, workUnitState);
            }
            throw new UnsupportedOperationException();
        }, 1);
        outputStream = outputStream.doOnComplete(this::close);
        return inputStream.withRecordStream(outputStream, this.outputGlobalMetadata);
    }

    public ControlMessageHandler getMessageHandler() {
        return ControlMessageHandler.NOOP;
    }

    public ConverterInitializer getInitializer(State state, WorkUnitStream workUnits, int branches, int branchId) {
        return NoopConverterInitializer.INSTANCE;
    }
}

