/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink.sink;

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.streampipes.messaging.jms.ActiveMQPublisher;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.wrapper.flink.serializer.ByteArraySerializer;

public class FlinkJmsProducer
extends RichSinkFunction<Map<String, Object>> {
    private static final long serialVersionUID = 1L;
    private JmsTransportProtocol protocol;
    private ByteArraySerializer serializationSchema;
    private ActiveMQPublisher publisher;

    public FlinkJmsProducer(JmsTransportProtocol protocol, ByteArraySerializer serializationSchema) {
        this.protocol = protocol;
        this.serializationSchema = serializationSchema;
    }

    public void open(Configuration configuration) throws Exception {
        try {
            this.publisher = new ActiveMQPublisher();
            this.publisher.connect(this.protocol);
        }
        catch (Exception e) {
            throw new Exception("Failed to open Jms connection: " + e.getMessage(), e);
        }
    }

    public void invoke(Map<String, Object> value) throws Exception {
        byte[] msg = this.serializationSchema.serialize(value);
        this.publisher.publish(msg);
    }
}

