/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.jdbc.plugin.efm2;

import java.lang.ref.WeakReference;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.plugin.efm2.Monitor;
import software.amazon.jdbc.plugin.efm2.MonitorConnectionContext;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.PropertyUtils;
import software.amazon.jdbc.util.StringUtils;
import software.amazon.jdbc.util.telemetry.TelemetryContext;
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
import software.amazon.jdbc.util.telemetry.TelemetryGauge;
import software.amazon.jdbc.util.telemetry.TelemetryTraceLevel;

/**
 * Monitors the health of a single host and aborts the connections of registered contexts once the
 * host is considered unhealthy.
 *
 * <p>Two tasks are submitted to a two-thread daemon pool at construction time:
 * <ul>
 *   <li>{@link #newContextRun()} moves contexts from {@code newContexts} to {@code activeContexts}
 *       once their scheduled start time has passed;</li>
 *   <li>{@link #run()} periodically checks a dedicated monitoring connection and, when the node is
 *       flagged unhealthy, aborts the connection of every active context.</li>
 * </ul>
 *
 * <p>{@link #startMonitoring(MonitorConnectionContext)} and {@link #clearContexts()} are called from
 * threads other than the two pool threads, so both context collections are concurrent collections.
 */
public class MonitorImpl implements Monitor {
    private static final Logger LOGGER = Logger.getLogger(MonitorImpl.class.getName());
    private static final long THREAD_SLEEP_MILLIS = 1000L;
    private static final String MONITORING_PROPERTY_PREFIX = "monitoring-";

    /**
     * Executor handed to {@link Connection#abort(Executor)}. Its worker is a daemon thread so that
     * this never-shut-down static executor cannot keep the JVM alive after the first abort.
     */
    protected static final Executor ABORT_EXECUTOR = Executors.newSingleThreadExecutor(runnableTarget -> {
        Thread abortThread = new Thread(runnableTarget);
        abortThread.setDaemon(true);
        return abortThread;
    });

    /** Contexts currently watched by {@link #run()}. */
    private final Queue<WeakReference<MonitorConnectionContext>> activeContexts = new ConcurrentLinkedQueue<>();

    /**
     * Contexts waiting for their monitoring start time, keyed by that start time (nanoseconds,
     * truncated to whole seconds). Written by caller threads and drained by {@link #newContextRun()},
     * hence a concurrent map.
     */
    private final Map<Long, Queue<WeakReference<MonitorConnectionContext>>> newContexts = new ConcurrentHashMap<>();

    private final PluginService pluginService;
    private final TelemetryFactory telemetryFactory;
    private final Properties properties;
    private final HostSpec hostSpec;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private Connection monitoringConn = null;
    private final ExecutorService threadPool = Executors.newFixedThreadPool(2, runnableTarget -> {
        Thread monitoringThread = new Thread(runnableTarget);
        monitoringThread.setDaemon(true);
        return monitoringThread;
    });
    private final long failureDetectionTimeNano;
    private final long failureDetectionIntervalNano;
    private final int failureDetectionCount;
    private long invalidNodeStartTimeNano;
    private long failureCount;
    // Written by the monitoring thread, read by the telemetry gauge callback: needs visibility.
    private volatile boolean nodeUnhealthy = false;
    private final TelemetryGauge newContextsSizeGauge;
    private final TelemetryGauge activeContextsSizeGauge;
    private final TelemetryGauge nodeHealthyGauge;
    private final TelemetryCounter abortedConnectionsCounter;

