/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkSourceUtil;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ConvenientSourceP<S, T>
extends AbstractProcessor {
    private final Function<? super Processor.Context, ? extends S> createFn;
    private final BiConsumer<? super S, ? super SourceBufferConsumerSide<? extends T>> fillBufferFn;
    private final Consumer<? super S> destroyFn;
    private final SourceBufferConsumerSide<? extends T> buffer;
    private final WatermarkSourceUtil<T> wsu;
    private boolean initialized;
    private S src;
    private Traverser<?> traverser;

    public ConvenientSourceP(@Nonnull Function<? super Processor.Context, ? extends S> createFn, @Nonnull BiConsumer<? super S, ? super SourceBufferConsumerSide<? extends T>> fillBufferFn, @Nonnull Consumer<? super S> destroyFn, @Nonnull SourceBufferConsumerSide<? extends T> buffer, @Nullable WatermarkGenerationParams<? super T> wmParams) {
        this.createFn = createFn;
        this.fillBufferFn = fillBufferFn;
        this.destroyFn = destroyFn;
        this.buffer = buffer;
        if (wmParams != null) {
            this.wsu = new WatermarkSourceUtil<T>(wmParams);
            this.wsu.increasePartitionCount(1);
        } else {
            this.wsu = null;
        }
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.src = this.createFn.apply(context);
        this.initialized = true;
    }

    @Override
    public boolean complete() {
        boolean bufferEmpty;
        if (this.traverser == null) {
            this.fillBufferFn.accept(this.src, this.buffer);
            Traverser<Object> traverser = this.wsu == null ? this.buffer.traverse() : (this.traverser = this.buffer.isEmpty() ? this.wsu.handleNoEvent() : this.buffer.traverse().flatMap(t -> this.wsu.handleEvent(t, 0)));
        }
        if (bufferEmpty = this.emitFromTraverser(this.traverser)) {
            this.traverser = null;
        }
        return bufferEmpty && this.buffer.isClosed();
    }

    @Override
    public void close() {
        if (this.initialized) {
            this.destroyFn.accept(this.src);
        }
    }

    public static interface SourceBufferConsumerSide<T> {
        public Traverser<T> traverse();

        public boolean isEmpty();

        public boolean isClosed();
    }
}

