/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.dataformat.SpDataFormatDefinition;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.model.grounding.TransportFormat;
import org.streampipes.model.grounding.TransportProtocol;
import org.streampipes.model.runtime.Event;
import org.streampipes.wrapper.context.RuntimeContext;
import org.streampipes.wrapper.distributed.runtime.DistributedRuntime;
import org.streampipes.wrapper.flink.FlinkDeploymentConfig;
import org.streampipes.wrapper.flink.consumer.JmsConsumer;
import org.streampipes.wrapper.flink.converter.MapToEventConverter;
import org.streampipes.wrapper.flink.logger.StatisticLogger;
import org.streampipes.wrapper.flink.serializer.ByteArrayDeserializer;
import org.streampipes.wrapper.params.binding.BindingParams;
import org.streampipes.wrapper.params.runtime.RuntimeParams;

public abstract class FlinkRuntime<RP extends RuntimeParams<B, I, RC>, B extends BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext>
extends DistributedRuntime<RP, B, I, RC>
implements Runnable,
Serializable {
    private static final long serialVersionUID = 1L;
    protected TimeCharacteristic streamTimeCharacteristic;
    protected FlinkDeploymentConfig config;
    private boolean debug;
    private StreamExecutionEnvironment env;

    @Deprecated
    public FlinkRuntime(B bindingParams) {
        this(bindingParams, true);
    }

    @Deprecated
    public FlinkRuntime(B bindingParams, FlinkDeploymentConfig config) {
        this(bindingParams, config, false);
    }

    public FlinkRuntime(B bindingParams, boolean debug) {
        super(bindingParams);
        this.config = !debug ? this.getDeploymentConfig() : new FlinkDeploymentConfig("", "localhost", 6123);
        this.debug = debug;
    }

    private FlinkRuntime(B bindingParams, FlinkDeploymentConfig config, boolean debug) {
        super(bindingParams);
        this.config = config;
        this.debug = debug;
    }

    @Override
    public void run() {
        try {
            this.env.execute(this.bindingParams.getGraph().getElementId());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setStreamTimeCharacteristic(TimeCharacteristic streamTimeCharacteristic) {
        this.streamTimeCharacteristic = streamTimeCharacteristic;
    }

    private SourceFunction<Map<String, Object>> getStream1Source() {
        return this.getStreamSource(0);
    }

    private SourceFunction<Map<String, Object>> getStream2Source() {
        return this.getStreamSource(1);
    }

    private SourceFunction<Map<String, Object>> getStreamSource(int i) {
        if (this.bindingParams.getGraph().getInputStreams().size() - 1 >= i) {
            SpDataStream stream = (SpDataStream)this.bindingParams.getGraph().getInputStreams().get(i);
            if (stream != null) {
                TransportProtocol protocol = stream.getEventGrounding().getTransportProtocol();
                TransportFormat format = (TransportFormat)stream.getEventGrounding().getTransportFormats().get(0);
                SpDataFormatDefinition dataFormatDefinition = this.getDataFormatDefinition(format);
                if (protocol instanceof KafkaTransportProtocol) {
                    return this.getKafkaConsumer((KafkaTransportProtocol)protocol, dataFormatDefinition);
                }
                return this.getJmsConsumer((JmsTransportProtocol)protocol, dataFormatDefinition);
            }
            return null;
        }
        return null;
    }

    private SourceFunction<Map<String, Object>> getJmsConsumer(JmsTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
        return new JmsConsumer(protocol, spDataFormatDefinition);
    }

    private SourceFunction<Map<String, Object>> getKafkaConsumer(KafkaTransportProtocol protocol, SpDataFormatDefinition spDataFormatDefinition) {
        if (protocol.getTopicDefinition() instanceof SimpleTopicDefinition) {
            return new FlinkKafkaConsumer(protocol.getTopicDefinition().getActualTopicName(), (DeserializationSchema)new ByteArrayDeserializer(spDataFormatDefinition), this.getProperties(protocol));
        }
        String patternTopic = this.replaceWildcardWithPatternFormat(protocol.getTopicDefinition().getActualTopicName());
        return new FlinkKafkaConsumer(Pattern.compile(patternTopic), (DeserializationSchema)new ByteArrayDeserializer(spDataFormatDefinition), this.getProperties(protocol));
    }

    public void prepareRuntime() throws SpRuntimeException {
        this.env = this.debug ? StreamExecutionEnvironment.createLocalEnvironment() : StreamExecutionEnvironment.createRemoteEnvironment((String)this.config.getHost(), (int)this.config.getPort(), (String[])new String[]{this.config.getJarFile()});
        this.appendEnvironmentConfig(this.env);
        SourceFunction<Map<String, Object>> source1 = this.getStream1Source();
        if (source1 == null) {
            throw new SpRuntimeException("At least one source must be defined for a flink sepa");
        }
        DataStream<Event> messageStream1 = this.addSource(source1, 0);
        SourceFunction<Map<String, Object>> source2 = this.getStream2Source();
        if (source2 != null) {
            DataStream<Event> messageStream2 = this.addSource(source2, 1);
            this.appendExecutionConfig(messageStream1, messageStream2);
        } else {
            this.appendExecutionConfig(messageStream1);
        }
    }

    private DataStream<Event> addSource(SourceFunction<Map<String, Object>> sourceFunction, Integer sourceIndex) {
        return this.env.addSource(sourceFunction).flatMap(new MapToEventConverter<RuntimeParams>(this.runtimeParams.getSourceInfo(sourceIndex).getSourceId(), this.runtimeParams)).flatMap((FlatMapFunction)new StatisticLogger(this.getGraph()));
    }

    public void postDiscard() throws SpRuntimeException {
    }

    public void bindRuntime() throws SpRuntimeException {
        try {
            this.prepareRuntime();
            Thread thread = new Thread(this);
            thread.start();
            if (!this.debug) {
                boolean isDeployed = false;
                int count = 0;
                do {
                    try {
                        ++count;
                        Thread.sleep(1000L);
                        Optional<JobStatusMessage> statusMessageOpt = this.getJobStatus(this.bindingParams.getGraph().getElementId());
                        if (!statusMessageOpt.isPresent()) continue;
                        isDeployed = true;
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                } while (!isDeployed && count < 60);
                if (count == 60) {
                    throw new SpRuntimeException("Error: Timeout reached when trying to connect to Flink Job Controller");
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new SpRuntimeException(e.getMessage());
        }
    }

    public void discardRuntime() throws SpRuntimeException {
        try {
            RestClusterClient<String> restClient = this.getRestClient();
            Optional<JobStatusMessage> jobStatusMessage = this.getJobStatus(this.bindingParams.getGraph().getElementId());
            if (!jobStatusMessage.isPresent()) {
                throw new SpRuntimeException("Could not stop Flink Job");
            }
            restClient.cancel(jobStatusMessage.get().getJobId());
        }
        catch (Exception e) {
            throw new SpRuntimeException("Could not find Flink Job Manager, is it running?");
        }
    }

    public void appendEnvironmentConfig(StreamExecutionEnvironment env) {
        if (this.streamTimeCharacteristic != null) {
            env.setStreamTimeCharacteristic(this.streamTimeCharacteristic);
            env.setParallelism(1);
        }
    }

    private RestClusterClient<String> getRestClient() throws Exception {
        Configuration restConfig = new Configuration();
        restConfig.setString(JobManagerOptions.ADDRESS, this.config.getHost());
        restConfig.setInteger(JobManagerOptions.PORT, this.config.getPort());
        return new RestClusterClient(restConfig, (Object)"");
    }

    private Optional<JobStatusMessage> getJobStatus(String jobName) {
        try {
            RestClusterClient<String> restClient = this.getRestClient();
            CompletableFuture jobs = restClient.listJobs();
            return ((Collection)jobs.get()).stream().filter(j -> j.getJobName().equals(jobName) && j.getJobState().name().equals("RUNNING")).findFirst();
        }
        catch (Exception e) {
            e.printStackTrace();
            return Optional.empty();
        }
    }

    protected abstract FlinkDeploymentConfig getDeploymentConfig();

    protected abstract void appendExecutionConfig(DataStream<Event> ... var1);
}

