/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.rdf4j.sail.elasticsearchstore;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.elasticsearchstore.ClientProvider;
import org.eclipse.rdf4j.sail.elasticsearchstore.ElasticsearchDataStructure;
import org.eclipse.rdf4j.sail.elasticsearchstore.ElasticsearchNamespaceStore;
import org.eclipse.rdf4j.sail.elasticsearchstore.ElasticsearchStoreConnection;
import org.eclipse.rdf4j.sail.elasticsearchstore.ElasticsearchValueFactory;
import org.eclipse.rdf4j.sail.elasticsearchstore.SingletonClientProvider;
import org.eclipse.rdf4j.sail.elasticsearchstore.UnclosableClientProvider;
import org.eclipse.rdf4j.sail.elasticsearchstore.UserProvidedClientProvider;
import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleStore;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class ElasticsearchStore
extends ExtensibleStore<ElasticsearchDataStructure, ElasticsearchNamespaceStore> {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchStore.class);
    final ClientProvider clientProvider;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private String hostname;
    private int port;
    private String clusterName;
    private String index;

    public ElasticsearchStore(String hostname, int port, String clusterName, String index) {
        this(hostname, port, clusterName, index, true);
    }

    public ElasticsearchStore(String hostname, int port, String clusterName, String index, boolean cacheEnabled) {
        super(cacheEnabled);
        this.hostname = hostname;
        this.port = port;
        this.clusterName = clusterName;
        this.index = index;
        this.clientProvider = new SingletonClientProvider(hostname, port, clusterName);
        this.dataStructure = new ElasticsearchDataStructure(this.clientProvider, index);
        this.namespaceStore = new ElasticsearchNamespaceStore(this.clientProvider, index + "_namespaces");
        ReferenceQueue<ElasticsearchStore> objectReferenceQueue = new ReferenceQueue<ElasticsearchStore>();
        this.startGarbageCollectionMonitoring(objectReferenceQueue, new PhantomReference<ElasticsearchStore>(this, objectReferenceQueue), this.clientProvider);
    }

    public ElasticsearchStore(ClientProvider clientPool, String index) {
        this(clientPool, index, true);
    }

    public ElasticsearchStore(ClientProvider clientPool, String index, boolean cacheEnabled) {
        super(cacheEnabled);
        this.clientProvider = new UnclosableClientProvider(clientPool);
        this.dataStructure = new ElasticsearchDataStructure(this.clientProvider, index);
        this.namespaceStore = new ElasticsearchNamespaceStore(this.clientProvider, index + "_namespaces");
    }

    public ElasticsearchStore(Client client, String index) {
        this(client, index, true);
    }

    public ElasticsearchStore(Client client, String index, boolean cacheEnabled) {
        this(new UnclosableClientProvider(new UserProvidedClientProvider(client)), index, cacheEnabled);
    }

    protected void initializeInternal() throws SailException {
        if (this.shutdown.get()) {
            throw new SailException("Can not be initialized after calling shutdown!");
        }
        this.waitForElasticsearch(10, ChronoUnit.MINUTES);
        super.initializeInternal();
    }

    protected void shutDownInternal() throws SailException {
        if (this.shutdown.compareAndSet(false, true)) {
            super.shutDownInternal();
            try {
                this.clientProvider.close();
            }
            catch (Exception e) {
                throw new SailException((Throwable)e);
            }
        }
    }

    public void waitForElasticsearch(int time, TemporalUnit timeUnit) {
        LocalDateTime tenMinFromNow = LocalDateTime.now().plus(time, timeUnit);
        logger.info("Waiting for Elasticsearch to start");
        while (true) {
            if (LocalDateTime.now().isAfter(tenMinFromNow)) {
                logger.error("Could not connect to Elasticsearch after " + time + " " + timeUnit.toString() + " of trying!");
                try {
                    this.clientProvider.close();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                throw new RuntimeException("Could not connect to Elasticsearch after " + time + " " + timeUnit.toString() + " of trying!");
            }
            try {
                Client client = this.clientProvider.getClient();
                ClusterHealthResponse clusterHealthResponse = (ClusterHealthResponse)client.admin().cluster().health(new ClusterHealthRequest()).actionGet();
                ClusterHealthStatus status = clusterHealthResponse.getStatus();
                logger.info("Cluster status: {}", (Object)status.name());
                if (status.equals((Object)ClusterHealthStatus.GREEN) || status.equals((Object)ClusterHealthStatus.YELLOW)) {
                    logger.info("Elasticsearch started!");
                    return;
                }
            }
            catch (Throwable e) {
                logger.info("Unable to connect to elasticsearch cluster due to {}", (Object)e.getClass().getSimpleName());
                try {
                    this.clientProvider.close();
                }
                catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
                e.printStackTrace();
            }
            logger.info(".");
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
            }
        }
    }

    private void startGarbageCollectionMonitoring(ReferenceQueue<ElasticsearchStore> referenceQueue, Reference<ElasticsearchStore> ref, ClientProvider clientProvider) {
        ExecutorService ex = Executors.newSingleThreadExecutor(r -> {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            t.setDaemon(true);
            return t;
        });
        ex.execute(() -> {
            while (referenceQueue.poll() != ref) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    // empty catch block
                    break;
                }
            }
            if (ref.get() != null) {
                return;
            }
            if (!clientProvider.isClosed()) {
                logger.warn("Closing ClientPool in ElasticsearchStore due to store having no references and shutdown() never being called()");
            }
            try {
                clientProvider.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
        ex.shutdown();
    }

    public void setElasticsearchScrollTimeout(int timeout) {
        ((ElasticsearchDataStructure)this.dataStructure).setElasticsearchScrollTimeout(timeout);
    }

    protected NotifyingSailConnection getConnectionInternal() throws SailException {
        return new ElasticsearchStoreConnection(this);
    }

    public boolean isWritable() throws SailException {
        return true;
    }

    public String getHostname() {
        return this.hostname;
    }

    public int getPort() {
        return this.port;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public String getIndex() {
        return this.index;
    }

    public void setElasticsearchBulkSize(int size) {
        ((ElasticsearchDataStructure)this.dataStructure).setElasticsearchBulkSize(size);
    }

    public ExtensibleStatementHelper getExtensibleStatementHelper() {
        return ElasticsearchValueFactory.getInstance();
    }
}

