/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kubernetes.cluster.lock;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.readiness.Readiness;
import io.fabric8.kubernetes.client.utils.PodStatusUtil;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEventHandler;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesLockConfiguration;
import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo;
import org.apache.camel.component.kubernetes.cluster.lock.TimedLeaderNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State machine that monitors the lease resource and the running pods of a Kubernetes
 * cluster and tries to acquire, keep or release the leadership for the current pod.
 *
 * <p>All state transitions run on a single-threaded scheduled executor created in
 * {@link #start()}, so the transition methods never run concurrently with each other.
 * Leadership changes are forwarded to the configured event handler through a
 * {@link TimedLeaderNotifier}.
 */
public class KubernetesLeadershipController implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class);

    private final CamelContext camelContext;
    private final KubernetesClient kubernetesClient;
    private final KubernetesLockConfiguration lockConfiguration;
    private final KubernetesClusterEventHandler eventHandler;

    /** Current node of the state machine; only touched from the serialized executor. */
    private State currentState = State.NOT_LEADER;
    private ScheduledExecutorService serializedExecutor;
    private TimedLeaderNotifier leaderNotifier;
    private final KubernetesLeaseResourceManager<HasMetadata> leaseManager;

    /** Snapshot of the last successfully decoded cluster status. */
    private volatile LeaderInfo latestLeaderInfo;
    private volatile HasMetadata latestLeaseResource;
    private volatile Set<String> latestMembers;

    /**
     * Written by callers of {@link #setDisabled(boolean)} and read both by callers of
     * {@link #isDisabled()} and by the executor thread, hence volatile.
     */
    private volatile boolean disabled;

    /** Prefix identifying the current pod in every log line. */
    private final String logPrefix;

    /**
     * Creates a controller that is enabled and in the {@code NOT_LEADER} state.
     *
     * @param camelContext      context used to create the executor and the notifier
     * @param kubernetesClient  client used for every Kubernetes API call
     * @param lockConfiguration lock settings (pod name, group, lease timings, labels...)
     * @param eventHandler      receiver of leadership change notifications
     */
    public KubernetesLeadershipController(CamelContext camelContext, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
        this.camelContext = camelContext;
        this.kubernetesClient = kubernetesClient;
        this.lockConfiguration = lockConfiguration;
        this.eventHandler = eventHandler;
        this.disabled = false;
        this.leaseManager = KubernetesLeaseResourceManager.create(lockConfiguration.getLeaseResourceType());
        this.logPrefix = "Pod[" + this.lockConfiguration.getPodName() + "]";
    }

    /**
     * Starts the controller: creates the serialized executor and the leader notifier,
     * then schedules the first status refresh. Does nothing if an executor already exists.
     */
    public void start() {
        if (this.serializedExecutor == null) {
            LOG.debug("{} Starting leadership controller...", this.logPrefix);
            this.serializedExecutor = this.camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeadershipController");
            this.leaderNotifier = new TimedLeaderNotifier(this.camelContext, this.eventHandler);
            this.leaderNotifier.start();
            this.serializedExecutor.execute(this::refreshStatus);
        }
    }

    /**
     * Stops the controller: shuts the executor down immediately (interrupting a running
     * transition) and stops the leader notifier. Both references are cleared so that
     * {@link #start()} can create fresh ones.
     */
    public void stop() {
        LOG.debug("{} Stopping leadership controller...", this.logPrefix);
        if (this.serializedExecutor != null) {
            this.serializedExecutor.shutdownNow();
        }
        this.serializedExecutor = null;
        if (this.leaderNotifier != null) {
            this.leaderNotifier.stop();
        }
        this.leaderNotifier = null;
    }

    /**
     * @return {@code true} if the current pod must not acquire or keep the leadership
     */
    public boolean isDisabled() {
        return this.disabled;
    }

    /**
     * Enables or disables the participation of the current pod in the election.
     * When the value actually changes and the controller is started, a status refresh
     * is triggered immediately.
     *
     * @param disabled {@code true} to prevent the pod from acquiring or keeping the leadership
     */
    public void setDisabled(boolean disabled) {
        boolean oldState = this.disabled;
        this.disabled = disabled;
        if (oldState != disabled && this.serializedExecutor != null) {
            this.serializedExecutor.execute(this::refreshStatus);
        }
    }

    /**
     * Dispatches to the transition method matching the current state.
     *
     * @throws RuntimeCamelException if the state has no associated transition
     */
    private void refreshStatus() {
        switch (this.currentState) {
            case NOT_LEADER:
                this.refreshStatusNotLeader();
                break;
            case BECOMING_LEADER:
                this.refreshStatusBecomingLeader();
                break;
            case LEADER:
                this.refreshStatusLeader();
                break;
            case LOSING_LEADERSHIP:
                this.refreshStatusLosingLeadership();
                break;
            case LEADERSHIP_LOST:
                this.refreshStatusLeadershipLost();
                break;
            default:
                throw new RuntimeCamelException("Unsupported state " + this.currentState);
        }
    }

    /**
     * Transition for {@code NOT_LEADER}: pulls the cluster status and, when there is no
     * leader or the recorded leader is no longer valid, tries to acquire the leadership.
     * Otherwise the observed leader is published and a new refresh is scheduled.
     */
    private void refreshStatusNotLeader() {
        LOG.debug("{} Pod is not leader, pulling new data from the cluster", this.logPrefix);
        boolean pulled = this.lookupNewLeaderInfo();
        if (!pulled) {
            this.rescheduleAfterDelay();
            return;
        }
        if (this.latestLeaderInfo.hasEmptyLeader()) {
            LOG.info("{} The cluster has no leaders for group {}. Trying to acquire the leadership...", this.logPrefix, this.lockConfiguration.getGroupName());
            boolean acquired = this.tryAcquireLeadership();
            if (acquired) {
                // Nobody held the lock before, so there is no previous leader to wait for.
                LOG.info("{} Leadership acquired by current pod with immediate effect", this.logPrefix);
                this.currentState = State.LEADER;
                this.serializedExecutor.execute(this::refreshStatus);
                return;
            }
            LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", this.logPrefix);
        } else if (!this.latestLeaderInfo.hasValidLeader()) {
            LOG.info("{} Leadership has been lost by old owner. Trying to acquire the leadership...", this.logPrefix);
            boolean acquired = this.tryAcquireLeadership();
            if (acquired) {
                LOG.info("{} Leadership acquired by current pod", this.logPrefix);
                this.currentState = State.BECOMING_LEADER;
                this.serializedExecutor.execute(this::refreshStatus);
                return;
            }
            LOG.info("{} Unable to acquire the leadership, it may have been acquired by another pod", this.logPrefix);
        } else if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
            LOG.info("{} Leadership is already owned by current pod", this.logPrefix);
            this.currentState = State.BECOMING_LEADER;
            this.serializedExecutor.execute(this::refreshStatus);
            return;
        }
        this.leaderNotifier.refreshLeadership(Optional.ofNullable(this.latestLeaderInfo.getLeader()), System.currentTimeMillis(), this.lockConfiguration.getLeaseDurationMillis(), this.latestLeaderInfo.getMembers());
        this.rescheduleAfterDelay();
    }

    /**
     * Transition for {@code BECOMING_LEADER}: waits one lease duration before moving to
     * {@code LEADER}. If the wait is interrupted the transition is aborted and the state
     * is left untouched, so the full wait is repeated if the controller is restarted.
     */
    private void refreshStatusBecomingLeader() {
        long delay = this.lockConfiguration.getLeaseDurationMillis();
        LOG.info("{} Current pod owns the leadership, but it will be effective in {} seconds...", this.logPrefix, toSeconds(delay));
        if (!this.sleepFully(delay)) {
            return;
        }
        LOG.info("{} Current pod is becoming the new leader now...", this.logPrefix);
        this.currentState = State.LEADER;
        this.serializedExecutor.execute(this::refreshStatus);
    }

    /**
     * Transition for {@code LOSING_LEADERSHIP}: waits one lease duration before moving to
     * {@code LEADERSHIP_LOST}. If the wait is interrupted the transition is aborted and
     * the state is left untouched.
     */
    private void refreshStatusLosingLeadership() {
        long delay = this.lockConfiguration.getLeaseDurationMillis();
        LOG.info("{} Current pod owns the leadership, but it will be lost in {} seconds...", this.logPrefix, toSeconds(delay));
        if (!this.sleepFully(delay)) {
            return;
        }
        LOG.info("{} Current pod is losing leadership now...", this.logPrefix);
        this.currentState = State.LEADERSHIP_LOST;
        this.serializedExecutor.execute(this::refreshStatus);
    }

    /**
     * Transition for {@code LEADERSHIP_LOST}: pulls the cluster status and removes the
     * current pod from the lease resource. Retries after a delay until both succeed,
     * then moves back to {@code NOT_LEADER}.
     */
    private void refreshStatusLeadershipLost() {
        boolean pulled = this.lookupNewLeaderInfo();
        if (!pulled) {
            this.rescheduleAfterDelay();
            return;
        }
        if (!this.yieldLeadership()) {
            this.rescheduleAfterDelay();
            return;
        }
        LOG.info("{} Current pod has lost leadership", this.logPrefix);
        this.currentState = State.NOT_LEADER;
        this.serializedExecutor.execute(this::refreshStatus);
    }

    /**
     * Transition for {@code LEADER}: starts releasing the leadership when the controller
     * has been disabled; otherwise verifies that the current pod is still the leader,
     * publishes the leadership and refreshes the lease renew time.
     */
    private void refreshStatusLeader() {
        if (this.disabled) {
            LOG.debug("{} Leadership disabled, pod is going to lose leadership", this.logPrefix);
            this.currentState = State.LOSING_LEADERSHIP;
            this.serializedExecutor.execute(this::refreshStatus);
            return;
        }
        LOG.debug("{} Pod should be the leader, pulling new data from the cluster", this.logPrefix);
        // Taken before the lookup so the notified leadership window is not overestimated.
        long timeBeforePulling = System.currentTimeMillis();
        boolean pulled = this.lookupNewLeaderInfo();
        if (!pulled) {
            this.rescheduleAfterDelay();
            return;
        }
        if (this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
            LOG.debug("{} Current Pod is still the leader", this.logPrefix);
            this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()), timeBeforePulling, this.lockConfiguration.getRenewDeadlineMillis(), this.latestLeaderInfo.getMembers());
            HasMetadata newLease;
            try {
                newLease = this.leaseManager.refreshLeaseRenewTime(this.kubernetesClient, this.latestLeaseResource, this.lockConfiguration.getRenewDeadlineSeconds());
            } catch (Exception e) {
                // A failed renewal must not kill the refresh loop: retry on the next cycle,
                // exactly like the other Kubernetes calls of this controller.
                LOG.warn("{} Unable to refresh the renew time of the lease resource {} for group {}", this.logPrefix, this.lockConfiguration.getKubernetesResourceName(), this.lockConfiguration.getGroupName());
                LOG.debug("{} Exception thrown during lease renew time refresh", this.logPrefix, e);
                this.rescheduleAfterDelay();
                return;
            }
            this.updateLatestLeaderInfo(newLease, this.latestMembers);
            this.rescheduleAfterDelay();
            return;
        }
        LOG.debug("{} Current Pod has lost the leadership", this.logPrefix);
        this.currentState = State.NOT_LEADER;
        this.leaderNotifier.refreshLeadership(Optional.empty(), System.currentTimeMillis(), this.lockConfiguration.getLeaseDurationMillis(), this.latestLeaderInfo.getMembers());
        this.serializedExecutor.execute(this::refreshStatus);
    }

    /** Schedules the next status refresh after the jittered retry period. */
    private void rescheduleAfterDelay() {
        long delayMillis = this.jitter(this.lockConfiguration.getRetryPeriodMillis(), this.lockConfiguration.getJitterFactor());
        this.serializedExecutor.schedule(this::refreshStatus, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Fetches the lease resource and the set of cluster members and stores them as the
     * latest known status.
     *
     * @return {@code true} if both lookups succeeded and the status was updated;
     *         {@code false} if either lookup threw (the failure is only logged)
     */
    private boolean lookupNewLeaderInfo() {
        LOG.debug("{} Looking up leadership information...", this.logPrefix);
        HasMetadata leaseResource;
        try {
            leaseResource = this.leaseManager.fetchLeaseResource(this.kubernetesClient, this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(this.kubernetesClient), this.lockConfiguration.getKubernetesResourceName(), this.lockConfiguration.getGroupName());
        } catch (Exception e) {
            LOG.warn("{} Unable to retrieve the current lease resource {} for group {} from Kubernetes", this.logPrefix, this.lockConfiguration.getKubernetesResourceName(), this.lockConfiguration.getGroupName());
            LOG.debug("{} Exception thrown during lease resource lookup", this.logPrefix, e);
            return false;
        }
        Set<String> members;
        try {
            members = Objects.requireNonNull(this.pullClusterMembers(), "Retrieved a null set of members");
        } catch (Exception e) {
            LOG.warn("{} Unable to retrieve the list of cluster members from Kubernetes", this.logPrefix);
            LOG.debug("{} Exception thrown during Pod list lookup", this.logPrefix, e);
            return false;
        }
        this.updateLatestLeaderInfo(leaseResource, members);
        return true;
    }

    /**
     * Removes the leadership of the current pod from the lease resource, based on the
     * latest known status.
     *
     * @return {@code true} if the pod is not (or no longer) recorded as leader;
     *         {@code false} if the status is missing, the pod is not a cluster member,
     *         or the update of the lease resource failed
     */
    private boolean yieldLeadership() {
        LOG.debug("{} Trying to yield the leadership...", this.logPrefix);
        HasMetadata leaseResource = this.latestLeaseResource;
        Set<String> members = this.latestMembers;
        LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
        if (latestLeaderInfo == null || members == null) {
            LOG.warn("{} Unexpected condition. Latest leader info or list of members is empty.", this.logPrefix);
            return false;
        }
        if (!members.contains(this.lockConfiguration.getPodName())) {
            LOG.warn("{} The list of cluster members {} does not contain the current Pod. Cannot yield the leadership.", this.logPrefix, latestLeaderInfo.getMembers());
            return false;
        }
        if (leaseResource == null) {
            // No lease resource means there is no leadership to give up.
            return true;
        }
        LOG.debug("{} Lock lease resource already present in the Kubernetes namespace. Checking...", this.logPrefix);
        LeaderInfo leaderInfo = this.leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName());
        if (!leaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
            return true;
        }
        try {
            HasMetadata updatedLeaseResource = this.leaseManager.optimisticDeleteLeaderInfo(this.kubernetesClient, leaseResource, this.lockConfiguration.getGroupName());
            LOG.debug("{} Lease resource {} for group {} successfully updated", this.logPrefix, this.lockConfiguration.getKubernetesResourceName(), this.lockConfiguration.getGroupName());
            this.updateLatestLeaderInfo(updatedLeaseResource, members);
            return true;
        } catch (Exception ex) {
            LOG.warn("{} Unable to update the lock on the lease resource to remove leadership information", this.logPrefix);
            LOG.debug("{} Error received during resource lock replace", this.logPrefix, ex);
            return false;
        }
    }

    /**
     * Tries to record the current pod as leader, creating the lease resource when it
     * does not exist or updating it when it holds no valid leader.
     *
     * @return {@code true} if the lease resource now names the current pod as leader;
     *         {@code false} if the controller is disabled, the status is missing, the pod
     *         is not a cluster member, another valid leader exists, or the API call failed
     */
    private boolean tryAcquireLeadership() {
        if (this.disabled) {
            LOG.debug("{} Won't try to acquire the leadership because it's disabled...", this.logPrefix);
            return false;
        }
        LOG.debug("{} Trying to acquire the leadership...", this.logPrefix);
        HasMetadata leaseResource = this.latestLeaseResource;
        Set<String> members = this.latestMembers;
        LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
        if (latestLeaderInfo == null || members == null) {
            LOG.warn("{} Unexpected condition. Latest leader info or list of members is empty.", this.logPrefix);
            return false;
        }
        if (!members.contains(this.lockConfiguration.getPodName())) {
            LOG.warn("{} The list of cluster members {} does not contain the current Pod. Cannot acquire leadership.", this.logPrefix, latestLeaderInfo.getMembers());
            return false;
        }
        LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date(), members, this.lockConfiguration.getLeaseDurationSeconds());
        if (leaseResource == null) {
            LOG.debug("{} Lock lease resource is not present in the Kubernetes namespace. A new lease resource will be created", this.logPrefix);
            try {
                HasMetadata newLeaseResource = this.leaseManager.createNewLeaseResource(this.kubernetesClient, this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(this.kubernetesClient), this.lockConfiguration.getKubernetesResourceName(), newLeaderInfo);
                LOG.debug("{} Lease resource {} successfully created for group {}", this.logPrefix, this.lockConfiguration.getKubernetesResourceName(), newLeaderInfo.getGroupName());
                this.updateLatestLeaderInfo(newLeaseResource, members);
                return true;
            } catch (Exception ex) {
                LOG.warn("{} Unable to create the lease resource, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right permissions to create it", this.logPrefix);
                LOG.debug("{} Exception while trying to create the lease resource", this.logPrefix, ex);
                return false;
            }
        }
        LOG.debug("{} Lock lease resource already present in the Kubernetes namespace. Checking...", this.logPrefix);
        LeaderInfo leaderInfo = this.leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName());
        boolean canAcquire = !leaderInfo.hasValidLeader();
        if (canAcquire) {
            try {
                HasMetadata updatedLeaseResource = this.leaseManager.optimisticAcquireLeadership(this.kubernetesClient, leaseResource, newLeaderInfo);
                LOG.debug("{} Lease resource {} successfully updated for group {}", this.logPrefix, this.lockConfiguration.getKubernetesResourceName(), newLeaderInfo.getGroupName());
                this.updateLatestLeaderInfo(updatedLeaseResource, members);
                return true;
            } catch (Exception ex) {
                LOG.warn("{} Unable to update the lock lease resource to set leadership information", this.logPrefix);
                LOG.debug("{} Error received during lease resource lock replace", this.logPrefix, ex);
                return false;
            }
        }
        LOG.debug("{} Another Pod ({}) is the current leader and it is still active", this.logPrefix, this.latestLeaderInfo.getLeader());
        return false;
    }

    /**
     * Stores the given lease resource and members as the latest known status and
     * decodes the corresponding leader information.
     */
    private void updateLatestLeaderInfo(HasMetadata leaseResource, Set<String> members) {
        LOG.debug("{} Updating internal status about the current leader", this.logPrefix);
        this.latestLeaseResource = leaseResource;
        this.latestMembers = members;
        this.latestLeaderInfo = this.leaseManager.decodeLeaderInfo(leaseResource, members, this.lockConfiguration.getGroupName());
        LOG.debug("{} Current leader info: {}", this.logPrefix, this.latestLeaderInfo);
    }

    /**
     * Lists the pods carrying the configured cluster labels in the configured namespace.
     *
     * @return the names of the pods that are both running and ready
     */
    private Set<String> pullClusterMembers() {
        return this.kubernetesClient.pods()
                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(this.kubernetesClient))
                .withLabels(this.lockConfiguration.getClusterLabels())
                .list()
                .getItems()
                .stream()
                .filter(PodStatusUtil::isRunning)
                .filter(Readiness::isPodReady)
                .map(pod -> pod.getMetadata().getName())
                .collect(Collectors.toSet());
    }

    /**
     * Scales {@code num} by a random multiplier drawn between 1 and {@code factor}.
     *
     * @param num    base value
     * @param factor bound of the random multiplier
     * @return the scaled value, truncated to a long
     */
    private long jitter(long num, double factor) {
        return (long)((double)num * (1.0 + Math.random() * (factor - 1.0)));
    }

    /**
     * Sleeps for the given time on the current thread.
     *
     * @param millis time to sleep, in milliseconds
     * @return {@code true} if the whole time elapsed; {@code false} if the thread was
     *         interrupted, in which case the interrupt flag is restored
     */
    private boolean sleepFully(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Converts milliseconds to seconds with two decimals, for log messages only. */
    private static BigDecimal toSeconds(long millis) {
        return new BigDecimal(millis).divide(BigDecimal.valueOf(1000L), 2, RoundingMode.HALF_UP);
    }

    /** Nodes of the leadership state machine. */
    private static enum State {
        NOT_LEADER,
        BECOMING_LEADER,
        LEADER,
        LOSING_LEADERSHIP,
        LEADERSHIP_LOST;

    }
}

