/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.flink;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class FlinkJobController {
    private static final FiniteDuration askTimeout = new FiniteDuration(120000L, TimeUnit.MILLISECONDS);
    private static final FiniteDuration lookupTimeout = new FiniteDuration(120000L, TimeUnit.MILLISECONDS);
    private final Configuration config;

    public FlinkJobController(String hostname, int port) {
        this.config = this.getConfig(hostname, port);
    }

    public ActorGateway getJobManagerGateway() throws Exception {
        ActorSystem actorSystem;
        Tuple2 systemEndpoint = new Tuple2((Object)"", (Object)0);
        try {
            actorSystem = AkkaUtils.createActorSystem((Configuration)this.config, (Option)new Some((Object)systemEndpoint));
        }
        catch (Exception e) {
            throw new IOException("Could not start actor system to communicate with JobManager", e);
        }
        LeaderRetrievalService lrs = HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)this.config, (Executor)Executors.directExecutor(), (HighAvailabilityServicesUtils.AddressResolution)HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION).getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
        return LeaderRetrievalUtils.retrieveLeaderGateway((LeaderRetrievalService)lrs, (ActorSystem)actorSystem, (FiniteDuration)lookupTimeout);
    }

    public JobID findJobId(ActorGateway jobManagerGateway, String jobName) throws Exception {
        Object result;
        Future response = jobManagerGateway.ask(JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
        try {
            result = Await.result((Awaitable)response, (Duration)askTimeout);
        }
        catch (Exception e) {
            throw new Exception("Could not retrieve running jobs from the JobManager.", e);
        }
        if (result instanceof JobManagerMessages.RunningJobsStatus) {
            List jobs = ((JobManagerMessages.RunningJobsStatus)result).getStatusMessages();
            for (JobStatusMessage rj : jobs) {
                if (!rj.getJobState().equals((Object)JobStatus.RUNNING) && !rj.getJobState().equals((Object)JobStatus.RESTARTING) || !rj.getJobName().equals(jobName)) continue;
                return rj.getJobId();
            }
        }
        throw new Exception("Could not find job");
    }

    public boolean deleteJob(JobID jobId) {
        try {
            ActorGateway jobManager = this.getJobManagerGateway();
            Future response = jobManager.ask((Object)new JobManagerMessages.CancelJob(jobId), askTimeout);
            Await.result((Awaitable)response, (Duration)askTimeout);
            return true;
        }
        catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    private final Configuration getConfig(String hostname, int port) {
        Configuration config = new Configuration();
        config.setString("jobmanager.rpc.address", hostname);
        config.setInteger("jobmanager.rpc.port", port);
        return config;
    }
}

