package com.jiuxian.mossrose.job.handler;

import com.jiuxian.mossrose.config.MossroseConfig;
import com.jiuxian.mossrose.job.StreamingJob;
import com.jiuxian.mossrose.job.to.ObjectContainer;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jiuxian/mossrose/job/handler/StreamingJobHandler.class */
public class StreamingJobHandler extends AbstractJobHandler implements JobHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJobHandler.class);

    @Override // com.jiuxian.mossrose.job.handler.AbstractJobHandler
    public void handle(final MossroseConfig.JobMeta jobMeta, final Ignite ignite) {
        ignite.compute(select(ignite)).withExecutor(jobMeta.getId()).run(new IgniteRunnable() { // from class: com.jiuxian.mossrose.job.handler.StreamingJobHandler.1

            @IgniteInstanceResource
            private Ignite igniteRemote;

            public void run() {
                StreamingJob.Streamer streamer = ((StreamingJob) ObjectContainer.get(jobMeta.getId())).streamer();
                int size = this.igniteRemote.cluster().nodes().size() * jobMeta.getThreads();
                StreamingJobHandler.LOGGER.info("Cluster concurrency : {}", Integer.valueOf(size));
                Semaphore semaphore = new Semaphore(size);
                ArrayList arrayList = new ArrayList();
                IgniteCompute withExecutor = this.igniteRemote.compute(StreamingJobHandler.this.select(ignite)).withExecutor(jobMeta.getId());
                while (streamer.hasNext()) {
                    Serializable serializable = (Serializable) streamer.next();
                    try {
                        semaphore.acquire();
                        MossroseConfig.JobMeta jobMeta2 = jobMeta;
                        IgniteFuture runAsync = withExecutor.runAsync(() -> {
                            ((StreamingJob) ObjectContainer.get(jobMeta2.getId())).executor().execute(serializable);
                        });
                        arrayList.add(runAsync);
                        runAsync.listen(igniteFuture -> {
                            semaphore.release();
                        });
                    } catch (InterruptedException e) {
                    }
                }
                arrayList.forEach((v0) -> {
                    v0.get();
                });
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -1039293982:
                        if (implMethodName.equals("lambda$run$412cb2ab$1")) {
                            z = true;
                            break;
                        }
                        break;
                    case 757450995:
                        if (implMethodName.equals("lambda$run$222300c2$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteInClosure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/jiuxian/mossrose/job/handler/StreamingJobHandler$1") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/Semaphore;Lorg/apache/ignite/lang/IgniteFuture;)V")) {
                            Semaphore semaphore = (Semaphore) serializedLambda.getCapturedArg(0);
                            return igniteFuture -> {
                                semaphore.release();
                            };
                        }
                        break;
                    case true:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/jiuxian/mossrose/job/handler/StreamingJobHandler$1") && serializedLambda.getImplMethodSignature().equals("(Lcom/jiuxian/mossrose/config/MossroseConfig$JobMeta;Ljava/io/Serializable;)V")) {
                            MossroseConfig.JobMeta jobMeta2 = (MossroseConfig.JobMeta) serializedLambda.getCapturedArg(0);
                            Serializable serializable = (Serializable) serializedLambda.getCapturedArg(1);
                            return () -> {
                                ((StreamingJob) ObjectContainer.get(jobMeta2.getId())).executor().execute(serializable);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
    }
}
