/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.spark.sql.streaming;

import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.streaming.MetadataLog;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.spark.sql.streaming.EsCommitProtocol;
import org.elasticsearch.spark.sql.streaming.EsSinkMetadataLog;
import org.elasticsearch.spark.sql.streaming.EsSinkStatus;
import org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$;
import org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter;
import org.elasticsearch.spark.sql.streaming.JobState;
import org.elasticsearch.spark.sql.streaming.NullMetadataLog;
import org.elasticsearch.spark.sql.streaming.SparkSqlStreamingConfigs$;
import org.elasticsearch.spark.sql.streaming.TaskCommit;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001A4A!\u0001\u0002\u0001\u001b\t9Ri]*qCJ\\7+\u001d7TiJ,\u0017-\\5oONKgn\u001b\u0006\u0003\u0007\u0011\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u00151\u0011aA:rY*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\tQ\"\u001a7bgRL7m]3be\u000eD'\"A\u0006\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001qA\u0003\u0005\u0002\u0010%5\t\u0001CC\u0001\u0012\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0002C\u0001\u0004B]f\u0014VM\u001a\t\u0003+ui\u0011A\u0006\u0006\u0003\u0007]Q!\u0001G\r\u0002\u0013\u0015DXmY;uS>t'BA\u0003\u001b\u0015\t91D\u0003\u0002\u001d\u0015\u00051\u0011\r]1dQ\u0016L!A\b\f\u0003\tMKgn\u001b\u0005\tA\u0001\u0011\t\u0011)A\u0005C\u0005a1\u000f]1sWN+7o]5p]B\u0011!eI\u0007\u00023%\u0011A%\u0007\u0002\r'B\f'o[*fgNLwN\u001c\u0005\tM\u0001\u0011\t\u0011)A\u0005O\u0005A1/\u001a;uS:<7\u000f\u0005\u0002)[5\t\u0011F\u0003\u0002+W\u0005\u00191MZ4\u000b\u00051B\u0011A\u00025bI>|\u0007/\u0003\u0002/S\tA1+\u001a;uS:<7\u000fC\u00031\u0001\u0011\u0005\u0011'\u0001\u0004=S:LGO\u0010\u000b\u0004eQ*\u0004CA\u001a\u0001\u001b\u0005\u0011\u0001\"\u0002\u00110\u0001\u0004\t\u0003\"\u0002\u00140\u0001\u00049\u0003bB\u001c\u0001\u0005\u0004%I\u0001O\u0001\u0007Y><w-\u001a:\u0016\u0003e\u0002\"AO 
\u000e\u0003mR!\u0001P\u001f\u0002\u000f1|wmZ5oO*\u0011ahG\u0001\bG>lWn\u001c8t\u0013\t\u00015HA\u0002M_\u001eDaA\u0011\u0001!\u0002\u0013I\u0014a\u00027pO\u001e,'\u000f\t\u0005\b\t\u0002\u0011\r\u0011\"\u0003F\u0003!9(/\u001b;f\u0019><W#\u0001$\u0011\u0007U9\u0015*\u0003\u0002I-\tYQ*\u001a;bI\u0006$\u0018\rT8h!\ry!\nT\u0005\u0003\u0017B\u0011Q!\u0011:sCf\u0004\"aM'\n\u00059\u0013!\u0001D#t'&t7n\u0015;biV\u001c\bB\u0002)\u0001A\u0003%a)A\u0005xe&$X\rT8hA!)!\u000b\u0001C!'\u0006A\u0011\r\u001a3CCR\u001c\u0007\u000eF\u0002U/r\u0003\"aD+\n\u0005Y\u0003\"\u0001B+oSRDQ\u0001W)A\u0002e\u000bqAY1uG\"LE\r\u0005\u0002\u00105&\u00111\f\u0005\u0002\u0005\u0019>tw\rC\u0003^#\u0002\u0007a,\u0001\u0003eCR\f\u0007CA0n\u001d\t\u00017N\u0004\u0002bU:\u0011!-\u001b\b\u0003G\"t!\u0001Z4\u000e\u0003\u0015T!A\u001a\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005Y\u0011B\u0001\u000f\u000b\u0013\t91$\u0003\u0002\u00065%\u0011A.G\u0001\ba\u0006\u001c7.Y4f\u0013\tqwNA\u0005ECR\fgI]1nK*\u0011A.\u0007")
/*
 * Spark Structured Streaming Sink that writes each micro-batch to Elasticsearch.
 *
 * This file is decompiler output for a class compiled from Scala, so closures
 * appear as anonymous Serializable classes with synthetic "$outer" references
 * and "name$1" fields holding captured variables.
 *
 * Per batch, addBatch consults a MetadataLog of EsSinkStatus[] entries to skip
 * batches that were already logged, and otherwise drives an EsCommitProtocol
 * (initJob -> run tasks -> commitJob, or abortJob on failure).
 */
