/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.ContextFactory;
import java.util.HashSet;

/**
 * Pipeline transform registered under the name {@code "distinct"}.
 *
 * <p>When added to the DAG it contributes two vertices that both run the
 * processor built by {@link #distinctP}: a first step fed by the upstream
 * edges and a second step fed by a distributed edge from the first. Both
 * the incoming edges and the inter-step edge are partitioned by the key
 * extracted with {@code keyFn}.
 *
 * @param <T> type of the items the key function accepts
 * @param <K> type of the key the key function produces
 */
public class DistinctTransform<T, K>
extends AbstractTransform {
    /** Extracts, from each item, the key used for partitioning and for the "already seen" check. */
    private final DistributedFunction<? super T, ? extends K> keyFn;

    /**
     * Creates the transform on top of a single upstream transform.
     *
     * @param upstream the transform whose output feeds this one
     * @param keyFn    function computing the key of each item
     */
    public DistinctTransform(Transform upstream, DistributedFunction<? super T, ? extends K> keyFn) {
        super("distinct", upstream);
        this.keyFn = keyFn;
    }

    /**
     * Adds the two steps of this transform to the planner's DAG and wires them up.
     *
     * <p>Vertex names are derived from a unique prefix obtained from the planner,
     * suffixed with {@code '1'} and {@code '2'}. The first vertex is created
     * directly on the DAG; the second one is registered through the planner
     * for this transform.
     *
     * @param p the planner that owns the DAG being built
     */
    @Override
    public void addToDag(Planner p) {
        String stepPrefix = p.uniqueVertexName(name(), "-step");

        // Step 1: created straight on the DAG, with this transform's local parallelism.
        Vertex firstStep = p.dag
                .newVertex(stepPrefix + '1', distinctP(keyFn))
                .localParallelism(localParallelism());

        // Step 2: registered via the planner under this transform.
        Planner.PlannerVertex secondStep = p.addVertex(
                (Transform)this, stepPrefix + '2', localParallelism(), distinctP(keyFn));

        // Edges arriving at step 1 are partitioned by key using the HASH_CODE partitioner.
        p.addEdges((Transform)this, firstStep,
                (edge, ordinal) -> edge.partitioned(keyFn, Partitioner.HASH_CODE));

        // The step 1 -> step 2 edge is distributed and partitioned by key; no explicit
        // partitioner is passed here, unlike on the incoming edges above.
        Edge betweenSteps = Edge.between(firstStep, secondStep.v)
                .distributed()
                .partitioned(keyFn);
        p.dag.edge(betweenSteps);
    }

    /**
     * Builds the processor supplier shared by both steps.
     *
     * <p>The context is a fresh {@link HashSet}; the predicate adds each item's key
     * to that set and yields the result of {@code add}, i.e. {@code true} only when
     * the key was not in the set before. Presumably {@code filterUsingContextP}
     * forwards exactly the items for which the predicate is {@code true} - confirm
     * against the Processors documentation.
     *
     * <p>NOTE(review): this is a static method, so the predicate lambda captures only
     * {@code keyFn} and not the enclosing transform.
     *
     * @param keyFn function computing the key of each item
     * @return the supplier produced by {@code Processors.filterUsingContextP}
     */
    private static <T, K> ProcessorSupplier distinctP(DistributedFunction<? super T, ? extends K> keyFn) {
        return Processors.filterUsingContextP(
                ContextFactory.withCreateFn(instance -> new HashSet()),
                (seenKeys, candidate) -> seenKeys.add(keyFn.apply(candidate)));
    }
}

