/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.util.Preconditions;
import java.util.Collections;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Pipeline transform that represents a streaming source. It has no upstream
 * transforms (an empty upstream list is passed to the superclass) and also
 * serves as the {@link StreamSource} implementation.
 *
 * @param <T> type of the items the source emits
 */
public class StreamSourceTransform<T>
extends AbstractTransform
implements StreamSource<T> {
    /**
     * Creates the source's {@link ProcessorMetaSupplier} for a given
     * event time policy.
     */
    public final Function<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn;
    /** Set by {@link #onAssignToStage()}; guards against assigning the source twice. */
    private boolean isAssignedToStage;
    /**
     * When {@code true}, {@link #addToDag(Planner)} adds only the source
     * vertex and no separate insert-watermarks vertex.
     */
    private final boolean emitsWatermarks;
    /** Event time policy, or {@code null} if none has been set. */
    @Nullable
    private EventTimePolicy<? super T> eventTimePolicy;
    private final boolean supportsNativeTimestamps;
    /** Partition idle timeout in milliseconds; the precondition message in the setter states that 0 means disabled. */
    private long partitionIdleTimeout = 60000L;

    /**
     * @param name                     name of the transform, also used as the source vertex name
     * @param metaSupplierFn           function creating the processor meta-supplier from an event time policy
     * @param emitsWatermarks          whether a separate insert-watermarks vertex can be omitted
     *                                 (see {@link #addToDag(Planner)})
     * @param supportsNativeTimestamps value returned by {@link #supportsNativeTimestamps()}
     */
    public StreamSourceTransform(@Nonnull String name, @Nonnull Function<? super EventTimePolicy<? super T>, ? extends ProcessorMetaSupplier> metaSupplierFn, boolean emitsWatermarks, boolean supportsNativeTimestamps) {
        super(name, Collections.emptyList());
        this.metaSupplierFn = metaSupplierFn;
        this.emitsWatermarks = emitsWatermarks;
        this.supportsNativeTimestamps = supportsNativeTimestamps;
    }

    /**
     * Marks this source as assigned to a stage.
     *
     * @throws IllegalStateException if this method was already called on this instance
     */
    public void onAssignToStage() {
        if (this.isAssignedToStage) {
            // This is a source, not a sink: the message must name the right kind of object.
            throw new IllegalStateException("Source " + this.name() + " was already assigned to a source stage");
        }
        this.isAssignedToStage = true;
    }

    /**
     * Adds this source to the DAG being built by the planner.
     * <p>
     * If the source emits watermarks itself, or no event time policy is set,
     * a single vertex is added; with no policy set, the meta-supplier function
     * receives {@link EventTimePolicy#noEventTime()}. Otherwise two vertices
     * are added: the source vertex and a {@code "<name>-add-timestamps"}
     * vertex running {@code Processors.insertWatermarksP}, connected by an
     * isolated edge.
     *
     * @param p the planner that owns the DAG
     */
    @Override
    public void addToDag(Planner p) {
        if (this.emitsWatermarks || this.eventTimePolicy == null) {
            // Single-vertex case: no separate watermark-inserting vertex is needed.
            p.addVertex((Transform)this, this.name(), this.localParallelism(), this.metaSupplierFn.apply(this.eventTimePolicy != null ? this.eventTimePolicy : EventTimePolicy.noEventTime()));
        } else {
            //   source vertex --(isolated edge)--> "<name>-add-timestamps" vertex
            String v1name = this.name();
            Vertex v1 = p.dag.newVertex(v1name, this.metaSupplierFn.apply(this.eventTimePolicy)).localParallelism(this.localParallelism());
            // The second vertex uses the local parallelism determined for the source vertex.
            int localParallelism = v1.determineLocalParallelism(this.localParallelism());
            Planner.PlannerVertex pv2 = p.addVertex((Transform)this, v1name + "-add-timestamps", localParallelism, Processors.insertWatermarksP(this.eventTimePolicy));
            p.dag.edge(Edge.between(v1, pv2.v).isolated());
        }
    }

    /**
     * @return the event time policy, or {@code null} if none has been set
     */
    @Nullable
    public EventTimePolicy<? super T> getEventTimePolicy() {
        return this.eventTimePolicy;
    }

    /**
     * Sets the event time policy used when this source is added to the DAG.
     *
     * @param eventTimePolicy the policy to use
     */
    public void setEventTimePolicy(@Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        this.eventTimePolicy = eventTimePolicy;
    }

    /**
     * @return {@code true} if an event time policy has been set
     */
    public boolean emitsJetEvents() {
        return this.eventTimePolicy != null;
    }

    /**
     * @return the {@code supportsNativeTimestamps} flag passed to the constructor
     */
    @Override
    public boolean supportsNativeTimestamps() {
        return this.supportsNativeTimestamps;
    }

    /**
     * Sets the partition idle timeout.
     *
     * @param timeoutMillis timeout in milliseconds; checked with
     *                      {@code Preconditions.checkNotNegative}, whose message
     *                      states that 0 means disabled
     * @return this instance
     */
    @Override
    public StreamSource<T> setPartitionIdleTimeout(long timeoutMillis) {
        Preconditions.checkNotNegative(timeoutMillis, "timeout must be >= 0 (0 means disabled)");
        this.partitionIdleTimeout = timeoutMillis;
        return this;
    }

    /**
     * @return the partition idle timeout in milliseconds
     */
    @Override
    public long partitionIdleTimeout() {
        return this.partitionIdleTimeout;
    }
}

