/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.SubtaskID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
class ChannelStateWriteRequestExecutorImpl
implements ChannelStateWriteRequestExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestExecutorImpl.class);
    private final Object lock = new Object();
    private final ChannelStateWriteRequestDispatcher dispatcher;
    private final Thread thread;
    private final int maxSubtasksPerChannelStateFile;
    @GuardedBy(value="lock")
    private final Deque<ChannelStateWriteRequest> deque;
    @GuardedBy(value="lock")
    private Exception thrown = null;
    @GuardedBy(value="lock")
    private boolean wasClosed = false;
    @GuardedBy(value="lock")
    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues = new HashMap<SubtaskID, Queue<ChannelStateWriteRequest>>();
    @GuardedBy(value="lock")
    private final Set<SubtaskID> subtasks;
    private final Object registerLock;
    @GuardedBy(value="registerLock")
    private boolean isRegistering = true;
    @GuardedBy(value="registerLock")
    private final Consumer<ChannelStateWriteRequestExecutor> onRegistered;

    ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher dispatcher, int maxSubtasksPerChannelStateFile, Consumer<ChannelStateWriteRequestExecutor> onRegistered, Object registerLock) {
        this(dispatcher, new ArrayDeque<ChannelStateWriteRequest>(), maxSubtasksPerChannelStateFile, registerLock, onRegistered);
    }

    ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher dispatcher, Deque<ChannelStateWriteRequest> deque, int maxSubtasksPerChannelStateFile, Object registerLock, Consumer<ChannelStateWriteRequestExecutor> onRegistered) {
        this.dispatcher = dispatcher;
        this.deque = deque;
        this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile;
        this.registerLock = registerLock;
        this.onRegistered = onRegistered;
        this.thread = new Thread(this::run, "Channel state writer ");
        this.subtasks = new HashSet<SubtaskID>();
        this.thread.setDaemon(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @VisibleForTesting
    void run() {
        block19: {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            this.loop();
            try {
                IOUtils.closeAll(this::cleanupRequests, () -> {
                    Exception cause;
                    Object object = this.lock;
                    synchronized (object) {
                        cause = this.thrown == null ? new CancellationException() : this.thrown;
                    }
                    this.dispatcher.fail(cause);
                });
            }
            catch (Exception e) {
                Object object = this.lock;
                synchronized (object) {
                    this.thrown = ExceptionUtils.firstOrSuppressed(e, this.thrown);
                }
            }
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            break block19;
            catch (Exception ex) {
                try {
                    this.thrown = ex;
                }
                catch (Throwable throwable) {
                    try {
                        IOUtils.closeAll(this::cleanupRequests, () -> {
                            Exception cause;
                            Object object = this.lock;
                            synchronized (object) {
                                cause = this.thrown == null ? new CancellationException() : this.thrown;
                            }
                            this.dispatcher.fail(cause);
                        });
                    }
                    catch (Exception e) {
                        Object object = this.lock;
                        synchronized (object) {
                            this.thrown = ExceptionUtils.firstOrSuppressed(e, this.thrown);
                        }
                    }
                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                    throw throwable;
                }
                try {
                    IOUtils.closeAll(this::cleanupRequests, () -> {
                        Exception cause;
                        Object object = this.lock;
                        synchronized (object) {
                            cause = this.thrown == null ? new CancellationException() : this.thrown;
                        }
                        this.dispatcher.fail(cause);
                    });
                }
                catch (Exception e) {
                    Object object = this.lock;
                    synchronized (object) {
                        this.thrown = ExceptionUtils.firstOrSuppressed(e, this.thrown);
                    }
                }
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }
        LOG.debug("loop terminated");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loop() throws Exception {
        while (true) {
            Object object;
            try {
                while (true) {
                    ChannelStateWriteRequest request;
                    object = this.lock;
                    synchronized (object) {
                        request = this.waitAndTakeUnsafe();
                        if (request == null) {
                            return;
                        }
                    }
                    if (request instanceof CheckpointStartRequest) {
                        object = this.registerLock;
                        synchronized (object) {
                            if (this.completeRegister()) {
                                this.onRegistered.accept(this);
                            }
                        }
                    }
                    this.dispatcher.dispatch(request);
                }
            }
            catch (InterruptedException e) {
                object = this.lock;
                synchronized (object) {
                    if (this.wasClosed) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    LOG.debug("Channel state executor is interrupted while waiting for a request (continue waiting)", (Throwable)e);
                }
            }
        }
    }

    @Nullable
    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
        while (!this.wasClosed) {
            ChannelStateWriteRequest request = this.deque.pollFirst();
            if (request == null) {
                this.lock.wait();
                continue;
            }
            return request;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanupRequests() throws Exception {
        ArrayList<ChannelStateWriteRequest> drained;
        Exception cause;
        Object object = this.lock;
        synchronized (object) {
            cause = this.thrown == null ? new CancellationException() : this.thrown;
            drained = new ArrayList<ChannelStateWriteRequest>(this.deque);
            this.deque.clear();
            for (Queue<ChannelStateWriteRequest> unreadyQueue : this.unreadyQueues.values()) {
                while (!unreadyQueue.isEmpty()) {
                    drained.add(unreadyQueue.poll());
                }
            }
        }
        LOG.info("discarding {} drained requests", (Object)drained.size());
        IOUtils.closeAll(drained.stream().map(request -> () -> request.cancel(cause)).collect(Collectors.toList()));
    }

    @Override
    public void start() throws IllegalStateException {
        this.thread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submit(ChannelStateWriteRequest request) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Queue<ChannelStateWriteRequest> unreadyQueue = this.unreadyQueues.get(SubtaskID.of(request.getJobVertexID(), request.getSubtaskIndex()));
            Preconditions.checkArgument(unreadyQueue != null, "The subtask %s is not yet registered.");
            this.submitInternal(request, () -> {
                if (!unreadyQueue.isEmpty()) {
                    unreadyQueue.add(request);
                    return;
                }
                if (request.getReadyFuture().isDone()) {
                    this.deque.add(request);
                    this.lock.notifyAll();
                    return;
                }
                unreadyQueue.add(request);
                this.registerFirstRequestFuture(request, unreadyQueue);
            });
        }
    }

    private void registerFirstRequestFuture(@Nonnull ChannelStateWriteRequest firstRequest, Queue<ChannelStateWriteRequest> unreadyQueue) {
        assert (Thread.holdsLock(this.lock));
        Preconditions.checkState(firstRequest == unreadyQueue.peek(), "The request isn't the first request.");
        ((CompletableFuture)firstRequest.getReadyFuture().thenAccept(o -> {
            Object object = this.lock;
            synchronized (object) {
                this.moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
            }
        })).exceptionally(throwable -> {
            Object object = this.lock;
            synchronized (object) {
                this.moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
            }
            return null;
        });
    }

    private void moveReadyRequestToReadyQueue(Queue<ChannelStateWriteRequest> unreadyQueue, ChannelStateWriteRequest firstRequest) {
        assert (Thread.holdsLock(this.lock));
        Preconditions.checkState(firstRequest == unreadyQueue.peek());
        while (!unreadyQueue.isEmpty()) {
            ChannelStateWriteRequest req = unreadyQueue.peek();
            if (!req.getReadyFuture().isDone()) {
                this.registerFirstRequestFuture(req, unreadyQueue);
                return;
            }
            this.deque.add(Objects.requireNonNull(unreadyQueue.poll()));
            this.lock.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submitPriority(ChannelStateWriteRequest request) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkArgument(this.unreadyQueues.containsKey(SubtaskID.of(request.getJobVertexID(), request.getSubtaskIndex())), "The subtask %s is not yet registered.");
            Preconditions.checkState(request.getReadyFuture().isDone(), "The priority request must be ready.");
            this.submitInternal(request, () -> {
                this.deque.addFirst(request);
                this.lock.notifyAll();
            });
        }
    }

    private void submitInternal(ChannelStateWriteRequest request, RunnableWithException action) throws Exception {
        try {
            action.run();
        }
        catch (Exception ex) {
            request.cancel(ex);
            throw ex;
        }
        this.ensureRunning();
    }

    private void ensureRunning() throws Exception {
        assert (Thread.holdsLock(this.lock));
        if (this.wasClosed || !this.thread.isAlive()) {
            this.cleanupRequests();
            IllegalStateException exception = new IllegalStateException("not running");
            if (this.thrown != null) {
                exception.addSuppressed(this.thrown);
            }
            throw exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerSubtask(JobVertexID jobVertexID, int subtaskIndex) {
        assert (Thread.holdsLock(this.registerLock));
        SubtaskID subtaskID = SubtaskID.of(jobVertexID, subtaskIndex);
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState(this.isRegistering(), "This executor has been registered.");
            Preconditions.checkState(!this.subtasks.contains(subtaskID), String.format("This subtask[%s] has already registered.", subtaskID));
            this.subtasks.add(subtaskID);
            this.deque.add(ChannelStateWriteRequest.registerSubtask(subtaskID.getJobVertexID(), subtaskID.getSubtaskIndex()));
            this.lock.notifyAll();
            this.unreadyQueues.put(subtaskID, new ArrayDeque());
            if (this.subtasks.size() == this.maxSubtasksPerChannelStateFile && this.completeRegister()) {
                this.onRegistered.accept(this);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public boolean isRegistering() {
        Object object = this.registerLock;
        synchronized (object) {
            return this.isRegistering;
        }
    }

    private boolean completeRegister() {
        assert (Thread.holdsLock(this.registerLock));
        if (this.isRegistering) {
            this.isRegistering = false;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void releaseSubtask(JobVertexID jobVertexID, int subtaskIndex) throws IOException {
        Object object = this.registerLock;
        synchronized (object) {
            Object object2 = this.lock;
            synchronized (object2) {
                if (this.completeRegister()) {
                    this.onRegistered.accept(this);
                }
                this.subtasks.remove(SubtaskID.of(jobVertexID, subtaskIndex));
                if (!this.subtasks.isEmpty()) {
                    return;
                }
                this.wasClosed = true;
                this.lock.notifyAll();
            }
        }
        while (this.thread.isAlive()) {
            this.thread.interrupt();
            try {
                this.thread.join();
            }
            catch (InterruptedException e) {
                if (!this.thread.isAlive()) {
                    Thread.currentThread().interrupt();
                }
                LOG.debug("Channel state executor is interrupted while waiting for the writer thread to die", (Throwable)e);
            }
        }
        object = this.lock;
        synchronized (object) {
            if (this.thrown != null) {
                throw new IOException(this.thrown);
            }
        }
    }

    @VisibleForTesting
    Thread getThread() {
        return this.thread;
    }
}

