/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.highavailability.nonha.embedded;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-process leader election and leader retrieval service.
 *
 * <p>Contenders are created through {@link #createLeaderElectionService(String)} and listeners
 * through {@link #createLeaderRetrievalService()}. While no leader is proposed or confirmed, one
 * registered contender is offered leadership under a freshly generated session ID; once that
 * contender confirms the grant, every registered listener is notified of its address and session
 * ID. Grant and listener notification calls are submitted to the executor passed to the
 * constructor.
 */
public class EmbeddedLeaderService {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
    /** Monitor that the registration, confirmation, revocation and shutdown paths synchronize on. */
    private final Object lock = new Object();
    /** Executor on which grant, revoke and listener notification runnables are submitted. */
    private final Executor notificationExecutor;
    /** Leader elections that have been started and not yet closed. */
    private final Set<EmbeddedLeaderElection> allLeaderContenders;
    /** Leader retrieval services that have been started and not yet stopped. */
    private final Set<EmbeddedLeaderRetrievalService> listeners;
    /** Election that has been offered leadership but has not confirmed it yet; null if none. */
    private EmbeddedLeaderElection currentLeaderProposed;
    /** Election whose leadership has been confirmed; null if none. */
    private EmbeddedLeaderElection currentLeaderConfirmed;
    /** Session ID of the current grant; volatile because hasLeadershipAsync reads it without the lock. */
    private volatile UUID currentLeaderSessionId;
    /** Address reported by the confirmed leader; null while no leader is confirmed. */
    private String currentLeaderAddress;
    /** Set to true once the service has been shut down; written only while holding the lock. */
    private boolean shutdown;

    /**
     * Creates a service with no registered contenders or listeners.
     *
     * @param notificationsDispatcher executor used to run grant, revoke and notification calls;
     *     checked for null via {@code Preconditions.checkNotNull}
     */
    public EmbeddedLeaderService(Executor notificationsDispatcher) {
        notificationExecutor = Preconditions.checkNotNull(notificationsDispatcher);
        allLeaderContenders = new HashSet<>();
        listeners = new HashSet<>();
    }

    /**
     * Shuts this service down, shutting down every registered contender and listener.
     *
     * <p>If the service is already shut down, the internal shutdown routine does nothing.
     */
    public void shutdown() {
        synchronized (lock) {
            final Exception reason = new Exception("Leader election service is shutting down");
            shutdownInternally(reason);
        }
    }

    /**
     * Reports whether this service has been shut down.
     *
     * @return the value of the shutdown flag, read while holding the service lock
     */
    @VisibleForTesting
    public boolean isShutdown() {
        synchronized (lock) {
            final boolean isDown = shutdown;
            return isDown;
        }
    }

    /**
     * Logs the given error and shuts the service down.
     *
     * @param error the failure that triggered the shutdown; attached as the cause of the exception
     *     handed to the shutdown routine
     */
    private void fatalError(Throwable error) {
        LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
        synchronized (lock) {
            final Exception reason =
                    new Exception("Leader election service is shutting down after a fatal error", error);
            shutdownInternally(reason);
        }
    }

    /**
     * Performs the actual shutdown; the caller must hold {@code lock}.
     *
     * <p>Clears the leader state, shuts down and forgets every registered contender and listener,
     * and finally marks the service as shut down. Does nothing if the service is already shut down.
     *
     * @param exceptionForHandlers exception passed on to each contender's and listener's shutdown
     */
    @GuardedBy("lock")
    private void shutdownInternally(Exception exceptionForHandlers) {
        Preconditions.checkState(Thread.holdsLock(lock));
        if (shutdown) {
            return;
        }

        // Forget the current leader before tearing down the participants.
        currentLeaderProposed = null;
        currentLeaderConfirmed = null;
        currentLeaderSessionId = null;
        currentLeaderAddress = null;

        for (EmbeddedLeaderElection contenderElection : allLeaderContenders) {
            contenderElection.shutdown(exceptionForHandlers);
        }
        allLeaderContenders.clear();

        for (EmbeddedLeaderRetrievalService retrievalService : listeners) {
            retrievalService.shutdown(exceptionForHandlers);
        }
        listeners.clear();

        // The flag is only set after all participants were shut down without throwing.
        shutdown = true;
    }

    /**
     * Creates a new, not yet started leader election handle bound to this service.
     *
     * @param componentId identifier stored on the returned election and used in log output when
     *     leadership is proposed to it
     * @return a new election; it only takes part in the election after it has been started
     * @throws IllegalStateException if the service is already shut down (raised by
     *     {@code Preconditions.checkState})
     */
    public LeaderElection createLeaderElectionService(String componentId) {
        // 'shutdown' is a plain field that is only written while holding 'lock'. Read it under
        // the same lock so that a shutdown performed by another thread is guaranteed to be seen.
        synchronized (this.lock) {
            Preconditions.checkState(!this.shutdown, "leader election service is shut down");
        }
        return new EmbeddedLeaderElection(componentId);
    }

    /**
     * Creates a new, not yet started leader retrieval handle bound to this service.
     *
     * @return a new retrieval service; it only receives notifications after it has been started
     * @throws IllegalStateException if the service is already shut down (raised by
     *     {@code Preconditions.checkState})
     */
    public LeaderRetrievalService createLeaderRetrievalService() {
        // 'shutdown' is a plain field that is only written while holding 'lock'. Read it under
        // the same lock so that a shutdown performed by another thread is guaranteed to be seen.
        synchronized (this.lock) {
            Preconditions.checkState(!this.shutdown, "leader election service is shut down");
        }
        return new EmbeddedLeaderRetrievalService();
    }

    /**
     * Registers a started election with this service and triggers a leader update.
     *
     * <p>The two precondition checks run outside the try block, so a violated precondition is
     * thrown to the caller. Any failure after that point is routed to {@code fatalError}, which
     * shuts the service down.
     *
     * @param embeddedLeaderElection the election handle being started
     * @param contender the contender that receives leadership callbacks for this election
     */
    private void addContender(EmbeddedLeaderElection embeddedLeaderElection, LeaderContender contender) {
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election is shut down");
            Preconditions.checkState(!embeddedLeaderElection.running, "leader election is already started");
            try {
                final boolean newlyRegistered = allLeaderContenders.add(embeddedLeaderElection);
                if (!newlyRegistered) {
                    throw new IllegalStateException("leader election was added to this service multiple times");
                }
                embeddedLeaderElection.contender = contender;
                embeddedLeaderElection.running = true;

                // A failed leader update is treated as fatal for the whole service.
                updateLeader().whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        fatalError(failure);
                    }
                });
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Unregisters an election from this service and triggers a leader update.
     *
     * <p>Does nothing if the election is not running or the service is shut down. If the election
     * is currently marked as leader, its contender's leadership is revoked synchronously before the
     * election is reset. Any failure is routed to {@code fatalError}.
     *
     * @param embeddedLeaderElection the election handle being closed
     */
    private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
        synchronized (lock) {
            if (!embeddedLeaderElection.running || shutdown) {
                return;
            }
            try {
                final boolean wasRegistered = allLeaderContenders.remove(embeddedLeaderElection);
                if (!wasRegistered) {
                    throw new IllegalStateException("leader election does not belong to this service");
                }
                if (embeddedLeaderElection.isLeader) {
                    embeddedLeaderElection.contender.revokeLeadership();
                }
                embeddedLeaderElection.contender = null;
                embeddedLeaderElection.running = false;
                embeddedLeaderElection.isLeader = false;

                // Drop whatever leader state still refers to the removed election.
                final boolean wasConfirmedLeader = currentLeaderConfirmed == embeddedLeaderElection;
                final boolean wasProposedLeader = currentLeaderProposed == embeddedLeaderElection;
                if (wasConfirmedLeader) {
                    currentLeaderConfirmed = null;
                    currentLeaderAddress = null;
                }
                if (wasProposedLeader) {
                    currentLeaderProposed = null;
                }
                if (wasConfirmedLeader || wasProposedLeader) {
                    currentLeaderSessionId = null;
                }

                // A failed leader update is treated as fatal for the whole service.
                updateLeader().whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        fatalError(failure);
                    }
                });
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Handles a contender's confirmation of a leadership grant.
     *
     * <p>The confirmation is accepted only if it comes from the currently proposed election and
     * carries the current session ID; in that case the election becomes the confirmed leader and
     * all listeners are notified. Otherwise the confirmation is logged as stale and ignored.
     *
     * @param embeddedLeaderElection the election confirming leadership
     * @param leaderSessionId session ID the contender is confirming
     * @param leaderAddress address the contender reports for itself
     * @return the future returned by {@code notifyAllListeners} when the confirmation is accepted;
     *     otherwise an already completed future
     */
    private CompletableFuture<Void> confirmLeader(EmbeddedLeaderElection embeddedLeaderElection, UUID leaderSessionId, String leaderAddress) {
        synchronized (lock) {
            if (!embeddedLeaderElection.running || shutdown) {
                return FutureUtils.completedVoidFuture();
            }
            try {
                final boolean matchesPendingGrant =
                        embeddedLeaderElection == currentLeaderProposed
                                && currentLeaderSessionId.equals(leaderSessionId);
                if (matchesPendingGrant) {
                    LOG.info("Received confirmation of leadership for leader {} , session={}", leaderAddress, leaderSessionId);
                    currentLeaderConfirmed = embeddedLeaderElection;
                    currentLeaderAddress = leaderAddress;
                    currentLeaderProposed = null;
                    return notifyAllListeners(leaderAddress, leaderSessionId);
                }
                LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
            } catch (Throwable t) {
                fatalError(t);
            }
        }
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Submits a leader notification for every registered listener.
     *
     * @param address leader address to announce; null is passed when there is no leader
     * @param leaderSessionId leader session ID to announce; null is passed when there is no leader
     * @return the result of {@code FutureUtils.waitForAll} over the individual notification futures
     */
    private CompletableFuture<Void> notifyAllListeners(String address, UUID leaderSessionId) {
        final ArrayList<CompletableFuture<Void>> pendingNotifications = new ArrayList<>(listeners.size());
        for (EmbeddedLeaderRetrievalService retrievalService : listeners) {
            final CompletableFuture<Void> notification =
                    notifyListener(address, leaderSessionId, retrievalService.listener);
            pendingNotifications.add(notification);
        }
        return FutureUtils.waitForAll(pendingNotifications);
    }

    /**
     * Proposes leadership to a registered contender if there is currently no leader; the caller
     * must hold {@code lock}.
     *
     * <p>If a leader is already proposed or confirmed, nothing happens. If no contender is
     * registered, listeners are notified with a null address and session ID. Otherwise the first
     * election returned by the contender set's iterator is marked as leader under a new random
     * session ID and a grant call is submitted to the executor.
     *
     * @return a future for the submitted grant or listener notification, or an already completed
     *     future when nothing had to be done
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> updateLeader() {
        Preconditions.checkState(Thread.holdsLock(lock));

        if (currentLeaderConfirmed != null || currentLeaderProposed != null) {
            return CompletableFuture.completedFuture(null);
        }
        if (allLeaderContenders.isEmpty()) {
            return notifyAllListeners(null, null);
        }

        final UUID newSessionId = UUID.randomUUID();
        final EmbeddedLeaderElection candidate = allLeaderContenders.iterator().next();
        currentLeaderSessionId = newSessionId;
        currentLeaderProposed = candidate;
        candidate.isLeader = true;
        LOG.info("Proposing leadership to the contender that is registered under component ID '{}'.", candidate.componentId);

        final Runnable grantCall = new GrantLeadershipCall(candidate.contender, newSessionId, LOG);
        return execute(grantCall);
    }

    /**
     * Submits a single leader notification to the notification executor.
     *
     * @param address leader address to announce, may be null
     * @param leaderSessionId leader session ID to announce, may be null
     * @param listener the listener to notify
     * @return future of the submitted notification
     */
    private CompletableFuture<Void> notifyListener(@Nullable String address, @Nullable UUID leaderSessionId, LeaderRetrievalListener listener) {
        final Runnable notification = new NotifyOfLeaderCall(address, leaderSessionId, listener, LOG);
        return CompletableFuture.runAsync(notification, notificationExecutor);
    }

    /**
     * Registers a started retrieval service and, if a leader is already confirmed, notifies its
     * listener of that leader.
     *
     * <p>The two precondition checks run outside the try block, so a violated precondition is
     * thrown to the caller. Any failure after that point is routed to {@code fatalError}.
     *
     * @param service the retrieval service being started
     * @param listener the listener that receives leader notifications for this service
     */
    private void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
        synchronized (lock) {
            Preconditions.checkState(!shutdown, "leader election service is shut down");
            Preconditions.checkState(!service.running, "leader retrieval service is already started");
            try {
                final boolean newlyRegistered = listeners.add(service);
                if (!newlyRegistered) {
                    throw new IllegalStateException("leader retrieval service was added to this service multiple times");
                }
                service.listener = listener;
                service.running = true;

                final boolean leaderKnown = currentLeaderConfirmed != null;
                if (leaderKnown) {
                    notifyListener(currentLeaderAddress, currentLeaderSessionId, listener);
                }
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Unregisters a retrieval service from this service.
     *
     * <p>Does nothing if the retrieval service is not running or this service is shut down. Any
     * failure is routed to {@code fatalError}.
     *
     * @param service the retrieval service being stopped
     */
    private void removeListener(EmbeddedLeaderRetrievalService service) {
        synchronized (lock) {
            if (!service.running || shutdown) {
                return;
            }
            try {
                final boolean wasRegistered = listeners.remove(service);
                if (!wasRegistered) {
                    throw new IllegalStateException("leader retrieval service does not belong to this service");
                }
                service.listener = null;
                service.running = false;
            } catch (Throwable t) {
                fatalError(t);
            }
        }
    }

    /**
     * Triggers a leader update on demand.
     *
     * @return the future returned by {@code updateLeader}, or an exceptionally completed future if
     *     the service is shut down
     */
    @VisibleForTesting
    CompletableFuture<Void> grantLeadership() {
        synchronized (lock) {
            return shutdown ? getShutDownFuture() : updateLeader();
        }
    }

    /**
     * Builds the future handed out when an operation is requested after shutdown.
     *
     * @return a future completed exceptionally with a {@code FlinkException}
     */
    private CompletableFuture<Void> getShutDownFuture() {
        final FlinkException shutDownFailure = new FlinkException("EmbeddedLeaderService has been shut down.");
        return FutureUtils.completedExceptionally(shutDownFailure);
    }

    /**
     * Revokes leadership from the current (confirmed or proposed) leader on demand.
     *
     * <p>The revoke call and the "no leader" listener notifications are submitted to the executor
     * before the leader state is reset; that order is intentional and must be kept.
     *
     * @return a future combining the revoke call and the listener notifications; an already
     *     completed future if there is no leader; an exceptionally completed future if the service
     *     is shut down
     */
    @VisibleForTesting
    CompletableFuture<Void> revokeLeadership() {
        synchronized (lock) {
            if (shutdown) {
                return getShutDownFuture();
            }
            if (currentLeaderProposed == null && currentLeaderConfirmed == null) {
                return CompletableFuture.completedFuture(null);
            }

            // A confirmed leader takes precedence over a merely proposed one.
            final EmbeddedLeaderElection leaderToRevoke;
            if (currentLeaderConfirmed != null) {
                leaderToRevoke = currentLeaderConfirmed;
            } else {
                leaderToRevoke = currentLeaderProposed;
            }

            LOG.info("Revoking leadership of {}.", leaderToRevoke.contender);
            leaderToRevoke.isLeader = false;
            final CompletableFuture<Void> revocation = execute(new RevokeLeadershipCall(leaderToRevoke.contender));
            final CompletableFuture<Void> listenerNotification = notifyAllListeners(null, null);

            currentLeaderProposed = null;
            currentLeaderConfirmed = null;
            currentLeaderAddress = null;
            currentLeaderSessionId = null;
            return CompletableFuture.allOf(revocation, listenerNotification);
        }
    }

    /**
     * Submits the given runnable to the notification executor.
     *
     * @param runnable the work to run
     * @return future of the submitted work
     */
    private CompletableFuture<Void> execute(Runnable runnable) {
        final CompletableFuture<Void> submitted = CompletableFuture.runAsync(runnable, notificationExecutor);
        return submitted;
    }

    /** Runnable that revokes leadership from one contender. */
    private static class RevokeLeadershipCall implements Runnable {
        @Nonnull
        private final LeaderContender target;

        /**
         * @param contender the contender whose leadership is revoked when this call runs
         */
        RevokeLeadershipCall(@Nonnull LeaderContender contender) {
            target = contender;
        }

        /** Invokes {@code revokeLeadership()} on the contender; exceptions are not caught here. */
        @Override
        public void run() {
            target.revokeLeadership();
        }
    }

    /** Runnable that grants leadership to one contender under a given session ID. */
    private static class GrantLeadershipCall implements Runnable {
        private final LeaderContender contender;
        private final UUID leaderSessionId;
        private final Logger logger;

        /**
         * @param contender the contender to grant leadership to; null-checked
         * @param leaderSessionId the session ID of the grant; null-checked
         * @param logger logger used to report a failing grant; null-checked
         */
        GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId, Logger logger) {
            this.contender = Preconditions.checkNotNull(contender);
            this.leaderSessionId = Preconditions.checkNotNull(leaderSessionId);
            this.logger = Preconditions.checkNotNull(logger);
        }

        /**
         * Grants leadership to the contender. A failure is logged and reported back to the
         * contender through {@code handleError}, wrapped in an {@link Exception} if it is not one.
         */
        @Override
        public void run() {
            try {
                contender.grantLeadership(leaderSessionId);
            } catch (Throwable t) {
                logger.warn("Error granting leadership to contender", t);
                final Exception reported;
                if (t instanceof Exception) {
                    reported = (Exception) t;
                } else {
                    reported = new Exception(t);
                }
                contender.handleError(reported);
            }
        }
    }

    /** Runnable that tells one listener about the current leader address and session ID. */
    private static class NotifyOfLeaderCall implements Runnable {
        @Nullable
        private final String address;
        @Nullable
        private final UUID leaderSessionId;
        private final LeaderRetrievalListener listener;
        private final Logger logger;

        /**
         * @param address leader address to announce, may be null
         * @param leaderSessionId leader session ID to announce, may be null
         * @param listener the listener to notify; null-checked
         * @param logger logger used to report a failing notification; null-checked
         */
        NotifyOfLeaderCall(@Nullable String address, @Nullable UUID leaderSessionId, LeaderRetrievalListener listener, Logger logger) {
            this.address = address;
            this.leaderSessionId = leaderSessionId;
            this.listener = Preconditions.checkNotNull(listener);
            this.logger = Preconditions.checkNotNull(logger);
        }

        /**
         * Notifies the listener. A failure is logged and reported back to the listener through
         * {@code handleError}, wrapped in an {@link Exception} if it is not one.
         */
        @Override
        public void run() {
            try {
                listener.notifyLeaderAddress(address, leaderSessionId);
            } catch (Throwable t) {
                logger.warn("Error notifying leader listener about new leader", t);
                final Exception reported;
                if (t instanceof Exception) {
                    reported = (Exception) t;
                } else {
                    reported = new Exception(t);
                }
                listener.handleError(reported);
            }
        }
    }

    /**
     * Leader retrieval handle handed out by the enclosing service. Starting and stopping delegate
     * to the enclosing service, which registers and unregisters this handle.
     */
    private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
        /** Listener to notify; assigned by the enclosing service on start and cleared on stop. */
        volatile LeaderRetrievalListener listener;
        /** True while this handle is registered with the enclosing service. */
        volatile boolean running;

        private EmbeddedLeaderRetrievalService() {
        }

        /**
         * Registers this handle with the enclosing service.
         *
         * @param listener the listener to notify about leader changes; null-checked
         */
        @Override
        public void start(LeaderRetrievalListener listener) throws Exception {
            Preconditions.checkNotNull(listener);
            addListener(this, listener);
        }

        /** Unregisters this handle from the enclosing service. */
        @Override
        public void stop() throws Exception {
            removeListener(this);
        }

        /**
         * Resets this handle if it is running.
         *
         * @param cause the reason for the shutdown; not used by this method
         */
        public void shutdown(Exception cause) {
            if (!running) {
                return;
            }
            running = false;
            listener = null;
        }
    }

    /**
     * Leader election handle handed out by the enclosing service. Starting, closing and confirming
     * leadership delegate to the enclosing service.
     */
    private class EmbeddedLeaderElection implements LeaderElection {
        /** Identifier given at creation; used in the enclosing service's log output. */
        final String componentId;
        /** Contender behind this handle; assigned by the enclosing service on start. */
        volatile LeaderContender contender;
        /** True while leadership is proposed to, or held by, this handle. */
        volatile boolean isLeader;
        /** True while this handle is registered with the enclosing service. */
        volatile boolean running;

        EmbeddedLeaderElection(String componentId) {
            this.componentId = componentId;
        }

        /**
         * Registers this handle with the enclosing service.
         *
         * @param contender the contender taking part in the election; null-checked
         */
        @Override
        public void startLeaderElection(LeaderContender contender) throws Exception {
            Preconditions.checkNotNull(contender);
            addContender(this, contender);
        }

        /** Unregisters this handle from the enclosing service. */
        @Override
        public void close() {
            removeContender(this);
        }

        /**
         * Forwards a leadership confirmation to the enclosing service.
         *
         * @param leaderSessionID session ID being confirmed; null-checked
         * @param leaderAddress address of the confirming leader; null-checked
         * @return the future returned by the enclosing service's confirmation handling
         */
        @Override
        public CompletableFuture<Void> confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress) {
            Preconditions.checkNotNull(leaderSessionID);
            Preconditions.checkNotNull(leaderAddress);
            return confirmLeader(this, leaderSessionID, leaderAddress);
        }

        /**
         * Checks whether this handle is marked as leader under the given session ID.
         *
         * @param leaderSessionId session ID to compare with the service's current session ID
         * @return an already completed future holding the result of the check
         */
        @Override
        public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
            // Reads volatile state only; the service lock is not taken here.
            final boolean leaderForSession = isLeader && leaderSessionId.equals(currentLeaderSessionId);
            return CompletableFuture.completedFuture(leaderForSession);
        }

        /**
         * Resets this handle if it is running and revokes the contender's leadership.
         *
         * @param cause the reason for the shutdown; not used by this method
         */
        void shutdown(Exception cause) {
            if (!running) {
                return;
            }
            running = false;
            isLeader = false;
            contender.revokeLeadership();
            contender = null;
        }
    }
}

