/*
 * Decompiled with CFR 0.152.
 */
package org.glassfish.grizzly.nio;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.AbstractWriter;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Context;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.IOEvent;
import org.glassfish.grizzly.PendingWriteQueueLimitExceededException;
import org.glassfish.grizzly.WriteHandler;
import org.glassfish.grizzly.WriteResult;
import org.glassfish.grizzly.asyncqueue.AsyncQueue;
import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
import org.glassfish.grizzly.asyncqueue.AsyncWriteQueueRecord;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.asyncqueue.PushBackContext;
import org.glassfish.grizzly.asyncqueue.PushBackHandler;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
import org.glassfish.grizzly.asyncqueue.WritableMessage;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.NullaryFunction;
import org.glassfish.grizzly.nio.NIOConnection;
import org.glassfish.grizzly.nio.NIOTransport;
import org.glassfish.grizzly.threadpool.WorkerThread;

public abstract class AbstractNIOAsyncQueueWriter
extends AbstractWriter<SocketAddress>
implements AsyncQueueWriter<SocketAddress> {
    private static final Logger LOGGER = Grizzly.logger(AbstractNIOAsyncQueueWriter.class);
    private final ThreadLocal<AsyncQueueWriter.Reentrant> REENTRANTS_COUNTER = new ThreadLocal<AsyncQueueWriter.Reentrant>(){

        @Override
        protected AsyncQueueWriter.Reentrant initialValue() {
            return new AsyncQueueWriter.Reentrant();
        }
    };
    protected static final int EMPTY_RECORD_SPACE_VALUE = 1;
    protected final NIOTransport transport;
    protected volatile int maxPendingBytes = -2;
    protected volatile int maxWriteReentrants = 10;
    private volatile boolean isAllowDirectWrite = true;
    private final Attribute<AsyncQueueWriter.Reentrant> reentrantsAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(AbstractNIOAsyncQueueWriter.class.getName() + this.hashCode() + ".reentrant", new NullaryFunction<AsyncQueueWriter.Reentrant>(){

        @Override
        public AsyncQueueWriter.Reentrant evaluate() {
            return new AsyncQueueWriter.Reentrant();
        }
    });

    public AbstractNIOAsyncQueueWriter(NIOTransport transport) {
        this.transport = transport;
    }

    @Override
    public boolean canWrite(Connection connection, int size) {
        int connectionMaxPendingBytes = connection.getMaxAsyncWriteQueueSize();
        if (connectionMaxPendingBytes < 0) {
            return true;
        }
        TaskQueue<AsyncWriteQueueRecord> connectionQueue = ((NIOConnection)connection).getAsyncWriteQueue();
        return connectionQueue.spaceInBytes() + size < connectionMaxPendingBytes;
    }

    @Override
    public void notifyWritePossible(Connection connection, WriteHandler writeHandler, int size) {
        ((NIOConnection)connection).getAsyncWriteQueue().notifyWritePossible(writeHandler, size);
    }

    @Override
    public void setMaxPendingBytesPerConnection(int maxPendingBytes) {
        this.maxPendingBytes = maxPendingBytes < -2 ? -2 : maxPendingBytes;
    }

    @Override
    public int getMaxPendingBytesPerConnection() {
        return this.maxPendingBytes;
    }

    @Override
    public int getMaxWriteReentrants() {
        return this.maxWriteReentrants;
    }

    @Override
    public void setMaxWriteReentrants(int maxWriteReentrants) {
        this.maxWriteReentrants = maxWriteReentrants;
    }

    public boolean isAllowDirectWrite() {
        return this.isAllowDirectWrite;
    }

    public void setAllowDirectWrite(boolean isAllowDirectWrite) {
        this.isAllowDirectWrite = isAllowDirectWrite;
    }

    @Override
    public void write(Connection connection, SocketAddress dstAddress, WritableMessage message, CompletionHandler<WriteResult<WritableMessage, SocketAddress>> completionHandler, PushBackHandler pushBackHandler) {
        this.write(connection, dstAddress, message, completionHandler, pushBackHandler, null);
    }

    @Override
    public void write(Connection connection, SocketAddress dstAddress, WritableMessage message, CompletionHandler<WriteResult<WritableMessage, SocketAddress>> completionHandler, PushBackHandler pushBackHandler, MessageCloner<WritableMessage> cloner) {
        WriteResult<WritableMessage, SocketAddress> currentResult = WriteResult.create(connection, message, dstAddress, 0);
        AsyncWriteQueueRecord queueRecord = this.createRecord(connection, message, currentResult, completionHandler, dstAddress, pushBackHandler, !message.hasRemaining() || message.isExternal());
        this.writeQueueRecord(queueRecord, cloner, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void writeQueueRecord(AsyncWriteQueueRecord queueRecord, MessageCloner<WritableMessage> cloner, PushBackContext pushBackContext) {
        NIOConnection connection = (NIOConnection)queueRecord.getConnection();
        if (connection == null) {
            AbstractNIOAsyncQueueWriter.onWriteFailure(connection, queueRecord, new IOException("Connection is null"));
            return;
        }
        if (!connection.isOpen()) {
            AbstractNIOAsyncQueueWriter.onWriteFailure(connection, queueRecord, new IOException("Connection is closed"));
            return;
        }
        TaskQueue<AsyncWriteQueueRecord> writeTaskQueue = connection.getAsyncWriteQueue();
        boolean isEmptyRecord = queueRecord.isEmptyRecord();
        WritableMessage message = queueRecord.getWritableMessage();
        int messageSize = message.remaining();
        int bytesToReserve = isEmptyRecord ? 1 : messageSize;
        int pendingBytes = writeTaskQueue.reserveSpace(bytesToReserve);
        boolean isCurrent = pendingBytes == bytesToReserve;
        queueRecord.setMomentumQueueSize(pendingBytes);
        boolean isLogFine = LOGGER.isLoggable(Level.FINEST);
        if (isLogFine) {
            AbstractNIOAsyncQueueWriter.doFineLog("AsyncQueueWriter.write connection={0} record={1} directWrite={2}", connection, queueRecord, isCurrent);
        }
        AsyncQueueWriter.Reentrant reentrants = this.getWriteReentrant();
        try {
            if (reentrants.incAndGet() >= this.maxWriteReentrants) {
                queueRecord.setMessage(AbstractNIOAsyncQueueWriter.cloneRecordIfNeeded(connection, cloner, message));
                if (isCurrent) {
                    writeTaskQueue.setCurrentElement(queueRecord);
                    connection.simulateIOEvent(IOEvent.WRITE);
                } else {
                    AbstractNIOAsyncQueueWriter.offerToTaskQueue(connection, queueRecord, writeTaskQueue);
                }
                return;
            }
            if (!this.checkQueueSize(queueRecord, pushBackContext)) {
                assert (!isCurrent);
                writeTaskQueue.getRefusedBytes().addAndGet(bytesToReserve);
                return;
            }
            if (isCurrent && this.isAllowDirectWrite) {
                boolean isQueueEmpty;
                int written = messageSize > 0 ? (int)this.write0(connection, queueRecord) : 0;
                boolean isFinished = queueRecord.isFinished();
                int bytesToRelease = !isEmptyRecord ? written : (isFinished ? 1 : 0);
                boolean bl = isQueueEmpty = writeTaskQueue.releaseSpaceAndNotify(bytesToRelease) == 0;
                if (isFinished) {
                    queueRecord.notifyCompleteAndRecycle();
                    if (!isQueueEmpty) {
                        connection.simulateIOEvent(IOEvent.WRITE);
                    }
                    return;
                }
            }
            queueRecord.setMessage(AbstractNIOAsyncQueueWriter.cloneRecordIfNeeded(connection, cloner, message));
            if (isCurrent) {
                writeTaskQueue.setCurrentElement(queueRecord);
                this.onReadyToWrite(connection);
            } else {
                AbstractNIOAsyncQueueWriter.offerToTaskQueue(connection, queueRecord, writeTaskQueue);
            }
        }
        catch (IOException e) {
            if (isLogFine) {
                LOGGER.log(Level.FINEST, "AsyncQueueWriter.write exception. connection=" + connection + " record=" + queueRecord, e);
            }
            AbstractNIOAsyncQueueWriter.onWriteFailure(connection, queueRecord, e);
        }
        finally {
            reentrants.decAndGet();
        }
    }

    @Override
    public AsyncQueue.AsyncResult processAsync(Context context) {
        boolean isLogFine = LOGGER.isLoggable(Level.FINEST);
        NIOConnection nioConnection = (NIOConnection)context.getConnection();
        if (!nioConnection.isOpen()) {
            return AsyncQueue.AsyncResult.COMPLETE;
        }
        TaskQueue<AsyncWriteQueueRecord> writeTaskQueue = nioConnection.getAsyncWriteQueue();
        if (AbstractNIOAsyncQueueWriter.checkRefusedBytes(writeTaskQueue)) {
            return AsyncQueue.AsyncResult.COMPLETE;
        }
        boolean done = false;
        AsyncWriteQueueRecord queueRecord = null;
        try {
            while ((queueRecord = this.aggregate(writeTaskQueue)) != null) {
                boolean isFinished;
                int bytesToRelease;
                boolean isEmpty;
                if (isLogFine) {
                    AbstractNIOAsyncQueueWriter.doFineLog("AsyncQueueWriter.processAsync doWriteconnection={0} record={1}", nioConnection, queueRecord);
                }
                int bytesToReleaseBefore = (int)(!(isEmpty = queueRecord.isEmptyRecord()) ? queueRecord.remaining() : 1L);
                if (!queueRecord.isChecked() && !this.checkQueueSize(queueRecord, null)) {
                    bytesToRelease = bytesToReleaseBefore;
                    isFinished = true;
                    queueRecord = null;
                } else {
                    int written = queueRecord.remaining() > 0L ? (int)this.write0(nioConnection, queueRecord) : 0;
                    isFinished = queueRecord.isFinished();
                    int n = !queueRecord.isEmptyRecord() ? written : (bytesToRelease = isFinished ? 1 : 0);
                }
                if (isFinished && !context.isManualIOEventControl() && writeTaskQueue.spaceInBytes() - bytesToRelease <= 0) {
                    context.setManualIOEventControl();
                }
                boolean bl = done = writeTaskQueue.releaseSpaceAndNotify(bytesToRelease) == 0;
                if (isFinished) {
                    AbstractNIOAsyncQueueWriter.finishQueueRecord(nioConnection, queueRecord);
                    if (!done) continue;
                    return AsyncQueue.AsyncResult.COMPLETE;
                }
                queueRecord.notifyIncomplete();
                writeTaskQueue.setCurrentElement(queueRecord);
                if (isLogFine) {
                    AbstractNIOAsyncQueueWriter.doFineLog("AsyncQueueWriter.processAsync onReadyToWrite connection={0} peekRecord={1}", nioConnection, queueRecord);
                }
                return AsyncQueue.AsyncResult.INCOMPLETE;
            }
            if (!done) {
                return AsyncQueue.AsyncResult.EXPECTING_MORE;
            }
        }
        catch (IOException e) {
            if (isLogFine) {
                LOGGER.log(Level.FINEST, "AsyncQueueWriter.processAsync exception connection=" + nioConnection + " peekRecord=" + queueRecord, e);
            }
            AbstractNIOAsyncQueueWriter.onWriteFailure(nioConnection, queueRecord, e);
        }
        return AsyncQueue.AsyncResult.COMPLETE;
    }

    private static boolean checkRefusedBytes(TaskQueue<AsyncWriteQueueRecord> writeTaskQueue) {
        AtomicInteger refusedBytesCounter = writeTaskQueue.getRefusedBytes();
        int refusedBytes = refusedBytesCounter.getAndSet(0);
        return refusedBytes > 0 && writeTaskQueue.releaseSpaceAndNotify(refusedBytes) == 0;
    }

    private static void finishQueueRecord(NIOConnection nioConnection, AsyncWriteQueueRecord queueRecord) {
        boolean isLogFine = LOGGER.isLoggable(Level.FINEST);
        if (isLogFine) {
            AbstractNIOAsyncQueueWriter.doFineLog("AsyncQueueWriter.processAsync finished connection={0} record={1}", nioConnection, queueRecord);
        }
        if (queueRecord != null) {
            queueRecord.notifyCompleteAndRecycle();
        }
        if (isLogFine) {
            AbstractNIOAsyncQueueWriter.doFineLog("AsyncQueueWriter.processAsync nextRecord connection={0} nextRecord={1}", nioConnection, queueRecord);
        }
    }

    protected static void offerToTaskQueue(NIOConnection nioConnection, AsyncWriteQueueRecord queueRecord, TaskQueue<AsyncWriteQueueRecord> taskQueue) {
        taskQueue.offer(queueRecord);
        if (!nioConnection.isOpen() && taskQueue.remove(queueRecord)) {
            AbstractNIOAsyncQueueWriter.onWriteFailure(nioConnection, queueRecord, new IOException("Connection is closed"));
        }
    }

    private static WritableMessage cloneRecordIfNeeded(Connection connection, MessageCloner<WritableMessage> cloner, WritableMessage message) {
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "AsyncQueueWriter.write clone. connection={0} cloner={1}", new Object[]{connection, cloner});
        }
        return cloner == null ? message : cloner.clone(connection, message);
    }

    protected AsyncWriteQueueRecord createRecord(Connection connection, WritableMessage message, WriteResult<WritableMessage, SocketAddress> currentResult, CompletionHandler<WriteResult<WritableMessage, SocketAddress>> completionHandler, SocketAddress dstAddress, PushBackHandler pushBackHandler, boolean isEmptyRecord) {
        return AsyncWriteQueueRecord.create(connection, message, currentResult, completionHandler, dstAddress, pushBackHandler, isEmptyRecord);
    }

    @Override
    public final boolean isReady(Connection connection) {
        TaskQueue<AsyncWriteQueueRecord> connectionQueue = ((NIOConnection)connection).getAsyncWriteQueue();
        return connectionQueue != null && !connectionQueue.isEmpty();
    }

    private static void doFineLog(String msg, Object ... params) {
        LOGGER.log(Level.FINEST, msg, params);
    }

    @Override
    public void onClose(Connection connection) {
        NIOConnection nioConnection = (NIOConnection)connection;
        TaskQueue<AsyncWriteQueueRecord> writeQueue = nioConnection.getAsyncWriteQueue();
        writeQueue.onClose();
    }

    @Override
    public AsyncQueueWriter.Reentrant getWriteReentrant() {
        Thread t = Thread.currentThread();
        if (WorkerThread.class.isAssignableFrom(t.getClass())) {
            return this.reentrantsAttribute.get((WorkerThread)((Object)t));
        }
        return this.REENTRANTS_COUNTER.get();
    }

    @Override
    public boolean isMaxReentrantsReached(AsyncQueueWriter.Reentrant reentrant) {
        return reentrant.get() >= this.getMaxWriteReentrants();
    }

    @Override
    public final void close() {
    }

    protected static void onWriteFailure(Connection connection, AsyncWriteQueueRecord failedRecord, Throwable e) {
        failedRecord.notifyFailure(e);
        connection.closeSilently();
    }

    protected abstract long write0(NIOConnection var1, AsyncWriteQueueRecord var2) throws IOException;

    protected abstract void onReadyToWrite(NIOConnection var1) throws IOException;

    protected AsyncWriteQueueRecord aggregate(TaskQueue<AsyncWriteQueueRecord> connectionQueue) {
        return connectionQueue.obtainCurrentElementAndReserve();
    }

    private boolean checkQueueSize(AsyncWriteQueueRecord queueRecord, PushBackContext pushBackContext) {
        NIOConnection connection = (NIOConnection)queueRecord.getConnection();
        PushBackHandler pushBackHandler = queueRecord.getPushBackHandler();
        WritableMessage message = queueRecord.getWritableMessage();
        int bytesToReserve = (int)(queueRecord.isEmptyRecord() ? 1L : queueRecord.remaining());
        int pendingBytes = queueRecord.getMomentumQueueSize();
        queueRecord.setMomentumQueueSize(-1);
        boolean isCurrent = pendingBytes == bytesToReserve;
        int maxPendingBytesLocal = connection.getMaxAsyncWriteQueueSize();
        if (!isCurrent && maxPendingBytesLocal > 0 && pendingBytes > maxPendingBytesLocal) {
            if (pushBackHandler == null) {
                PendingWriteQueueLimitExceededException error = new PendingWriteQueueLimitExceededException("Max queued data limit exceeded: " + pendingBytes + '>' + maxPendingBytesLocal);
                queueRecord.notifyFailure(error);
            } else {
                PushBackContext pbContextLocal = pushBackContext == null ? new PushBackContextImpl(queueRecord) : pushBackContext;
                pushBackHandler.onPushBack(connection, message, pbContextLocal);
            }
            return false;
        }
        if (pushBackHandler != null) {
            pushBackHandler.onAccept(connection, message);
        }
        return true;
    }

    private final class PushBackContextImpl
    extends PushBackContext
    implements WriteHandler {
        public PushBackContextImpl(AsyncWriteQueueRecord queueRecord) {
            super(queueRecord);
        }

        @Override
        public void retryWhenPossible() {
            NIOConnection connection = (NIOConnection)this.queueRecord.getConnection();
            AbstractNIOAsyncQueueWriter.this.notifyWritePossible(connection, this, (int)this.queueRecord.remaining());
        }

        @Override
        public void retryNow() {
            this.onWritePossible();
        }

        @Override
        public void cancel() {
            this.queueRecord.notifyFailure(new CancellationException("write cancelled"));
        }

        @Override
        public void onWritePossible() {
            AbstractNIOAsyncQueueWriter.this.writeQueueRecord(this.queueRecord, null, this);
        }

        @Override
        public void onError(Throwable t) {
            this.queueRecord.notifyFailure(t);
        }
    }
}

