/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.JetEvent;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Source processor driven by three user-supplied callbacks: one that creates
 * a source object, one that fills a buffer from it, and one that destroys it.
 * Buffered items are emitted to the outbox; when an {@link EventTimePolicy}
 * is supplied, items are first routed through an {@link EventTimeMapper}.
 *
 * @param <S> type of the source object produced by {@code createFn}
 * @param <T> type of the event payload handled by the event-time mapper
 */
public class ConvenientSourceP<S, T> extends AbstractProcessor {
    private final Function<? super Processor.Context, ? extends S> createFn;
    private final BiConsumer<? super S, ? super SourceBufferConsumerSide<?>> fillBufferFn;
    private final Consumer<? super S> destroyFn;
    private final SourceBufferConsumerSide<?> buffer;
    /** {@code null} when no event-time policy was supplied to the constructor. */
    private final EventTimeMapper<T> eventTimeMapper;
    /** Set once {@code createFn} has returned normally; guards {@code destroyFn} in {@link #close()}. */
    private boolean initialized;
    private S src;
    /** Traverser over the current batch; {@code null} when a new batch must be requested. */
    private Traverser<?> traverser;

    /**
     * @param createFn        creates the source object from the processor context
     * @param fillBufferFn    called with the source object and the buffer to obtain the next batch
     * @param destroyFn       called with the source object when the processor is closed
     * @param buffer          consumer-side view of the buffer that {@code fillBufferFn} fills
     * @param eventTimePolicy event-time policy, or {@code null} to emit buffered items as they are
     */
    public ConvenientSourceP(
            @Nonnull Function<? super Processor.Context, ? extends S> createFn,
            @Nonnull BiConsumer<? super S, ? super SourceBufferConsumerSide<?>> fillBufferFn,
            @Nonnull Consumer<? super S> destroyFn,
            @Nonnull SourceBufferConsumerSide<?> buffer,
            @Nullable EventTimePolicy<? super T> eventTimePolicy
    ) {
        this.createFn = createFn;
        this.fillBufferFn = fillBufferFn;
        this.destroyFn = destroyFn;
        this.buffer = buffer;
        if (eventTimePolicy != null) {
            this.eventTimeMapper = new EventTimeMapper<T>(eventTimePolicy);
            // A single partition is registered; complete() always uses partition index 0.
            this.eventTimeMapper.increasePartitionCount(1);
        } else {
            this.eventTimeMapper = null;
        }
    }

    /**
     * Always returns {@code false}.
     * NOTE(review): presumably because the user-supplied callbacks may block - confirm.
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /** Creates the source object and records that it now exists. */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.src = this.createFn.apply(context);
        this.initialized = true;
    }

    /**
     * Requests a new batch from {@code fillBufferFn} when none is in progress,
     * then emits the current batch.
     *
     * @return {@code true} when the current batch has been fully emitted and
     *         the buffer reports itself closed
     */
    @Override
    public boolean complete() {
        if (this.traverser == null) {
            this.fillBufferFn.accept(this.src, this.buffer);
            // The field itself must be assigned in every branch: assigning a
            // shadowing local would leave it null when there is no event-time mapper.
            if (this.eventTimeMapper == null) {
                this.traverser = this.buffer.traverse();
            } else if (this.buffer.isEmpty()) {
                // Nothing was produced: emit whatever the mapper generates when idle.
                this.traverser = this.eventTimeMapper.flatMapIdle();
            } else {
                this.traverser = this.buffer.traverse().flatMap(t -> {
                    // With an event-time mapper present, buffered items are cast to JetEvent<T>.
                    @SuppressWarnings("unchecked")
                    JetEvent<T> je = (JetEvent<T>) t;
                    return this.eventTimeMapper.flatMapEvent(je.payload(), 0, je.timestamp());
                });
            }
        }
        boolean bufferEmpty = this.emitFromTraverser(this.traverser);
        if (bufferEmpty) {
            // Batch done; the next call asks fillBufferFn for a new one.
            this.traverser = null;
        }
        return bufferEmpty && this.buffer.isClosed();
    }

    /** Invokes {@code destroyFn} on the source object, but only if {@code init} created it. */
    @Override
    public void close() {
        if (this.initialized) {
            this.destroyFn.accept(this.src);
        }
    }

    /**
     * The view of the source buffer that this processor reads from.
     *
     * @param <T> type of the buffered items
     */
    public interface SourceBufferConsumerSide<T> {
        /** Returns a traverser over the buffered items. */
        Traverser<T> traverse();

        /** Returns whether the buffer currently holds no items. */
        boolean isEmpty();

        /** Returns whether the buffer has been closed; {@link ConvenientSourceP#complete()} uses it to decide when the processor is done. */
        boolean isClosed();
    }
}

