/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;

@ThreadSafe
public class TaskMailboxImpl
implements TaskMailbox {
    private final ReentrantLock lock = new ReentrantLock();
    @GuardedBy(value="lock")
    private final Deque<Mail> queue = new ArrayDeque<Mail>();
    @GuardedBy(value="lock")
    private final Condition notEmpty = this.lock.newCondition();
    @GuardedBy(value="lock")
    private TaskMailbox.State state = TaskMailbox.State.OPEN;
    @Nonnull
    private final Thread taskMailboxThread;
    private final Deque<Mail> batch = new ArrayDeque<Mail>();
    private volatile boolean hasNewMail = false;
    private volatile boolean hasNewUrgentMail = false;

    public TaskMailboxImpl(@Nonnull Thread taskMailboxThread) {
        this.taskMailboxThread = taskMailboxThread;
    }

    @VisibleForTesting
    public TaskMailboxImpl() {
        this(Thread.currentThread());
    }

    @Override
    public boolean isMailboxThread() {
        return Thread.currentThread() == this.taskMailboxThread;
    }

    @Override
    public boolean hasMail() {
        this.checkIsMailboxThread();
        return !this.batch.isEmpty() || this.hasNewMail;
    }

    @Override
    public int size() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = this.batch.size() + this.queue.size();
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Optional<Mail> tryTake(int priority) {
        this.checkIsMailboxThread();
        this.checkTakeStateConditions();
        this.moveUrgentMailsToBatchIfNeeded(true);
        Mail head = this.takeOrNull(this.batch, priority);
        if (head != null) {
            return Optional.of(head);
        }
        if (!this.hasNewMail) {
            return Optional.empty();
        }
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Mail value = this.takeOrNull(this.queue, priority);
            if (value == null) {
                Optional<Mail> optional = Optional.empty();
                return optional;
            }
            this.updateNewMailFlags();
            Optional<Mail> optional = Optional.ofNullable(value);
            return optional;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nonnull
    public Mail take(int priority) throws InterruptedException, IllegalStateException {
        this.checkIsMailboxThread();
        this.checkTakeStateConditions();
        this.moveUrgentMailsToBatchIfNeeded(true);
        Mail head = this.takeOrNull(this.batch, priority);
        if (head != null) {
            return head;
        }
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            Mail headMail;
            while ((headMail = this.takeOrNull(this.queue, priority)) == null) {
                this.notEmpty.await(1L, TimeUnit.SECONDS);
            }
            this.updateNewMailFlags();
            Mail mail = headMail;
            return mail;
        }
        finally {
            lock.unlock();
        }
    }

    private void updateNewMailFlags() {
        Mail peek = this.queue.peek();
        if (peek != null) {
            this.hasNewMail = true;
            this.hasNewUrgentMail = peek.getMailOptions().isUrgent();
        } else {
            this.hasNewMail = false;
            this.hasNewUrgentMail = false;
        }
    }

    @Override
    public boolean createBatch() {
        this.moveUrgentMailsToBatchIfNeeded(false);
        return !this.batch.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void moveUrgentMailsToBatchIfNeeded(boolean onlyMoveUrgentMails) {
        this.checkIsMailboxThread();
        Mail peek = this.batch.peek();
        if (peek != null) {
            if (peek.getMailOptions().isUrgent()) {
                return;
            }
            if (!this.hasNewUrgentMail) {
                return;
            }
        } else {
            if (onlyMoveUrgentMails && !this.hasNewUrgentMail) {
                return;
            }
            if (!this.hasNewMail) {
                return;
            }
        }
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Mail mail;
            while ((mail = this.queue.pollFirst()) != null) {
                if (mail.getMailOptions().isUrgent()) {
                    this.batch.addFirst(mail);
                    continue;
                }
                if (onlyMoveUrgentMails) {
                    this.queue.addFirst(mail);
                    break;
                }
                this.batch.addLast(mail);
            }
            this.hasNewUrgentMail = false;
            this.hasNewMail = !this.queue.isEmpty();
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public Optional<Mail> tryTakeFromBatch() {
        this.checkIsMailboxThread();
        this.checkTakeStateConditions();
        this.moveUrgentMailsToBatchIfNeeded(true);
        return Optional.ofNullable(this.batch.pollFirst());
    }

    @Override
    public void put(@Nonnull Mail mail) {
        if (mail.getMailOptions().isUrgent()) {
            this.putFirst(mail);
        } else {
            this.putLast(mail);
        }
    }

    private void putLast(@Nonnull Mail mail) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            this.checkPutStateConditions();
            this.queue.addLast(mail);
            this.hasNewMail = true;
            this.notEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putFirst(@Nonnull Mail mail) {
        Mail peek = this.batch.peek();
        if (this.isMailboxThread() && peek != null && !peek.getMailOptions().isUrgent() && !this.hasNewUrgentMail) {
            this.checkPutStateConditions();
            this.batch.addFirst(mail);
        } else {
            ReentrantLock lock = this.lock;
            lock.lock();
            try {
                this.checkPutStateConditions();
                this.queue.addFirst(mail);
                this.hasNewMail = true;
                this.hasNewUrgentMail = true;
                this.notEmpty.signal();
            }
            finally {
                lock.unlock();
            }
        }
    }

    @Nullable
    private Mail takeOrNull(Deque<Mail> queue, int priority) {
        if (queue.isEmpty()) {
            return null;
        }
        Iterator<Mail> iterator = queue.iterator();
        while (iterator.hasNext()) {
            Mail mail = iterator.next();
            if (mail.getPriority() < priority) continue;
            iterator.remove();
            return mail;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Mail> drain() {
        ArrayList<Mail> drainedMails = new ArrayList<Mail>(this.batch);
        this.batch.clear();
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            drainedMails.addAll(this.queue);
            this.queue.clear();
            this.hasNewUrgentMail = false;
            this.hasNewMail = false;
            ArrayList<Mail> arrayList = drainedMails;
            return arrayList;
        }
        finally {
            lock.unlock();
        }
    }

    private void checkIsMailboxThread() {
        if (!this.isMailboxThread()) {
            throw new IllegalStateException("Illegal thread detected. This method must be called from inside the mailbox thread!");
        }
    }

    private void checkPutStateConditions() {
        if (this.state != TaskMailbox.State.OPEN) {
            throw new TaskMailbox.MailboxClosedException("Mailbox is in state " + String.valueOf((Object)this.state) + ", but is required to be in state " + String.valueOf((Object)TaskMailbox.State.OPEN) + " for put operations.");
        }
    }

    private void checkTakeStateConditions() {
        if (this.state == TaskMailbox.State.CLOSED) {
            throw new TaskMailbox.MailboxClosedException("Mailbox is in state " + String.valueOf((Object)this.state) + ", but is required to be in state " + String.valueOf((Object)TaskMailbox.State.OPEN) + " or " + String.valueOf((Object)TaskMailbox.State.QUIESCED) + " for take operations.");
        }
    }

    @Override
    public void quiesce() {
        this.checkIsMailboxThread();
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (this.state == TaskMailbox.State.OPEN) {
                this.state = TaskMailbox.State.QUIESCED;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nonnull
    public List<Mail> close() {
        this.checkIsMailboxThread();
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (this.state == TaskMailbox.State.CLOSED) {
                List<Mail> list = Collections.emptyList();
                return list;
            }
            List<Mail> droppedMails = this.drain();
            this.state = TaskMailbox.State.CLOSED;
            this.notEmpty.signalAll();
            List<Mail> list = droppedMails;
            return list;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    @Nonnull
    public TaskMailbox.State getState() {
        if (this.isMailboxThread()) {
            return this.state;
        }
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            TaskMailbox.State state = this.state;
            return state;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void runExclusively(Runnable runnable) {
        this.lock.lock();
        try {
            runnable.run();
        }
        finally {
            this.lock.unlock();
        }
    }
}

