/*
 * Decompiled with CFR 0.152.
 */
package org.jetbrains.kotlinx.spark.api.jupyter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import kotlin.Metadata;
import kotlin.collections.ArraysKt;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KType;
import kotlin.reflect.KTypeProjection;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.kotlinx.jupyter.api.FieldValue;
import org.jetbrains.kotlinx.jupyter.api.FieldsHandlingKt;
import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost;
import org.jetbrains.kotlinx.jupyter.api.SubtypeThrowableRenderer;
import org.jetbrains.kotlinx.jupyter.api.ThrowableRenderer;
import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration;
import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration;
import org.jetbrains.kotlinx.spark.api.jupyter.Integration;

@Metadata(mv={1, 6, 0}, k=1, xi=48, d1={"\u0000<\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0011\n\u0002\u0010\u000e\n\u0002\b\u0004\n\u0002\u0010#\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0003\n\u0000\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\b\u0000\u0018\u00002\u00020\u0001B\u0005\u00a2\u0006\u0002\u0010\u0002J\u0010\u0010\f\u001a\u00020\u00052\u0006\u0010\r\u001a\u00020\u000eH\u0002J\f\u0010\u000f\u001a\u00020\u0010*\u00020\u0011H\u0016J\f\u0010\u0012\u001a\u00020\u0010*\u00020\u0011H\u0016J\f\u0010\u0013\u001a\u00020\u0010*\u00020\u0014H\u0016R\u001c\u0010\u0003\u001a\b\u0012\u0004\u0012\u00020\u00050\u0004X\u0096\u0004\u00a2\u0006\n\n\u0002\u0010\b\u001a\u0004\b\u0006\u0010\u0007R\u0014\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u000b0\nX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0015"}, d2={"Lorg/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration;", "Lorg/jetbrains/kotlinx/spark/api/jupyter/Integration;", "()V", "imports", "", "", "getImports", "()[Ljava/lang/String;", "[Ljava/lang/String;", "sscCollection", "", "Lorg/apache/spark/streaming/api/java/JavaStreamingContext;", "cleanUp", "e", "", "onInterrupt", "", "Lorg/jetbrains/kotlinx/jupyter/api/KotlinKernelHost;", "onLoaded", "onLoadedAlsoDo", "Lorg/jetbrains/kotlinx/jupyter/api/libraries/JupyterIntegration$Builder;", "kotlin-spark-api-jupyter-3.1"})
public final class SparkStreamingIntegration
extends Integration {
    @NotNull
    private final String[] imports;
    @NotNull
    private final Set<JavaStreamingContext> sscCollection;

    public SparkStreamingIntegration() {
        Object[] objectArray = new String[]{"org.apache.spark.deploy.SparkHadoopUtil", "org.apache.hadoop.conf.Configuration"};
        this.imports = (String[])ArraysKt.plus((Object[])super.getImports(), (Object[])objectArray);
        this.sscCollection = new LinkedHashSet();
    }

    @Override
    @NotNull
    public String[] getImports() {
        return this.imports;
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public void onLoaded(@NotNull KotlinKernelHost $this$onLoaded) {
        void $this$mapTo$iv$iv;
        void $this$map$iv;
        Intrinsics.checkNotNullParameter((Object)$this$onLoaded, (String)"<this>");
        VariableDeclaration[] variableDeclarationArray = new VariableDeclaration[1];
        String string2 = "sscCollection";
        Object object = this.sscCollection;
        KType kType = Reflection.mutableCollectionType((KType)Reflection.typeOf(Set.class, (KTypeProjection)KTypeProjection.Companion.invariant(Reflection.typeOf(JavaStreamingContext.class))));
        variableDeclarationArray[0] = new VariableDeclaration(string2, object, kType, false);
        FieldsHandlingKt.declare((KotlinKernelHost)$this$onLoaded, (VariableDeclaration[])variableDeclarationArray);
        FieldValue _0 = $this$onLoaded.execute("%dumpClassesForSpark");
        object = new String[]{"@JvmOverloads\nfun withSparkStreaming(\n    batchDuration: Duration = Durations.seconds(1L),\n    checkpointPath: String? = null,\n    hadoopConf: Configuration = SparkHadoopUtil.get().conf(),\n    createOnError: Boolean = false,\n    props: Map<String, Any> = emptyMap(),\n    master: String = SparkConf().get(\"spark.master\", \"local[*]\"),\n    appName: String = \"Kotlin Spark Sample\",\n    timeout: Long = -1L,\n    startStreamingContext: Boolean = true,\n    func: KSparkStreamingSession.() -> Unit,\n) {\n\n    // will only be set when a new context is created\n    var kSparkStreamingSession: KSparkStreamingSession? = null\n\n    val creatingFunc = {\n        val sc = SparkConf()\n            .setAppName(appName)\n            .setMaster(master)\n            .setAll(\n                props\n                    .map { (key, value) -> key X value.toString() }\n                    .asScalaIterable()\n            )\n\n        val ssc = JavaStreamingContext(sc, batchDuration)\n        ssc.checkpoint(checkpointPath)\n\n        kSparkStreamingSession = KSparkStreamingSession(ssc)\n        func(kSparkStreamingSession!!)\n\n        ssc\n    }\n\n    val ssc = when {\n        checkpointPath != null ->\n            JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)\n\n        else -> creatingFunc()\n    }\n    sscCollection += ssc\n\n    if (startStreamingContext) {\n        ssc.start()\n        kSparkStreamingSession?.invokeRunAfterStart()\n    }\n    ssc.awaitTerminationOrTimeout(timeout)\n    ssc.stop()\n}", "println(\"To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.\")"};
        object = CollectionsKt.listOf((Object[])object);
        boolean $i$f$map = false;
        void var6_7 = $this$map$iv;
        Collection destination$iv$iv = new ArrayList(CollectionsKt.collectionSizeOrDefault((Iterable)$this$map$iv, (int)10));
        boolean $i$f$mapTo = false;
        for (Object item$iv$iv : $this$mapTo$iv$iv) {
            void p0;
            String string3 = (String)item$iv$iv;
            Collection collection = destination$iv$iv;
            boolean bl = false;
            collection.add($this$onLoaded.execute((String)p0));
        }
        List _1 = (List)destination$iv$iv;
    }

    private final String cleanUp(Throwable e) {
        while (!((Collection)this.sscCollection).isEmpty()) {
            JavaStreamingContext it = (JavaStreamingContext)CollectionsKt.first((Iterable)this.sscCollection);
            boolean bl = false;
            while (it.getState() != StreamingContextState.STOPPED) {
                try {
                    it.stop(true, true);
                }
                catch (Exception exception) {}
            }
            this.sscCollection.remove(it);
        }
        return "Spark streams cleaned up. Cause: " + e;
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public void onLoadedAlsoDo(@NotNull JupyterIntegration.Builder $this$onLoadedAlsoDo) {
        void this_$iv;
        Intrinsics.checkNotNullParameter((Object)$this$onLoadedAlsoDo, (String)"<this>");
        JupyterIntegration.Builder builder = $this$onLoadedAlsoDo;
        Function1 renderer$iv = (Function1)new Function1<IllegalMonitorStateException, Object>(this){
            final /* synthetic */ SparkStreamingIntegration this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            @NotNull
            public final Object invoke(@NotNull IllegalMonitorStateException it) {
                Intrinsics.checkNotNullParameter((Object)it, (String)"it");
                return SparkStreamingIntegration.access$cleanUp(this.this$0, it);
            }
        };
        boolean $i$f$renderThrowable = false;
        this_$iv.addThrowableRenderer((ThrowableRenderer)new SubtypeThrowableRenderer(Reflection.getOrCreateKotlinClass(IllegalMonitorStateException.class), renderer$iv));
    }

    @Override
    public void onInterrupt(@NotNull KotlinKernelHost $this$onInterrupt) {
        Intrinsics.checkNotNullParameter((Object)$this$onInterrupt, (String)"<this>");
        String string2 = this.cleanUp(new InterruptedException("Kernel was interrupted."));
        System.out.println((Object)string2);
    }

    public static final /* synthetic */ String access$cleanUp(SparkStreamingIntegration $this, Throwable e) {
        return $this.cleanUp(e);
    }
}

