/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.io;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.rmi.AccessException;
import java.rmi.AlreadyBoundException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.RemoteCollector;
import org.apache.flink.api.java.io.RemoteCollectorConsumer;
import org.apache.flink.api.java.io.RemoteCollectorOutputFormat;
import org.apache.flink.api.java.operators.DataSink;

/**
 * RMI-exported implementation of {@link RemoteCollector} that forwards every element it
 * receives to a {@link RemoteCollectorConsumer}.
 *
 * <p>The static helpers create an RMI registry on a local port, bind a collector instance
 * under a unique id and attach a {@link RemoteCollectorOutputFormat} pointing at that
 * registry to a {@link DataSet}. Every registry created through this class is remembered
 * so that {@link #shutdownAll()} can tear it down again.
 *
 * @param <T> type of the collected elements
 */
public class RemoteCollectorImpl<T>
extends UnicastRemoteObject
implements RemoteCollector<T> {
    private static final long serialVersionUID = 1L;

    /** System property holding the host name advertised in RMI stubs. */
    private static final String RMI_HOSTNAME_PROPERTY = "java.rmi.server.hostname";

    /** Registries created by {@link #createAndBind}; all access is guarded by the list's own monitor. */
    private static final List<Registry> registries = new ArrayList<Registry>();

    /**
     * Target of {@link #collect(Object)}. Volatile because it is assigned by the thread that
     * sets up the collector but read by whichever thread dispatches the remote call.
     */
    private volatile RemoteCollectorConsumer<T> consumer;

    /**
     * Creates a collector, starts an RMI registry on {@code port} and binds the collector
     * under {@code rmiId}.
     *
     * <p>The consumer is attached before the collector is bound, so a remote caller can
     * never look up a collector without a consumer. If creating the registry or binding
     * fails, everything exported so far is unexported again.
     *
     * @param port     port on which the registry is created
     * @param consumer consumer that receives the collected elements
     * @param rmiId    name under which the collector is bound in the registry
     * @throws RuntimeException if the collector cannot be exported, the registry cannot be
     *                          created, or {@code rmiId} is already bound; the original
     *                          exception is attached as the cause
     */
    public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
        RemoteCollectorImpl<T> collectorInstance = null;
        Registry registry = null;
        try {
            collectorInstance = new RemoteCollectorImpl<T>();
            collectorInstance.setConsumer(consumer);
            registry = LocateRegistry.createRegistry(port);
            registry.bind(rmiId, collectorInstance);
            synchronized (registries) {
                registries.add(registry);
            }
        } catch (RemoteException e) {
            throw bindFailed(port, rmiId, collectorInstance, registry, e);
        } catch (AlreadyBoundException e) {
            throw bindFailed(port, rmiId, collectorInstance, registry, e);
        }
    }

    /**
     * Adds a {@link RemoteCollectorOutputFormat} sink to {@code source} that sends its
     * elements to {@code consumer} through a freshly created local RMI registry.
     *
     * <p>If the {@code java.rmi.server.hostname} system property is not set, it is set to a
     * non-loopback IPv4 address of this machine when one can be found.
     *
     * @param source   data set whose elements are collected
     * @param consumer consumer that receives the elements
     * @return the sink created by {@code source.output(...)}
     * @throws RuntimeException if the network interfaces cannot be enumerated, no free port
     *                          can be determined, or the collector cannot be bound
     */
    public static <T> DataSink<T> collectLocal(DataSet<T> source, RemoteCollectorConsumer<T> consumer) {
        String ip = System.getProperty(RMI_HOSTNAME_PROPERTY);
        if (ip == null) {
            ip = findNonLoopbackIpv4Address();
            if (ip != null) {
                System.setProperty(RMI_HOSTNAME_PROPERTY, ip);
            }
        }
        int randomPort = findFreePort();
        String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
        RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
        RemoteCollectorOutputFormat remoteCollectorOutputFormat = new RemoteCollectorOutputFormat(ip, randomPort, rmiId);
        return source.output(remoteCollectorOutputFormat);
    }

    /**
     * Convenience variant of {@link #collectLocal(DataSet, RemoteCollectorConsumer)} that
     * adds every received element to {@code collection}.
     *
     * <p>Elements are added through a {@link Collections#synchronizedCollection(Collection)}
     * wrapper around {@code collection}.
     *
     * @param source     data set whose elements are collected
     * @param collection collection that receives the elements
     */
    public static <T> void collectLocal(DataSet<T> source, Collection<T> collection) {
        final Collection<T> synchronizedCollection = Collections.synchronizedCollection(collection);
        RemoteCollectorImpl.collectLocal(source, new RemoteCollectorConsumer<T>(){

            @Override
            public void collect(T element) {
                synchronizedCollection.add(element);
            }
        });
    }

    /**
     * Instances are only created through {@link #createAndBind}.
     *
     * @throws RemoteException if the superclass constructor fails to export this object
     */
    private RemoteCollectorImpl() throws RemoteException {
    }

    /**
     * Forwards {@code element} to the current consumer.
     *
     * @param element element to hand to the consumer
     */
    @Override
    public void collect(T element) throws RemoteException {
        this.consumer.collect(element);
    }

    /**
     * @return the consumer elements are currently forwarded to
     */
    @Override
    public RemoteCollectorConsumer<T> getConsumer() {
        return this.consumer;
    }

    /**
     * @param consumer the consumer subsequent elements are forwarded to
     */
    @Override
    public void setConsumer(RemoteCollectorConsumer<T> consumer) {
        this.consumer = consumer;
    }

    /**
     * Unbinds and unexports every object bound in the registries created by this class,
     * then unexports the registries themselves and forgets them.
     *
     * <p>If an exception is thrown, the registry being processed and all registries after it
     * stay registered, so a later call picks up where this one stopped.
     *
     * @throws AccessException   if a registry operation is not permitted
     * @throws RemoteException   if communication with a registry fails or an object is not exported
     * @throws NotBoundException if a listed name is no longer bound when it is looked up
     */
    public static void shutdownAll() throws AccessException, RemoteException, NotBoundException {
        synchronized (registries) {
            while (!registries.isEmpty()) {
                Registry registry = registries.get(0);
                for (String id : registry.list()) {
                    Remote remote = registry.lookup(id);
                    registry.unbind(id);
                    UnicastRemoteObject.unexportObject(remote, true);
                }
                // Forget the registry before unexporting it so that a failing unexport
                // cannot leave it at the head of the list forever.
                registries.remove(0);
                UnicastRemoteObject.unexportObject(registry, true);
            }
        }
    }

    /**
     * Returns the host address of a non-loopback IPv4 address of this machine, or
     * {@code null} if there is none. When several addresses qualify, the last one
     * enumerated is returned.
     */
    private static String findNonLoopbackIpv4Address() {
        Enumeration<NetworkInterface> networkInterfaces;
        try {
            networkInterfaces = NetworkInterface.getNetworkInterfaces();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (networkInterfaces == null) {
            // The enumeration itself may be null when no interfaces are available.
            return null;
        }
        String address = null;
        while (networkInterfaces.hasMoreElements()) {
            Enumeration<InetAddress> inetAddresses = networkInterfaces.nextElement().getInetAddresses();
            while (inetAddresses.hasMoreElements()) {
                InetAddress inetAddress = inetAddresses.nextElement();
                if (inetAddress.isLoopbackAddress() || !(inetAddress instanceof Inet4Address)) {
                    continue;
                }
                address = inetAddress.getHostAddress();
            }
        }
        return address;
    }

    /**
     * Asks the operating system for a currently free port by briefly opening a server
     * socket on port 0. The socket is closed before returning, so the port is free but not
     * reserved: another process could still take it before the caller uses it.
     */
    private static int findFreePort() {
        ServerSocket probe = null;
        try {
            probe = new ServerSocket(0);
            return probe.getLocalPort();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (probe != null) {
                try {
                    probe.close();
                } catch (Exception ignored) {
                    // The port number has already been read; a failing close is not actionable.
                }
            }
        }
    }

    /** Releases whatever was exported during a failed bind and builds the exception to throw. */
    private static RuntimeException bindFailed(Integer port, String rmiId, Remote collector, Remote registry, Exception cause) {
        unexportQuietly(registry);
        unexportQuietly(collector);
        return new RuntimeException("Could not bind remote collector '" + rmiId + "' on port " + port, cause);
    }

    /** Unexports {@code remote} if it is non-null, ignoring the case that it is not exported. */
    private static void unexportQuietly(Remote remote) {
        if (remote == null) {
            return;
        }
        try {
            UnicastRemoteObject.unexportObject(remote, true);
        } catch (RemoteException ignored) {
            // Not exported (any more): nothing left to release.
        }
    }
}

