/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.Util;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

/**
 * Static factory for the processor meta-supplier of a JMS sink: each created
 * processor opens its own {@link Session} on a shared {@link Connection},
 * creates a {@link MessageProducer} for a named queue or topic and pushes
 * every received item through user-supplied message/send/flush functions.
 */
public final class WriteJmsP {
    /** Local parallelism value handed to {@link ProcessorMetaSupplier#of}. */
    private static final int PREFERRED_LOCAL_PARALLELISM = 4;

    /** Utility class, not meant to be instantiated. */
    private WriteJmsP() {
    }

    /**
     * Builds the meta-supplier for the JMS sink processors.
     *
     * @param connectionSupplier creates the JMS connection; called once per
     *                           {@link ProcessorSupplier} in {@code init}
     * @param sessionF           creates a session from the connection; called
     *                           once for each processor's context
     * @param messageFn          converts a received item into a JMS message
     * @param sendFn             sends the message using the given producer
     * @param flushFn            flushes the session (e.g. a commit; the exact
     *                           semantics are up to the caller)
     * @param name               name of the destination queue or topic
     * @param isTopic            {@code true} to write to a topic, {@code false}
     *                           to write to a queue
     * @param <T>                type of the items accepted by the sink
     * @return a meta-supplier wrapping the JMS processor supplier
     */
    public static <T> ProcessorMetaSupplier supplier(DistributedSupplier<? extends Connection> connectionSupplier, DistributedFunction<? super Connection, ? extends Session> sessionF, DistributedBiFunction<? super Session, T, ? extends Message> messageFn, DistributedBiConsumer<? super MessageProducer, ? super Message> sendFn, DistributedConsumer<? super Session> flushFn, String name, boolean isTopic) {
        return ProcessorMetaSupplier.of(
                new Supplier<>(connectionSupplier, sessionF, messageFn, sendFn, flushFn, name, isTopic),
                PREFERRED_LOCAL_PARALLELISM);
    }

    /**
     * Processor supplier owning a single JMS connection that is shared by all
     * processors it creates. The connection is transient: it is obtained in
     * {@link #init} and released in {@link #close}.
     *
     * @param <T> type of the items accepted by the sink
     */
    private static final class Supplier<T>
    implements ProcessorSupplier {
        private static final long serialVersionUID = 1L;

        private final DistributedSupplier<? extends Connection> connectionSupplier;
        private final DistributedFunction<? super Connection, ? extends Session> sessionF;
        private final String name;
        private final boolean isTopic;
        private final DistributedBiFunction<? super Session, ? super T, ? extends Message> messageFn;
        private final DistributedBiConsumer<? super MessageProducer, ? super Message> sendFn;
        private final DistributedConsumer<? super Session> flushFn;
        private transient Connection connection;

        private Supplier(DistributedSupplier<? extends Connection> connectionSupplier, DistributedFunction<? super Connection, ? extends Session> sessionF, DistributedBiFunction<? super Session, ? super T, ? extends Message> messageFn, DistributedBiConsumer<? super MessageProducer, ? super Message> sendFn, DistributedConsumer<? super Session> flushFn, String name, boolean isTopic) {
            this.connectionSupplier = connectionSupplier;
            this.sessionF = sessionF;
            this.messageFn = messageFn;
            this.sendFn = sendFn;
            this.flushFn = flushFn;
            this.name = name;
            this.isTopic = isTopic;
        }

        /**
         * Obtains the connection from the connection supplier and starts it.
         * A checked exception from {@code start()} is rethrown through
         * {@code Util.uncheckRun}.
         */
        @Override
        public void init(@Nonnull ProcessorSupplier.Context ignored) {
            connection = connectionSupplier.get();
            Util.uncheckRun(() -> connection.start());
        }

        /**
         * Returns {@code count} processors built by
         * {@link SinkProcessors#writeBufferedP}, wired with callbacks that
         * create, use, flush and dispose a {@link JmsContext}.
         */
        @Override
        @Nonnull
        public Collection<? extends Processor> get(int count) {
            DistributedFunction<Processor.Context, JmsContext> createFn = ignored -> {
                Session session = sessionF.apply(connection);
                // Topic and Queue are siblings; their common supertype is Destination.
                Destination destination = isTopic ? session.createTopic(name) : session.createQueue(name);
                MessageProducer producer = session.createProducer(destination);
                return new JmsContext(session, producer);
            };
            DistributedBiConsumer<JmsContext, T> onReceiveFn = (jmsContext, item) -> {
                Message message = messageFn.apply(jmsContext.session, item);
                sendFn.accept(jmsContext.producer, message);
            };
            DistributedConsumer<JmsContext> flushF = jmsContext -> flushFn.accept(jmsContext.session);
            DistributedConsumer<JmsContext> destroyFn = jmsContext -> {
                // Close the session even when closing the producer fails,
                // otherwise the session would leak.
                try {
                    jmsContext.producer.close();
                } finally {
                    jmsContext.session.close();
                }
            };
            DistributedSupplier<Processor> supplier = SinkProcessors.writeBufferedP(createFn, onReceiveFn, flushF, destroyFn);
            return Stream.generate(supplier).limit(count).collect(Collectors.toList());
        }

        /**
         * Closes the connection if {@link #init} managed to obtain one.
         *
         * @param error not inspected by this implementation
         * @throws Exception whatever {@code Connection.close()} throws
         */
        @Override
        public void close(Throwable error) throws Exception {
            if (connection != null) {
                connection.close();
            }
        }

        /**
         * Holds the session and the producer created for one processor.
         * Static so that it does not keep a hidden reference to the
         * enclosing supplier.
         */
        private static final class JmsContext {
            private final Session session;
            private final MessageProducer producer;

            JmsContext(Session session, MessageProducer producer) {
                this.session = session;
                this.producer = producer;
            }
        }
    }
}

