/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.stream;

import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.reactivestreams.Publisher;

public abstract class ControlMessageInjector<SI, DI>
implements Closeable,
RecordStreamProcessor<SI, SI, DI, DI> {
    protected ControlMessageInjector<SI, DI> init(WorkUnitState workUnitState) {
        return this;
    }

    @Override
    public void close() throws IOException {
    }

    protected void setInputGlobalMetadata(GlobalMetadata<SI> inputGlobalMetadata, WorkUnitState workUnitState) {
    }

    protected abstract Iterable<ControlMessage<DI>> injectControlMessagesBefore(RecordEnvelope<DI> var1, WorkUnitState var2);

    protected abstract Iterable<ControlMessage<DI>> injectControlMessagesAfter(RecordEnvelope<DI> var1, WorkUnitState var2);

    @Override
    public RecordStreamWithMetadata<DI, SI> processStream(RecordStreamWithMetadata<DI, SI> inputStream, WorkUnitState workUnitState) throws RecordStreamProcessor.StreamProcessingException {
        this.init(workUnitState);
        this.setInputGlobalMetadata(inputStream.getGlobalMetadata(), workUnitState);
        Flowable outputStream = inputStream.getRecordStream().flatMap(in -> {
            if (in instanceof ControlMessage) {
                if (in instanceof MetadataUpdateControlMessage) {
                    this.setInputGlobalMetadata(((MetadataUpdateControlMessage)in).getGlobalMetadata(), workUnitState);
                }
                this.getMessageHandler().handleMessage((ControlMessage)in);
                return Flowable.just((Object)in);
            }
            if (in instanceof RecordEnvelope) {
                RecordEnvelope recordEnvelope = (RecordEnvelope)in;
                Iterable<ControlMessage<DI>> injectedBeforeIterable = this.injectControlMessagesBefore(recordEnvelope, workUnitState);
                Iterable<ControlMessage<DI>> injectedAfterIterable = this.injectControlMessagesAfter(recordEnvelope, workUnitState);
                if (injectedBeforeIterable == null && injectedAfterIterable == null) {
                    return Flowable.just((Object)recordEnvelope);
                }
                Flowable flowable = injectedBeforeIterable != null ? Flowable.fromIterable(injectedBeforeIterable).concatWith((Publisher)Flowable.just((Object)recordEnvelope)) : Flowable.just((Object)recordEnvelope);
                if (injectedAfterIterable != null) {
                    flowable.concatWith((Publisher)Flowable.fromIterable(injectedAfterIterable));
                }
                return flowable;
            }
            throw new UnsupportedOperationException();
        }, 1);
        outputStream = outputStream.doOnComplete(this::close);
        return inputStream.withRecordStream(outputStream, inputStream.getGlobalMetadata());
    }

    protected ControlMessageHandler getMessageHandler() {
        return ControlMessageHandler.NOOP;
    }
}