    /**
     * Creates the monitor, registers its telemetry gauges and starts both background tasks.
     *
     * @param pluginService used to open the monitoring connection and to publish host availability
     * @param hostSpec the host to monitor
     * @param properties connection properties; entries prefixed with {@code "monitoring-"} override
     *     the un-prefixed property for the monitoring connection
     * @param failureDetectionTimeMillis delay between {@link #startMonitoring} and the moment the
     *     context becomes active
     * @param failureDetectionIntervalMillis pause between two health checks
     * @param failureDetectionCount multiplied by the interval to obtain the maximum tolerated
     *     duration of consecutive failed checks
     * @param abortedConnectionsCounter incremented for every aborted connection
     */
    public MonitorImpl(@NonNull PluginService pluginService, @NonNull HostSpec hostSpec, @NonNull Properties properties, int failureDetectionTimeMillis, int failureDetectionIntervalMillis, int failureDetectionCount, TelemetryCounter abortedConnectionsCounter) {
        this.pluginService = pluginService;
        this.telemetryFactory = pluginService.getTelemetryFactory();
        this.hostSpec = hostSpec;
        this.properties = properties;
        this.failureDetectionTimeNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionTimeMillis);
        this.failureDetectionIntervalNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionIntervalMillis);
        this.failureDetectionCount = failureDetectionCount;
        this.abortedConnectionsCounter = abortedConnectionsCounter;

        String hostId = StringUtils.isNullOrEmpty(this.hostSpec.getHostId()) ? this.hostSpec.getHost() : this.hostSpec.getHostId();
        this.newContextsSizeGauge = this.telemetryFactory.createGauge(String.format("efm2.newContexts.size.%s", hostId), this::getActiveContextSize);
        this.activeContextsSizeGauge = this.telemetryFactory.createGauge(String.format("efm2.activeContexts.size.%s", hostId), () -> (long) this.activeContexts.size());
        this.nodeHealthyGauge = this.telemetryFactory.createGauge(String.format("efm2.nodeHealthy.%s", hostId), () -> this.nodeUnhealthy ? 0L : 1L);

        this.threadPool.submit(this::newContextRun);
        this.threadPool.submit(this);
        // No further tasks are accepted; the pool terminates once both tasks return.
        this.threadPool.shutdown();
    }

    /**
     * Tells whether this monitor has no active contexts left.
     *
     * @return {@code true} when {@code activeContexts} is empty
     */
    @Override
    public boolean canDispose() {
        return this.activeContexts.isEmpty();
    }

    /**
     * Signals both background tasks to stop and waits up to 30 seconds for the pool to terminate,
     * forcing a shutdown if it does not.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        this.stopped.set(true);
        if (!this.threadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
            this.threadPool.shutdownNow();
        }
        LOGGER.finest(() -> Messages.get("MonitorImpl.stopped", new Object[]{this.hostSpec.getHost()}));
    }

    /**
     * Returns the number of contexts that are still waiting to become active.
     *
     * <p>Despite its name this method counts the entries of {@code newContexts}; it backs the
     * {@code efm2.newContexts.size} gauge. The name is kept for compatibility.
     *
     * @return the total size of all pending-context queues
     */
    protected long getActiveContextSize() {
        return this.newContexts.values().stream().mapToLong(Collection::size).sum();
    }

    /**
     * Schedules a context for monitoring. The context becomes active once the failure detection
     * time has elapsed, rounded down to a whole second.
     *
     * <p>A warning is logged when the monitor is already stopped; the context is still enqueued.
     *
     * @param context the context to monitor; only weakly referenced by this monitor
     */
    @Override
    public void startMonitoring(MonitorConnectionContext context) {
        if (this.stopped.get()) {
            LOGGER.warning(() -> Messages.get("MonitorImpl.monitorIsStopped", new Object[]{this.hostSpec.getHost()}));
        }
        long currentTimeNano = this.getCurrentTimeNano();
        long startMonitoringTimeNano = this.truncateNanoToSeconds(currentTimeNano + this.failureDetectionTimeNano);
        WeakReference<MonitorConnectionContext> contextWeakRef = new WeakReference<>(context);
        // Create-or-append atomically so the entry cannot be drained and removed by newContextRun()
        // between obtaining the queue and adding to it.
        this.newContexts.compute(startMonitoringTimeNano, (key, existingQueue) -> {
            Queue<WeakReference<MonitorConnectionContext>> targetQueue = existingQueue;
            if (targetQueue == null) {
                targetQueue = new ConcurrentLinkedQueue<>();
            }
            targetQueue.add(contextWeakRef);
            return targetQueue;
        });
    }

    /** Rounds a nanosecond value down to a whole number of seconds, still expressed in nanoseconds. */
    private long truncateNanoToSeconds(long timeNano) {
        return TimeUnit.SECONDS.toNanos(TimeUnit.NANOSECONDS.toSeconds(timeNano));
    }

    /** Drops every pending and every active context. */
    public void clearContexts() {
        this.newContexts.clear();
        this.activeContexts.clear();
    }

    /** Returns the current {@link System#nanoTime()} reading; package-private so tests can override it. */
    long getCurrentTimeNano() {
        return System.nanoTime();
    }

    /**
     * Background task that, once per second, promotes pending contexts whose start time has passed
     * to {@code activeContexts}. Contexts that were garbage collected or are no longer active are
     * discarded. Runs until the monitor is stopped or the thread is interrupted.
     */
    public void newContextRun() {
        TelemetryContext telemetryContext = this.telemetryFactory.openTelemetryContext("monitoring thread (new contexts)", TelemetryTraceLevel.TOP_LEVEL);
        telemetryContext.setAttribute("url", this.hostSpec.getUrl());
        try {
            while (!this.stopped.get()) {
                long currentTimeNano = this.getCurrentTimeNano();
                // The key set of a ConcurrentHashMap tolerates concurrent insertions and removals.
                for (Long startTimeNano : this.newContexts.keySet()) {
                    if (startTimeNano >= currentTimeNano) {
                        continue;
                    }
                    // Detach the queue first: anything added afterwards lands in a fresh queue.
                    Queue<WeakReference<MonitorConnectionContext>> dueContexts = this.newContexts.remove(startTimeNano);
                    if (dueContexts != null) {
                        this.activateContexts(dueContexts);
                    }
                }
                TimeUnit.SECONDS.sleep(1L);
            }
        } catch (InterruptedException ignored) {
            // The pool is being shut down; leave the loop quietly.
        } catch (Exception ex) {
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, Messages.get("MonitorImpl.exceptionDuringMonitoringStop", new Object[]{this.hostSpec.getHost()}), ex);
            }
        } finally {
            telemetryContext.closeContext();
        }
    }

    /** Moves every still-referenced, still-active context of the given queue to {@code activeContexts}. */
    private void activateContexts(Queue<WeakReference<MonitorConnectionContext>> dueContexts) {
        WeakReference<MonitorConnectionContext> contextWeakRef;
        while ((contextWeakRef = dueContexts.poll()) != null) {
            MonitorConnectionContext context = contextWeakRef.get();
            if (context != null && context.isActive()) {
                this.activeContexts.add(contextWeakRef);
            }
        }
    }

    /**
     * Background task that checks the host on every failure detection interval while there are
     * active contexts. When the node is flagged unhealthy the host is reported as
     * {@link HostAvailability#NOT_AVAILABLE} and the connection of every active context is aborted.
     * On exit the monitor is marked as stopped and the monitoring connection is closed.
     */
    @Override
    public void run() {
        TelemetryContext telemetryContext = this.telemetryFactory.openTelemetryContext("monitoring thread", TelemetryTraceLevel.TOP_LEVEL);
        telemetryContext.setAttribute("url", this.hostSpec.getUrl());
        try {
            while (!this.stopped.get()) {
                if (this.activeContexts.isEmpty()) {
                    TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_MILLIS);
                    continue;
                }

                long statusCheckStartTimeNano = this.getCurrentTimeNano();
                boolean isValid = this.checkConnectionStatus();
                long statusCheckEndTimeNano = this.getCurrentTimeNano();
                this.updateNodeHealthStatus(isValid, statusCheckStartTimeNano, statusCheckEndTimeNano);

                if (this.nodeUnhealthy) {
                    this.pluginService.setAvailability(this.hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE);
                }

                // Survivors are collected separately and re-queued after the pass so that the
                // polling loop below terminates.
                ArrayList<WeakReference<MonitorConnectionContext>> tmpActiveContexts = new ArrayList<>();
                WeakReference<MonitorConnectionContext> monitorContextWeakRef;
                while ((monitorContextWeakRef = this.activeContexts.poll()) != null && !this.stopped.get()) {
                    MonitorConnectionContext monitorContext = monitorContextWeakRef.get();
                    if (monitorContext == null) {
                        continue;
                    }
                    if (this.nodeUnhealthy) {
                        monitorContext.setNodeUnhealthy(true);
                        // Read the connection before deactivating the context.
                        Connection connectionToAbort = monitorContext.getConnection();
                        monitorContext.setInactive();
                        if (connectionToAbort != null) {
                            this.abortConnection(connectionToAbort);
                            this.abortedConnectionsCounter.inc();
                        }
                    } else if (monitorContext.isActive()) {
                        tmpActiveContexts.add(monitorContextWeakRef);
                    }
                }
                this.activeContexts.addAll(tmpActiveContexts);

                TimeUnit.NANOSECONDS.sleep(this.failureDetectionIntervalNano);
            }
        } catch (InterruptedException ignored) {
            // The pool is being shut down; fall through to the clean-up below.
        } catch (Exception ex) {
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, Messages.get("MonitorImpl.exceptionDuringMonitoringStop", new Object[]{this.hostSpec.getHost()}), ex);
            }
        } finally {
            this.stopped.set(true);
            if (this.monitoringConn != null) {
                try {
                    this.monitoringConn.close();
                } catch (SQLException ignored) {
                    // Best-effort close of the monitoring connection.
                }
            }
            telemetryContext.closeContext();
        }
    }

    /**
     * Checks the host through the monitoring connection, opening that connection first when there
     * is none or it is closed.
     *
     * @return {@code true} when a new monitoring connection was opened or the existing one is
     *     valid; {@code false} when it is invalid or any {@link SQLException} occurred
     */
    boolean checkConnectionStatus() {
        TelemetryContext connectContext = this.telemetryFactory.openTelemetryContext("connection status check", TelemetryTraceLevel.NESTED);
        try {
            if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
                Properties monitoringConnProperties = PropertyUtils.copyProperties(this.properties);
                // "monitoring-xyz" overrides "xyz" for the monitoring connection; the prefixed
                // entry itself is not passed on.
                this.properties.stringPropertyNames().stream()
                    .filter(name -> name.startsWith(MONITORING_PROPERTY_PREFIX))
                    .forEach(name -> {
                        monitoringConnProperties.put(name.substring(MONITORING_PROPERTY_PREFIX.length()), this.properties.getProperty(name));
                        monitoringConnProperties.remove(name);
                    });

                LOGGER.finest(() -> "Opening a monitoring connection to " + this.hostSpec.getUrl());
                this.monitoringConn = this.pluginService.forceConnect(this.hostSpec, monitoringConnProperties);
                LOGGER.finest(() -> "Opened monitoring connection: " + this.monitoringConn);
                return true;
            }

            // Connection.isValid treats 0 as "no timeout"; a sub-second interval must not turn the
            // health check into an unbounded wait, so use at least one second.
            int validTimeoutSeconds = (int) Math.max(1L, TimeUnit.NANOSECONDS.toSeconds(this.failureDetectionIntervalNano));
            return this.monitoringConn.isValid(validTimeoutSeconds);
        } catch (SQLException sqlEx) {
            return false;
        } finally {
            connectContext.closeContext();
        }
    }

    /**
     * Updates the failure bookkeeping after a health check.
     *
     * <p>A valid check resets the failure count, the failure start time and the unhealthy flag. An
     * invalid check increments the failure count and flags the node unhealthy once the time since
     * the first failed check reaches {@code failureDetectionIntervalNano * failureDetectionCount}.
     *
     * @param connectionValid result of the health check
     * @param statusCheckStartNano time at which the check started
     * @param statusCheckEndNano time at which the check finished
     */
    private void updateNodeHealthStatus(boolean connectionValid, long statusCheckStartNano, long statusCheckEndNano) {
        if (connectionValid) {
            if (this.failureCount > 0L) {
                LOGGER.finest(() -> Messages.get("MonitorConnectionContext.hostAlive", new Object[]{this.hostSpec.getHost()}));
            }
            this.failureCount = 0L;
            this.invalidNodeStartTimeNano = 0L;
            this.nodeUnhealthy = false;
            return;
        }

        ++this.failureCount;
        if (this.invalidNodeStartTimeNano == 0L) {
            this.invalidNodeStartTimeNano = statusCheckStartNano;
        }

        long invalidNodeDurationNano = statusCheckEndNano - this.invalidNodeStartTimeNano;
        long maxInvalidNodeDurationNano = this.failureDetectionIntervalNano * (long) Math.max(0, this.failureDetectionCount);
        if (invalidNodeDurationNano >= maxInvalidNodeDurationNano) {
            LOGGER.fine(() -> Messages.get("MonitorConnectionContext.hostDead", new Object[]{this.hostSpec.getHost()}));
            this.nodeUnhealthy = true;
            return;
        }

        LOGGER.finest(() -> Messages.get("MonitorConnectionContext.hostNotResponding", new Object[]{this.hostSpec.getHost(), this.failureCount}));
    }

    /**
     * Aborts and then closes the given connection; a failure is only logged.
     *
     * @param connectionToAbort the connection to abort
     */
    private void abortConnection(@NonNull Connection connectionToAbort) {
        try {
            connectionToAbort.abort(ABORT_EXECUTOR);
            connectionToAbort.close();
        } catch (SQLException sqlEx) {
            LOGGER.finest(() -> Messages.get("MonitorConnectionContext.exceptionAbortingConnection", new Object[]{sqlEx.getMessage()}));
        }
    }
}

