package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.shaded.guava.com.google.common.collect.AbstractIterator;
import cz.seznam.euphoria.shaded.guava.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/seznam/euphoria/spark/FunctionContextAsync.class */
/**
 * A {@link FunctionContext} that hands collected elements over to a consumer
 * through a {@link TransferQueue}: the producer runs on a thread of a shared
 * pool (see {@link #runAsynchronously(Runnable)}) and the consumer pulls the
 * elements through the one-shot {@link #iterator()}.
 *
 * <p>Besides the collected elements, the queue carries three kinds of control
 * objects: {@code EOM} (written after every collected element), {@code EOS}
 * (written once the producer finished normally) and any {@link Throwable}
 * thrown by the producer.
 *
 * @param <T> type of the elements passed to {@link #collect(Object)}
 */
public class FunctionContextAsync<T> extends FunctionContext<T> {

    /** Marker put into the queue after the producer finished without an error. */
    private static final Object EOS = new Object();

    /** Marker transferred right after every collected element. */
    private static final Object EOM = new Object();

    /** Shared pool of daemon threads executing the producers. */
    private static final ExecutorService MEDIATOR_POOL = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("FunctionContext-Mediator-thread-%d").setDaemon(true).build());

    /** Channel between the producer thread and the consuming iterator. */
    private final TransferQueue<Object> queue = new LinkedTransferQueue<>();

    /** Set once {@link #iterator()} has been handed out. */
    private boolean consumed = false;

    /**
     * Hands the element over to the consumer, followed by an {@code EOM} marker.
     *
     * <p>Both hand-overs use {@link TransferQueue#transfer(Object)}, which waits
     * until a consumer receives the object. Since the marker is only taken when
     * the consumer asks for the next item, this call returns only after the
     * consumer came back for more.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without completing the hand-over.
     *
     * @param t the element to emit
     */
    @Override // cz.seznam.euphoria.spark.FunctionContext
    public void collect(T t) {
        try {
            this.queue.transfer(t);
            this.queue.transfer(EOM);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the iterator over the elements emitted by the producer. It may be
     * requested only once.
     *
     * <p>The iterator blocks until the next object is available in the queue.
     * A {@link Throwable} received from the producer is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @return iterator draining the queue until the end-of-stream marker
     * @throws IllegalStateException if the iterator was already requested
     */
    public Iterator<SparkElement> iterator() {
        if (this.consumed) {
            throw new IllegalStateException("Iterator already consumed");
        }
        this.consumed = true;
        return new AbstractIterator<SparkElement>() { // from class: cz.seznam.euphoria.spark.FunctionContextAsync.1
            @Override
            protected SparkElement computeNext() {
                Object next;
                try {
                    // EOM markers carry no data; skip them in a loop instead of
                    // recursing so failures are wrapped exactly once
                    do {
                        next = FunctionContextAsync.this.queue.take();
                    } while (next == FunctionContextAsync.EOM);
                } catch (InterruptedException e) {
                    // keep the interruption visible to the caller's thread
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (next == FunctionContextAsync.EOS) {
                    return endOfData();
                }
                if (next instanceof Throwable) {
                    // failure reported by the producer thread
                    throw new RuntimeException((Throwable) next);
                }
                return (SparkElement) next;
            }
        };
    }

    /**
     * Submits the given task to the shared mediator pool. When the task returns
     * normally, the end-of-stream marker is put into the queue; when it throws,
     * the thrown object is put into the queue instead so that the consuming
     * iterator can rethrow it.
     *
     * @param runnable the producer, expected to emit through {@link #collect(Object)}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void runAsynchronously(Runnable runnable) {
        MEDIATOR_POOL.submit(() -> {
            try {
                runnable.run();
                this.queue.put(EOS);
            } catch (Throwable th) {
                try {
                    // forward the failure to the consumer side
                    this.queue.put(th);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
