/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationException;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.config.refresher.AbstractRefresher;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.config.BucketConfigRequest;
import com.couchbase.client.core.message.config.BucketConfigResponse;
import com.couchbase.client.core.message.config.BucketStreamingRequest;
import com.couchbase.client.core.message.config.BucketStreamingResponse;
import com.couchbase.client.core.service.ServiceType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

public class HttpRefresher
extends AbstractRefresher {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(HttpRefresher.class);
    private static final String TERSE_PATH = "/pools/default/bs/";
    private static final String VERBOSE_PATH = "/pools/default/bucketsStreaming/";
    private final CoreEnvironment environment;
    private final AtomicLong nodeOffset = new AtomicLong(0L);
    private final Map<String, Long> lastPollTimestamps = new ConcurrentHashMap<String, Long>();
    private final long pollFloorNs;

    public HttpRefresher(CoreEnvironment env, ClusterFacade cluster) {
        super(env, cluster);
        this.environment = env;
        this.pollFloorNs = TimeUnit.MILLISECONDS.toNanos(this.environment.configPollFloorInterval());
    }

    @Override
    public Observable<Boolean> registerBucket(String name, String password) {
        return this.registerBucket(name, name, password);
    }

    @Override
    public Observable<Boolean> registerBucket(final String name, final String username, final String password) {
        Observable response = super.registerBucket(name, username, password).flatMap((Func1)new Func1<Boolean, Observable<BucketStreamingResponse>>(){

            public Observable<BucketStreamingResponse> call(Boolean aBoolean) {
                return HttpRefresher.this.cluster().send(new BucketStreamingRequest(HttpRefresher.TERSE_PATH, name, username, password)).doOnNext((Action1)new Action1<BucketStreamingResponse>(){

                    public void call(BucketStreamingResponse response) {
                        if (response.status() == ResponseStatus.NOT_EXISTS) {
                            throw new TerseConfigDoesNotExistException();
                        }
                        if (!response.status().isSuccess()) {
                            throw new ConfigurationException("Could not load terse config.");
                        }
                    }
                });
            }
        }).onErrorResumeNext((Func1)new Func1<Throwable, Observable<BucketStreamingResponse>>(){

            public Observable<BucketStreamingResponse> call(Throwable throwable) {
                if (throwable instanceof TerseConfigDoesNotExistException) {
                    return HttpRefresher.this.cluster().send(new BucketStreamingRequest(HttpRefresher.VERBOSE_PATH, name, username, password)).doOnNext((Action1)new Action1<BucketStreamingResponse>(){

                        public void call(BucketStreamingResponse response) {
                            if (!response.status().isSuccess()) {
                                throw new ConfigurationException("Could not load terse config.");
                            }
                        }
                    });
                }
                return Observable.error((Throwable)throwable);
            }
        });
        this.repeatConfigUntilUnsubscribed(name, (Observable<BucketStreamingResponse>)response);
        return response.map((Func1)new Func1<BucketStreamingResponse, Boolean>(){

            public Boolean call(BucketStreamingResponse response) {
                return response.status().isSuccess();
            }
        });
    }

    private void repeatConfigUntilUnsubscribed(final String name, Observable<BucketStreamingResponse> response) {
        response.flatMap((Func1)new Func1<BucketStreamingResponse, Observable<ProposedBucketConfigContext>>(){

            public Observable<ProposedBucketConfigContext> call(final BucketStreamingResponse response) {
                LOGGER.debug("Config stream started for {} on {}.", (Object)name, (Object)response.host());
                return response.configs().map((Func1)new Func1<String, ProposedBucketConfigContext>(){

                    public ProposedBucketConfigContext call(String s) {
                        String raw = s.replace("$HOST", response.host());
                        return new ProposedBucketConfigContext(name, raw, response.host());
                    }
                }).doOnCompleted(new Action0(){

                    public void call() {
                        LOGGER.debug("Config stream ended for {} on {}.", (Object)name, (Object)response.host());
                    }
                });
            }
        }).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>(){

            public Observable<?> call(Observable<? extends Void> observable) {
                return observable.flatMap(new Func1<Void, Observable<?>>(){

                    public Observable<?> call(Void aVoid) {
                        if (HttpRefresher.this.registrations().containsKey(name)) {
                            LOGGER.debug("Resubscribing config stream for bucket {}, still registered.", (Object)name);
                            return Observable.just((Object)true);
                        }
                        LOGGER.debug("Not resubscribing config stream for bucket {}, not registered.", (Object)name);
                        return Observable.empty();
                    }
                });
            }
        }).subscribe((Subscriber)new Subscriber<ProposedBucketConfigContext>(){

            public void onCompleted() {
            }

            public void onError(Throwable e) {
                LOGGER.error("Error while subscribing to Http refresh stream!", e);
            }

            public void onNext(ProposedBucketConfigContext ctx) {
                HttpRefresher.this.pushConfig(ctx);
            }
        });
    }

    @Override
    public Observable<Boolean> shutdown() {
        return Observable.just((Object)true);
    }

    @Override
    public void markTainted(BucketConfig config) {
    }

    @Override
    public void markUntainted(BucketConfig config) {
    }

    @Override
    public void refresh(ClusterConfig config) {
        Observable.from(config.bucketConfigs().values()).observeOn(this.environment.scheduler()).filter((Func1)new Func1<BucketConfig, Boolean>(){

            public Boolean call(BucketConfig config) {
                return HttpRefresher.this.registrations().containsKey(config.name());
            }
        }).filter((Func1)new Func1<BucketConfig, Boolean>(){

            public Boolean call(BucketConfig config) {
                String bucketName = config.name();
                boolean allowed = HttpRefresher.this.allowedToPoll(bucketName);
                if (allowed) {
                    HttpRefresher.this.lastPollTimestamps.put(bucketName, System.nanoTime());
                } else {
                    LOGGER.trace("Ignoring refresh polling attempt because poll interval is too small.");
                }
                return allowed;
            }
        }).subscribe((Action1)new Action1<BucketConfig>(){

            public void call(BucketConfig config) {
                final String bucketName = config.name();
                ArrayList<NodeInfo> nodeInfos = new ArrayList<NodeInfo>(config.nodes());
                if (nodeInfos.isEmpty()) {
                    LOGGER.debug("Cannot refresh bucket, because node list contains no nodes.");
                    return;
                }
                HttpRefresher.this.shiftNodeList(nodeInfos);
                Observable refreshSequence = HttpRefresher.this.buildRefreshFallbackSequence(nodeInfos, bucketName);
                refreshSequence.subscribe((Subscriber)new Subscriber<ProposedBucketConfigContext>(){

                    public void onCompleted() {
                        LOGGER.debug("Completed refreshing config for bucket \"{}\"", (Object)bucketName);
                    }

                    public void onError(Throwable e) {
                        LOGGER.debug("Error while refreshing bucket config, ignoring.", e);
                    }

                    public void onNext(ProposedBucketConfigContext ctx) {
                        if (ctx.config().startsWith("{")) {
                            HttpRefresher.this.provider().proposeBucketConfig(ctx);
                        }
                    }
                });
            }
        });
    }

    private Observable<ProposedBucketConfigContext> buildRefreshFallbackSequence(List<NodeInfo> nodeInfos, String bucketName) {
        Observable failbackSequence = null;
        for (NodeInfo nodeInfo : nodeInfos) {
            if (!HttpRefresher.isValidConfigNode(this.environment.sslEnabled(), nodeInfo)) continue;
            if (failbackSequence == null) {
                failbackSequence = this.refreshAgainstNode(bucketName, nodeInfo.hostname());
                continue;
            }
            failbackSequence = failbackSequence.onErrorResumeNext(this.refreshAgainstNode(bucketName, nodeInfo.hostname()));
        }
        if (failbackSequence == null) {
            LOGGER.debug("Could not build refresh sequence, node list is empty - ignoring attempt.");
            return Observable.empty();
        }
        return failbackSequence;
    }

    private Observable<ProposedBucketConfigContext> refreshAgainstNode(final String bucketName, final String hostname) {
        final AbstractRefresher.Credential credential = this.registrations().get(bucketName);
        if (credential == null) {
            LOGGER.debug("Ignoring refresh attempt since it seems the bucket registration is gone (closed).");
            return Observable.empty();
        }
        return Observable.defer((Func0)new Func0<Observable<BucketConfigResponse>>(){

            public Observable<BucketConfigResponse> call() {
                return HttpRefresher.this.cluster().send(new BucketConfigRequest("/pools/default/b/", hostname, bucketName, credential.username(), credential.password())).flatMap((Func1)new Func1<BucketConfigResponse, Observable<BucketConfigResponse>>(){

                    public Observable<BucketConfigResponse> call(BucketConfigResponse response) {
                        if (response.status().isSuccess()) {
                            LOGGER.debug("Successfully got config refresh from terse bucket remote.");
                            return Observable.just((Object)response);
                        }
                        LOGGER.debug("Terse bucket config refresh failed, falling back to verbose.");
                        return HttpRefresher.this.cluster().send(new BucketConfigRequest("/pools/default/buckets/", hostname, bucketName, credential.username(), credential.password()));
                    }
                });
            }
        }).map((Func1)new Func1<BucketConfigResponse, ProposedBucketConfigContext>(){

            public ProposedBucketConfigContext call(BucketConfigResponse response) {
                String config = response.config().replace("$HOST", hostname);
                return new ProposedBucketConfigContext(bucketName, config, hostname);
            }
        }).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable ex) {
                LOGGER.debug("Could not fetch config from bucket \"" + bucketName + "\" against \"" + hostname + "\".", ex);
            }
        });
    }

    private static boolean isValidConfigNode(boolean sslEnabled, NodeInfo nodeInfo) {
        if (sslEnabled && nodeInfo.sslServices().containsKey((Object)ServiceType.CONFIG)) {
            return true;
        }
        return nodeInfo.services().containsKey((Object)ServiceType.CONFIG);
    }

    private <T> void shiftNodeList(List<T> nodeList) {
        int shiftBy = (int)(this.nodeOffset.getAndIncrement() % (long)nodeList.size());
        for (int i = 0; i < shiftBy; ++i) {
            T element = nodeList.remove(0);
            nodeList.add(element);
        }
    }

    private boolean allowedToPoll(String bucket) {
        Long bucketLastPollTimestamp = this.lastPollTimestamps.get(bucket);
        return bucketLastPollTimestamp == null || System.nanoTime() - bucketLastPollTimestamp >= this.pollFloorNs;
    }

    class TerseConfigDoesNotExistException
    extends ConfigurationException {
        TerseConfigDoesNotExistException() {
        }
    }
}