public class EsSparkSqlStreamingSink
implements Sink {
    /** Session passed to the constructor; read by the addBatch closure to obtain the SparkContext and to open a new SQL execution id. */
    public final SparkSession org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession;
    /** Connector settings passed to the constructor; read by the addBatch closure for the query name and for the serialized form handed to tasks. */
    public final Settings org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings;
    /** Commons-logging logger obtained for this class in the constructor. */
    private final Log logger;
    /** Commit log consulted by addBatch; either an EsSinkMetadataLog or a NullMetadataLog, chosen in the constructor. */
    private final MetadataLog<EsSinkStatus[]> writeLog;

    /** Accessor for the logger field (Scala-style getter). */
    private Log logger() {
        return this.logger;
    }

    /** Accessor for the writeLog field (Scala-style getter). */
    private MetadataLog<EsSinkStatus[]> writeLog() {
        return this.writeLog;
    }

    /**
     * Writes one streaming micro-batch, unless the write log shows it was already handled.
     *
     * If batchId is less than or equal to the latest batch id recorded in the write log,
     * the batch is skipped with an info message. Otherwise the batch is run as a Spark job
     * inside a new SQL execution id and committed through an EsCommitProtocol.
     *
     * @param batchId identifier of the micro-batch being delivered
     * @param data    rows of the micro-batch; its query execution and schema are used to run the write job
     */
    public void addBatch(long batchId, Dataset<Row> data) {
        // Latest logged batch id = first element of writeLog.getLatest(), or -1 when the log has no entry,
        // so any non-negative batchId is processed when nothing has been logged yet.
        if (batchId <= BoxesRunTime.unboxToLong((Object)this.writeLog().getLatest().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            // Extracts the batch id (first tuple element) from the latest log entry.
            public final long apply(Tuple2<Object, EsSinkStatus[]> x$1) {
                return x$1._1$mcJ$sp();
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final long apply() {
                return this.apply$mcJ$sp();
            }

            // Default used when getLatest() yields no entry.
            public long apply$mcJ$sp() {
                return -1L;
            }
        }))) {
            // Scala s-interpolation: produces "Skipping already committed batch [<batchId>]".
            this.logger().info((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Skipping already committed batch [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)batchId)})));
        } else {
            EsCommitProtocol commitProtocol = new EsCommitProtocol(this.writeLog());
            QueryExecution queryExecution = data.queryExecution();
            StructType schema2 = data.schema();
            // The whole write (job init, task execution, commit/abort) runs in the closure passed to withNewExecutionId.
            SQLExecution$.MODULE$.withNewExecutionId(this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession, queryExecution, (Function0)new Serializable(this, batchId, commitProtocol, queryExecution, schema2){
                public static final long serialVersionUID = 0L;
                // Synthetic reference to the enclosing sink plus the captured locals of addBatch.
                private final /* synthetic */ EsSparkSqlStreamingSink $outer;
                private final long batchId$1;
                public final EsCommitProtocol commitProtocol$1;
                private final QueryExecution queryExecution$1;
                public final StructType schema$1;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    // Use the query name from the settings when present; otherwise fall back to a fresh random UUID string.
                    String queryName = (String)SparkSqlStreamingConfigs$.MODULE$.getQueryName(this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings).getOrElse((Function0)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return UUID.randomUUID().toString();
                        }
                    });
                    JobState jobState = new JobState(queryName, this.batchId$1);
                    // initJob is called before the try block, so a failure here does not trigger abortJob.
                    this.commitProtocol$1.initJob(jobState);
                    try {
                        // The task closure captures the settings as a String rather than the Settings object
                        // (presumably to keep the closure serializable - confirm against Settings).
                        String serializedSettings = this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings.save();
                        // Run one task per partition of the batch RDD; each task returns a TaskCommit.
                        TaskCommit[] taskCommits = (TaskCommit[])this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession.sparkContext().runJob(this.queryExecution$1.toRdd(), (Function2)new Serializable(this, serializedSettings){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$addBatch$2 $outer;
                            private final String serializedSettings$1;

                            // Per-task body: builds a writer from the serialized settings, schema and commit protocol, then runs it over the partition's rows.
                            public final TaskCommit apply(TaskContext taskContext, Iterator<InternalRow> iter) {
                                return new EsStreamQueryWriter(this.serializedSettings$1, this.$outer.schema$1, this.$outer.commitProtocol$1).run(taskContext, iter);
                            }
                            {
                                // Compiler-generated guard: "throw null" raises a NullPointerException if the outer closure is missing.
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.serializedSettings$1 = serializedSettings$1;
                            }
                        }, ClassTag$.MODULE$.apply(TaskCommit.class));
                        // All tasks returned: hand the collected task commits to the protocol.
                        this.commitProtocol$1.commitJob(jobState, (Seq<TaskCommit>)Predef$.MODULE$.wrapRefArray((Object[])taskCommits));
                        return;
                    }
                    catch (Throwable throwable) {
                        // Any failure in settings serialization, task execution or commitJob aborts the job, then the original error is rethrown.
                        // NOTE(review): if abortJob itself throws, that exception propagates instead of the original one.
                        this.commitProtocol$1.abortJob(jobState);
                        throw throwable;
                    }
                }
                {
                    // Compiler-generated guard: "throw null" raises a NullPointerException if the enclosing sink is missing.
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.batchId$1 = batchId$1;
                    this.commitProtocol$1 = commitProtocol$1;
                    this.queryExecution$1 = queryExecution$1;
                    this.schema$1 = schema$1;
                }
            });
        }
    }

    /**
     * Creates the sink and selects the write log implementation.
     *
     * When the sink log is enabled in the settings, an EsSinkMetadataLog is created at the
     * commit log path derived from those settings. Otherwise a warning is logged and a
     * NullMetadataLog is used.
     *
     * @param sparkSession session stored for later use by addBatch and passed to the EsSinkMetadataLog
     * @param settings     connector settings stored for later use and consulted for the sink log configuration
     */
    public EsSparkSqlStreamingSink(SparkSession sparkSession, Settings settings) {
        // Decompiler artifact: the chosen log is held in an Object local before being assigned to writeLog.
        Object object;
        this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession = sparkSession;
        this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings = settings;
        this.logger = LogFactory.getLog(EsSparkSqlStreamingSink.class);
        if (SparkSqlStreamingConfigs$.MODULE$.getSinkLogEnabled(settings)) {
            String logPath = SparkSqlStreamingConfigs$.MODULE$.constructCommitLogPath(settings);
            // Scala s-interpolation: produces "Using log path of [<logPath>]".
            this.logger().info((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Using log path of [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{logPath})));
            object = new EsSinkMetadataLog(settings, sparkSession, logPath);
        } else {
            this.logger().warn((Object)"EsSparkSqlStreamingSink is continuing without write commit log. Be advised that data may be duplicated!");
            object = new NullMetadataLog<EsSinkStatus[]>();
        }
        this.writeLog = object;
    }
}

