/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core;

import com.couchbase.client.core.BucketClosedException;
import com.couchbase.client.core.RequestEvent;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.BootstrapMessage;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.config.ConfigRequest;
import com.couchbase.client.core.message.internal.AddServiceRequest;
import com.couchbase.client.core.message.internal.RemoveServiceRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.message.kv.BinaryRequest;
import com.couchbase.client.core.message.query.QueryRequest;
import com.couchbase.client.core.message.view.ViewRequest;
import com.couchbase.client.core.node.CouchbaseNode;
import com.couchbase.client.core.node.Node;
import com.couchbase.client.core.node.locate.ConfigLocator;
import com.couchbase.client.core.node.locate.KeyValueLocator;
import com.couchbase.client.core.node.locate.Locator;
import com.couchbase.client.core.node.locate.QueryLocator;
import com.couchbase.client.core.node.locate.ViewLocator;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.state.LifecycleState;
import com.couchbase.client.deps.com.lmax.disruptor.EventHandler;
import com.couchbase.client.deps.com.lmax.disruptor.RingBuffer;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class RequestHandler
implements EventHandler<RequestEvent> {
    /** Logger shared by all instances of this handler. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(RequestHandler.class);
    /** Initial sizing value for the node set; equals the capacity the public constructor gives its backing map. */
    private static final int INITIAL_NODE_SIZE = 128;
    /** Locator returned by {@link #locator(CouchbaseRequest)} for {@link BinaryRequest}s. */
    private final Locator binaryLocator = new KeyValueLocator();
    /** Locator returned by {@link #locator(CouchbaseRequest)} for {@link ViewRequest}s. */
    private final Locator viewLocator = new ViewLocator();
    /** Locator returned by {@link #locator(CouchbaseRequest)} for {@link QueryRequest}s. */
    private final Locator queryLocator = new QueryLocator();
    /** Locator returned by {@link #locator(CouchbaseRequest)} for {@link ConfigRequest}s. */
    private final Locator configLocator = new ConfigLocator();
    /** Nodes currently registered with this handler. */
    private final Set<Node> nodes;
    /** Environment consulted for SSL and query settings and handed to newly created nodes. */
    private final CoreEnvironment environment;
    /** Most recent configuration received from the config stream; holds {@code null} until the first one arrives. */
    private final AtomicReference<ClusterConfig> configuration;
    /** Ring buffer handed to new nodes; also receives requests for which the locator returned an empty array. */
    private final RingBuffer<ResponseEvent> responseBuffer;

    /**
     * Creates a handler that starts with an empty, concurrent node set.
     *
     * @param environment the environment handed to nodes and consulted during reconfiguration.
     * @param configObservable stream of cluster configurations; subscribed to immediately.
     * @param responseBuffer the ring buffer responses are published to.
     */
    public RequestHandler(CoreEnvironment environment, Observable<ClusterConfig> configObservable, RingBuffer<ResponseEvent> responseBuffer) {
        this(
            Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>(INITIAL_NODE_SIZE)),
            environment,
            configObservable,
            responseBuffer
        );
    }

    /**
     * Creates a handler around a caller-supplied node set and subscribes to the configuration stream.
     *
     * <p>Every configuration emitted by {@code configObservable} is stored as the current configuration
     * and then applied through {@link #reconfigure(ClusterConfig)}.</p>
     *
     * @param nodes the set used to track registered nodes.
     * @param environment the environment handed to nodes and consulted during reconfiguration.
     * @param configObservable stream of cluster configurations; subscribed to immediately.
     * @param responseBuffer the ring buffer responses are published to.
     */
    RequestHandler(Set<Node> nodes, CoreEnvironment environment, Observable<ClusterConfig> configObservable, RingBuffer<ResponseEvent> responseBuffer) {
        this.nodes = nodes;
        this.environment = environment;
        this.responseBuffer = responseBuffer;
        this.configuration = new AtomicReference<ClusterConfig>();
        configObservable.subscribe(new Action1<ClusterConfig>() {
            @Override
            public void call(ClusterConfig config) {
                try {
                    LOGGER.debug("Got notified of a new configuration arriving.");
                    RequestHandler.this.configuration.set(config);
                    RequestHandler.this.reconfigure(config).subscribe();
                } catch (Exception ex) {
                    // Keep the config subscription alive, but report through the logger
                    // instead of dumping the stack trace to stderr.
                    LOGGER.warn("Error while applying a new configuration.", ex);
                }
            }
        });
    }

    /**
     * Dispatches one request taken from the request ring buffer.
     *
     * <ul>
     *   <li>A request that is not a {@link BootstrapMessage} is failed with a
     *       {@link BucketClosedException} when no configuration is available yet, or when it names a
     *       bucket the current configuration does not contain.</li>
     *   <li>Otherwise the locator matching the request type picks the target nodes. A {@code null}
     *       result ends processing; an empty result hands the request to the response buffer; any
     *       other result sends the request to each returned node. An exception thrown by a node's
     *       {@code send} is reported on the request's observable.</li>
     *   <li>At the end of a batch every registered node receives {@link SignalFlush#INSTANCE}.</li>
     * </ul>
     *
     * <p>The event's request reference is cleared on every exit path, including exceptional ones.</p>
     *
     * @param event the event carrying the request.
     * @param sequence the ring buffer sequence of the event (unused).
     * @param endOfBatch whether this is the last event of the current batch.
     * @throws Exception if locating the nodes or flushing them fails.
     */
    @Override
    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
        try {
            CouchbaseRequest request = event.getRequest();
            // Read the configuration once so the bucket check and the locator see the same snapshot.
            ClusterConfig config = this.configuration.get();

            boolean bootstrap = request instanceof BootstrapMessage;
            if (!bootstrap
                && (config == null || (request.bucket() != null && !config.hasBucket(request.bucket())))) {
                request.observable().onError(new BucketClosedException(request.bucket() + " has been closed"));
                return;
            }

            Node[] found = this.locator(request).locate(request, this.nodes, config);
            if (found == null) {
                return;
            }
            if (found.length == 0) {
                this.responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, request, request.observable());
            }
            for (Node target : found) {
                try {
                    target.send(request);
                } catch (Exception ex) {
                    request.observable().onError(ex);
                }
            }
            if (endOfBatch) {
                for (Node node : this.nodes) {
                    node.send(SignalFlush.INSTANCE);
                }
            }
        } finally {
            // Always release the request so the ring buffer slot does not pin it,
            // even when locating or publishing throws.
            event.setRequest(null);
        }
    }

    /**
     * Registers a node for the given address unless one is already known.
     *
     * @param hostname the address of the node to add.
     * @return the state of the already registered node, or the result of connecting a newly
     *         created {@link CouchbaseNode}.
     */
    public Observable<LifecycleState> addNode(InetAddress hostname) {
        Node existing = nodeBy(hostname);
        if (existing == null) {
            Node created = new CouchbaseNode(hostname, environment, responseBuffer);
            return addNode(created);
        }
        LOGGER.debug("Node {} already registered, skipping.", hostname);
        return Observable.just(existing.state());
    }

    /**
     * Connects the given node and registers it once the connect attempt has emitted a state.
     *
     * <p>If the node is already contained in the node set, nothing is connected and its current
     * state is returned instead.</p>
     *
     * @param node the node to connect and register.
     * @return the node's current state if already registered, otherwise the states emitted by
     *         {@code node.connect()}.
     */
    Observable<LifecycleState> addNode(final Node node) {
        LOGGER.debug("Got instructed to add Node {}", node.hostname());
        if (this.nodes.contains(node)) {
            LOGGER.debug("Node {} already registered, skipping.", node.hostname());
            return Observable.just(node.state());
        }
        // Parameterized like the other log statements, so no string is built when debug is off.
        LOGGER.debug("Connecting Node {}", node.hostname());
        return node.connect().map(new Func1<LifecycleState, LifecycleState>() {
            @Override
            public LifecycleState call(LifecycleState lifecycleState) {
                LOGGER.debug("Connect finished, registering for use.");
                RequestHandler.this.nodes.add(node);
                return lifecycleState;
            }
        });
    }

    /**
     * Removes and disconnects the node registered for the given address.
     *
     * <p>{@link #nodeBy(InetAddress)} returns {@code null} for an unknown (or {@code null}) address;
     * in that case there is nothing to remove and {@link LifecycleState#DISCONNECTED} is emitted
     * instead of failing with a {@link NullPointerException}.</p>
     *
     * @param hostname the address of the node to remove.
     * @return the states emitted by the node's disconnect, or {@code DISCONNECTED} if no node is
     *         registered for the address.
     */
    public Observable<LifecycleState> removeNode(InetAddress hostname) {
        Node node = this.nodeBy(hostname);
        if (node == null) {
            LOGGER.debug("Node {} not registered, nothing to remove.", hostname);
            return Observable.just(LifecycleState.DISCONNECTED);
        }
        return this.removeNode(node);
    }

    /**
     * Drops the given node from the node set and asks it to disconnect.
     *
     * @param node the node to remove; must not be {@code null}.
     * @return the observable returned by {@code node.disconnect()}.
     */
    Observable<LifecycleState> removeNode(Node node) {
        LOGGER.debug("Got instructed to remove Node {}", node.hostname());
        nodes.remove(node);
        Observable<LifecycleState> disconnected = node.disconnect();
        return disconnected;
    }

    /**
     * Forwards an add-service request to the node registered for the request's hostname.
     *
     * <p>NOTE(review): {@link #nodeBy(InetAddress)} returns {@code null} for an unknown hostname,
     * in which case this method throws a {@link NullPointerException}.</p>
     *
     * @param request the request describing the service to add.
     * @return the observable returned by the node's {@code addService}.
     */
    public Observable<Service> addService(AddServiceRequest request) {
        LOGGER.debug("Got instructed to add Service {}, to Node {}", request.type(), request.hostname());
        Node target = nodeBy(request.hostname());
        return target.addService(request);
    }

    /**
     * Forwards a remove-service request to the node registered for the request's hostname.
     *
     * <p>NOTE(review): {@link #nodeBy(InetAddress)} returns {@code null} for an unknown hostname,
     * in which case this method throws a {@link NullPointerException}.</p>
     *
     * @param request the request describing the service to remove.
     * @return the observable returned by the node's {@code removeService}.
     */
    public Observable<Service> removeService(RemoveServiceRequest request) {
        LOGGER.debug("Got instructed to remove Service {}, from Node {}", request.type(), request.hostname());
        Node target = nodeBy(request.hostname());
        return target.removeService(request);
    }

    /**
     * Looks up the registered node whose hostname equals the given address.
     *
     * @param hostname the address to search for; may be {@code null}.
     * @return the first matching node, or {@code null} if the address is {@code null} or no
     *         registered node matches.
     */
    public Node nodeBy(InetAddress hostname) {
        if (hostname != null) {
            for (Node candidate : nodes) {
                if (candidate.hostname().equals(hostname)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Selects the locator responsible for the given request, based on the request's type.
     *
     * <p>The types are checked in the order binary, view, query, config; the first match wins.</p>
     *
     * @param request the request to find a locator for.
     * @return the locator handling this kind of request.
     * @throws IllegalArgumentException if the request matches none of the known request types.
     */
    protected Locator locator(CouchbaseRequest request) {
        final Locator selected;
        if (request instanceof BinaryRequest) {
            selected = binaryLocator;
        } else if (request instanceof ViewRequest) {
            selected = viewLocator;
        } else if (request instanceof QueryRequest) {
            selected = queryLocator;
        } else if (request instanceof ConfigRequest) {
            selected = configLocator;
        } else {
            throw new IllegalArgumentException("Unknown Request Type: " + request);
        }
        return selected;
    }

    /**
     * Aligns the registered nodes and their services with the given cluster configuration.
     *
     * <p>If the configuration contains no bucket configs, every registered node is removed and
     * disconnected. Otherwise each bucket config is applied through
     * {@code reconfigureBucket}, and afterwards every registered node whose hostname appears in
     * none of the bucket configs is removed and disconnected.</p>
     *
     * @param config the configuration to apply.
     * @return an observable emitting {@code config} once the reconfiguration steps have run.
     */
    public Observable<ClusterConfig> reconfigure(final ClusterConfig config) {
        LOGGER.debug("Starting reconfiguration.");
        if (config.bucketConfigs().values().isEmpty()) {
            LOGGER.debug("No node found in config, disconnecting all nodes.");
            if (this.nodes.isEmpty()) {
                return Observable.just(config);
            }
            return Observable.from(this.nodes).doOnNext(new Action1<Node>() {
                @Override
                public void call(Node node) {
                    // NOTE(review): removeNode() already calls node.disconnect() but its result is
                    // never subscribed; the explicit disconnect below is the one that is subscribed.
                    RequestHandler.this.removeNode(node);
                    node.disconnect().subscribe();
                }
            }).last().map(new Func1<Node, ClusterConfig>() {
                @Override
                public ClusterConfig call(Node node) {
                    return config;
                }
            });
        }
        return Observable.just(config).flatMap(new Func1<ClusterConfig, Observable<BucketConfig>>() {
            @Override
            public Observable<BucketConfig> call(ClusterConfig clusterConfig) {
                return Observable.from(clusterConfig.bucketConfigs().values());
            }
        }).flatMap(new Func1<BucketConfig, Observable<Boolean>>() {
            @Override
            public Observable<Boolean> call(BucketConfig bucketConfig) {
                return RequestHandler.this.reconfigureBucket(bucketConfig);
            }
        }).last().doOnNext(new Action1<Boolean>() {
            @Override
            public void call(Boolean ignored) {
                // Hostnames of every node referenced by any bucket in the new configuration.
                Set<InetAddress> configNodes = new HashSet<InetAddress>();
                for (BucketConfig bucket : config.bucketConfigs().values()) {
                    for (NodeInfo info : bucket.nodes()) {
                        configNodes.add(info.hostname());
                    }
                }

                // Collect first, remove afterwards: the log then lists only the nodes that are
                // actually dropped, and the node set is not modified while it is being iterated.
                ArrayList<Node> staleNodes = new ArrayList<Node>();
                for (Node node : RequestHandler.this.nodes) {
                    if (!configNodes.contains(node.hostname())) {
                        staleNodes.add(node);
                    }
                }
                LOGGER.debug("Found nodes {} to be removed after reconfiguration.", staleNodes);
                for (Node node : staleNodes) {
                    RequestHandler.this.removeNode(node);
                    node.disconnect().subscribe();
                }
            }
        }).map(new Func1<Boolean, ClusterConfig>() {
            @Override
            public ClusterConfig call(Boolean ignored) {
                return config;
            }
        });
    }

    /**
     * Applies a single bucket configuration: for every node listed in it, the node is added and
     * one service per entry of its service map is registered on it.
     *
     * <p>The service map is the node's SSL service map when the environment has SSL enabled,
     * otherwise its plain service map. If the environment has query enabled and the map has no
     * {@link ServiceType#QUERY} entry, one is added with the environment's query port.</p>
     *
     * @param config the bucket configuration to apply.
     * @return an observable emitting the last {@code true} produced by the per-node pipelines.
     */
    private Observable<Boolean> reconfigureBucket(final BucketConfig config) {
        LOGGER.debug("Starting reconfiguration for bucket {}", config.name());
        ArrayList<Observable<Boolean>> observables = new ArrayList<Observable<Boolean>>();
        for (final NodeInfo nodeInfo : config.nodes()) {
            Observable<Boolean> obs = this.addNode(nodeInfo.hostname()).flatMap(new Func1<LifecycleState, Observable<Map<ServiceType, Integer>>>() {
                @Override
                public Observable<Map<ServiceType, Integer>> call(LifecycleState lifecycleState) {
                    Map<ServiceType, Integer> services = RequestHandler.this.environment.sslEnabled()
                        ? nodeInfo.sslServices()
                        : nodeInfo.services();
                    // NOTE(review): this writes into the map returned by NodeInfo rather than a
                    // copy; kept as-is, confirm that mutating the config's map is intended.
                    if (!services.containsKey(ServiceType.QUERY) && RequestHandler.this.environment.queryEnabled()) {
                        services.put(ServiceType.QUERY, RequestHandler.this.environment.queryPort());
                    }
                    return Observable.just(services);
                }
            }).flatMap(new Func1<Map<ServiceType, Integer>, Observable<AddServiceRequest>>() {
                @Override
                public Observable<AddServiceRequest> call(Map<ServiceType, Integer> services) {
                    ArrayList<AddServiceRequest> requests = new ArrayList<AddServiceRequest>(services.size());
                    for (Map.Entry<ServiceType, Integer> service : services.entrySet()) {
                        requests.add(new AddServiceRequest(service.getKey(), config.name(), config.password(), service.getValue(), nodeInfo.hostname()));
                    }
                    return Observable.from(requests);
                }
            }).flatMap(new Func1<AddServiceRequest, Observable<Service>>() {
                @Override
                public Observable<Service> call(AddServiceRequest request) {
                    return RequestHandler.this.addService(request);
                }
            }).last().map(new Func1<Service, Boolean>() {
                @Override
                public Boolean call(Service service) {
                    return true;
                }
            });
            observables.add(obs);
        }
        return Observable.merge(observables).last();
    }
}

