package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.shadow.com.google.common.collect.AbstractIterator;
import cz.seznam.euphoria.shadow.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/seznam/euphoria/spark/FunctionCollectorAsync.class */
public class FunctionCollectorAsync<T> extends FunctionCollector<T> {
    private TransferQueue<Object> queue;
    private boolean consumed;
    private static final Object EOS = new Object();
    private static final Object EOM = new Object();
    private static final ExecutorService MEDIATOR_POOL = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("FunctionContext-Mediator-thread-%d").setDaemon(true).build());

    public FunctionCollectorAsync(AccumulatorProvider accumulatorProvider) {
        super(accumulatorProvider);
        this.queue = new LinkedTransferQueue();
        this.consumed = false;
    }

    @Override // cz.seznam.euphoria.spark.FunctionCollector
    public void collect(T t) {
        try {
            this.queue.transfer(t);
            this.queue.transfer(EOM);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Iterator<SparkElement> iterator() {
        if (this.consumed) {
            throw new IllegalStateException("Iterator already consumed");
        }
        this.consumed = true;
        return new AbstractIterator<SparkElement>() { // from class: cz.seznam.euphoria.spark.FunctionCollectorAsync.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public SparkElement m3computeNext() {
                try {
                    Object take = FunctionCollectorAsync.this.queue.take();
                    if (take == FunctionCollectorAsync.EOS) {
                        return (SparkElement) super.endOfData();
                    }
                    if (take == FunctionCollectorAsync.EOM) {
                        return m3computeNext();
                    }
                    if (take instanceof Throwable) {
                        throw ((Throwable) take);
                    }
                    return (SparkElement) take;
                } catch (Throwable th) {
                    throw new RuntimeException(th);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void runAsynchronously(Runnable runnable) {
        TaskContext taskContext = TaskContext$.MODULE$.get();
        MEDIATOR_POOL.submit(() -> {
            TaskContext$.MODULE$.setTaskContext(taskContext);
            try {
                try {
                    runnable.run();
                    this.queue.put(EOS);
                    TaskContext$.MODULE$.unset();
                } catch (Throwable th) {
                    try {
                        this.queue.put(th);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    TaskContext$.MODULE$.unset();
                }
            } catch (Throwable th2) {
                TaskContext$.MODULE$.unset();
                throw th2;
            }
        });
    }
}
