/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.connection;

import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoSocketClosedException;
import com.mongodb.MongoSocketReadException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.MongoSocketWriteException;
import com.mongodb.ServerAddress;
import com.mongodb.assertions.Assertions;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.connection.AsyncCompletionHandler;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.InternalConnection;
import com.mongodb.connection.InternalConnectionInitializer;
import com.mongodb.connection.ReplyHeader;
import com.mongodb.connection.ResponseBuffers;
import com.mongodb.connection.ServerId;
import com.mongodb.connection.Stream;
import com.mongodb.connection.StreamFactory;
import com.mongodb.diagnostics.logging.Logger;
import com.mongodb.diagnostics.logging.Loggers;
import com.mongodb.event.ConnectionEvent;
import com.mongodb.event.ConnectionListener;
import com.mongodb.event.ConnectionMessageReceivedEvent;
import com.mongodb.event.ConnectionMessagesSentEvent;
import com.mongodb.internal.async.ErrorHandlingResultCallback;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.bson.ByteBuf;
import org.bson.io.BsonInput;
import org.bson.io.ByteBufferBsonInput;

class InternalStreamConnection
implements InternalConnection {
    // Identifies the server; its address is handed to the stream factory when opening.
    private final ServerId serverId;
    // Creates the underlying stream in open()/openAsync().
    private final StreamFactory streamFactory;
    // Invoked once the stream is open; its result becomes the connection description.
    private final InternalConnectionInitializer connectionInitializer;
    // Receives open/close and message sent/received events; exceptions it throws are logged, not propagated.
    private final ConnectionListener connectionListener;
    // Asynchronous writes queued by sendMessageAsync() and drained by processPendingWrites().
    private final LinkedList<SendMessageAsync> writeQueue = new LinkedList<SendMessageAsync>();
    // Asynchronous readers, keyed by the request id whose reply they are waiting for.
    private final ConcurrentHashMap<Integer, SingleResultCallback<ResponseBuffers>> readQueue = new ConcurrentHashMap<Integer, SingleResultCallback<ResponseBuffers>>();
    // Replies (or read failures) that have been received, keyed by responseTo, until a reader claims them.
    private final ConcurrentMap<Integer, Response> messages = new ConcurrentHashMap<Integer, Response>();
    // Single-permit semaphore held while a write to the stream is in progress.
    private final Semaphore writing = new Semaphore(1);
    // Single-permit semaphore held while a read from the stream is in progress.
    private final Semaphore reading = new Semaphore(1);
    // Starts as a description built from the server id alone; replaced by the initializer's result on open.
    private volatile ConnectionDescription description;
    // Null until open()/openAsync() has been called.
    private volatile Stream stream;
    // Set by close().
    private volatile boolean isClosed;
    // Set once opening (including initialization) has completed successfully.
    private volatile boolean opened;
    static final Logger LOGGER = Loggers.getLogger("connection");

    /**
     * Creates a connection that has not been opened yet.
     *
     * <p>Each argument is passed through {@code Assertions.notNull}. Until the connection is
     * opened, its description is one built from {@code serverId} alone.
     *
     * @param serverId              the server this connection belongs to
     * @param streamFactory         factory used to create the underlying stream on open
     * @param connectionInitializer initializer run after the stream has been opened
     * @param connectionListener    listener notified of connection events
     */
    InternalStreamConnection(ServerId serverId, StreamFactory streamFactory, InternalConnectionInitializer connectionInitializer, ConnectionListener connectionListener) {
        this.serverId = Assertions.notNull("serverId", serverId);
        this.streamFactory = Assertions.notNull("streamFactory", streamFactory);
        this.connectionInitializer = Assertions.notNull("connectionInitializer", connectionInitializer);
        this.connectionListener = Assertions.notNull("connectionListener", connectionListener);
        description = new ConnectionDescription(serverId);
    }

    /**
     * @return the current connection description: the one built in the constructor until the
     *         connection has been initialized, the initializer's result afterwards
     */
    @Override
    public ConnectionDescription getDescription() {
        return description;
    }

    /**
     * Synchronously creates and opens the underlying stream, runs the connection initializer,
     * notifies the listener and marks the connection as opened.
     *
     * <p>If any step fails the connection is closed and the failure is rethrown: a
     * {@link MongoException} as-is, anything else wrapped in a new {@link MongoException}.
     *
     * @throws MongoException if the stream cannot be opened or initialization fails
     */
    @Override
    public void open() {
        Assertions.isTrue("Open already called", this.stream == null);
        this.stream = this.streamFactory.create(this.serverId.getAddress());
        try {
            this.stream.open();
            this.description = this.connectionInitializer.initialize(this);
            // Guarded like the equivalent log statement in openAsync(), so the message is only
            // formatted when it will actually be emitted.
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Opened connection [%s] to %s", this.getId(), this.serverId.getAddress()));
            }
            try {
                this.connectionListener.connectionOpened(new ConnectionEvent(this.getId()));
            }
            catch (Throwable t) {
                // A misbehaving listener must not fail the open.
                LOGGER.warn("Exception when trying to signal connectionOpened to the connectionListener", t);
            }
            this.opened = true;
        }
        catch (Throwable t) {
            this.close();
            if (t instanceof MongoException) {
                throw (MongoException)t;
            }
            throw new MongoException(t.toString(), t);
        }
    }

    /**
     * Asynchronously creates and opens the underlying stream, then runs the connection
     * initializer.
     *
     * <p>On success the description is updated, the open is logged, the listener is notified and
     * the connection is marked as opened <em>before</em> {@code callback} is invoked. This is the
     * same order as {@link #open()}, so a callback that checks {@link #opened()} sees {@code true}.
     * If initialization fails the connection is closed and the error is passed to the callback.
     * If opening the stream itself fails the error is passed to the callback unchanged.
     *
     * @param callback invoked with {@code (null, null)} on success or {@code (null, error)} on failure
     */
    @Override
    public void openAsync(final SingleResultCallback<Void> callback) {
        Assertions.isTrue("Open already called", this.stream == null);
        this.stream = this.streamFactory.create(this.serverId.getAddress());
        this.stream.openAsync(new AsyncCompletionHandler<Void>(){

            @Override
            public void completed(Void aVoid) {
                InternalStreamConnection.this.connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback<ConnectionDescription>(){

                    @Override
                    public void onResult(ConnectionDescription result, Throwable t) {
                        if (t != null) {
                            InternalStreamConnection.this.close();
                            callback.onResult(null, t);
                            return;
                        }
                        InternalStreamConnection.this.description = result;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info(String.format("Opened connection [%s] to %s", InternalStreamConnection.this.getId(), InternalStreamConnection.this.serverId.getAddress()));
                        }
                        try {
                            InternalStreamConnection.this.connectionListener.connectionOpened(new ConnectionEvent(InternalStreamConnection.this.getId()));
                        }
                        catch (Throwable tr) {
                            // A misbehaving listener must not fail the open.
                            LOGGER.warn("Exception when trying to signal connectionOpened to the connectionListener", tr);
                        }
                        // Publish the opened state before handing control to the caller.
                        InternalStreamConnection.this.opened = true;
                        callback.onResult(null, null);
                    }
                });
            }

            @Override
            public void failed(Throwable t) {
                // NOTE(review): unlike open(), the connection is not closed here when the stream
                // fails to open - confirm whether that is intentional.
                callback.onResult(null, t);
            }
        });
    }

    /**
     * Closes the underlying stream if one was created, marks the connection as closed and
     * notifies the listener. Exceptions thrown by the listener are logged and swallowed.
     */
    @Override
    public void close() {
        Stream current = stream;
        if (current != null) {
            current.close();
        }
        isClosed = true;
        try {
            ConnectionEvent closedEvent = new ConnectionEvent(getId());
            connectionListener.connectionClosed(closedEvent);
        }
        catch (Throwable t) {
            LOGGER.warn("Exception when trying to signal connectionClosed to the connectionListener", t);
        }
    }

    /**
     * @return {@code true} once opening and initialization have completed successfully
     */
    @Override
    public boolean opened() {
        return opened;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Synchronously writes the given buffers to the stream while holding the write permit, then
     * notifies the listener that messages were sent.
     *
     * <p>Any failure closes the connection and is rethrown after translation by
     * {@code translateWriteException}.
     *
     * @param byteBuffers   the buffers to write
     * @param lastRequestId the request id reported in the messages-sent event
     * @throws MongoSocketClosedException if the connection is already closed
     * @throws MongoException             if acquiring the write permit is interrupted or the write fails
     */
    @Override
    public void sendMessage(List<ByteBuf> byteBuffers, int lastRequestId) {
        Assertions.notNull("open", this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot write to a closed stream", this.getServerAddress());
        }
        // Acquire outside the try/finally that releases: if acquire() is interrupted no permit was
        // obtained, and releasing anyway would leave the semaphore with an extra permit and let
        // two writers onto the stream at once.
        try {
            this.writing.acquire();
        }
        catch (InterruptedException e) {
            this.close();
            // Restore the interrupt status so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw this.translateWriteException(e);
        }
        try {
            this.stream.write(byteBuffers);
            try {
                this.connectionListener.messagesSent(new ConnectionMessagesSentEvent(this.getId(), lastRequestId, this.getTotalRemaining(byteBuffers)));
            }
            catch (Throwable t) {
                // A misbehaving listener must not fail the write.
                LOGGER.warn("Exception when trying to signal messagesSent to the connectionListener", t);
            }
        }
        catch (Exception e) {
            this.close();
            throw this.translateWriteException(e);
        }
        finally {
            this.writing.release();
        }
    }

    /**
     * Synchronously waits for the reply to the given request id.
     *
     * <p>Replies are read off the stream one at a time under the read permit and stored in
     * {@code messages} keyed by their {@code responseTo}. A thread may therefore read a reply
     * meant for another request; it keeps reading until the reply for {@code responseTo} is
     * present. A read failure closes the connection and is recorded as the outcome for
     * {@code responseTo}.
     *
     * @param responseTo the request id whose reply is wanted
     * @return the buffers of the matching reply
     * @throws MongoSocketClosedException if the connection is already closed
     * @throws MongoException             if reading fails or the thread is interrupted while waiting
     */
    @Override
    public ResponseBuffers receiveMessage(int responseTo) {
        Assertions.notNull("open", this.stream);
        if (this.isClosed()) {
            throw new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress());
        }
        while (!this.messages.containsKey(responseTo)) {
            // Acquire outside the try/finally that releases: if acquire() is interrupted no permit
            // was obtained, and releasing anyway would leave the semaphore with an extra permit.
            try {
                this.reading.acquire();
            }
            catch (InterruptedException e) {
                this.close();
                this.messages.put(responseTo, new Response(null, this.translateReadException(e)));
                // Restore the interrupt status so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                // Another reader may have delivered our reply while we waited for the permit.
                if (this.messages.containsKey(responseTo)) break;
                ResponseBuffers responseBuffers = this.receiveResponseBuffers();
                try {
                    this.connectionListener.messageReceived(new ConnectionMessageReceivedEvent(this.getId(), responseBuffers.getReplyHeader().getResponseTo(), responseBuffers.getReplyHeader().getMessageLength()));
                }
                catch (Throwable t) {
                    // A misbehaving listener must not fail the read.
                    LOGGER.warn("Exception when trying to signal messageReceived to the connectionListener", t);
                }
                this.messages.put(responseBuffers.getReplyHeader().getResponseTo(), new Response(responseBuffers, null));
            }
            catch (Exception e) {
                this.close();
                this.messages.put(responseTo, new Response(null, this.translateReadException(e)));
            }
            finally {
                this.reading.release();
            }
        }
        Response response = this.messages.remove(responseTo);
        if (response.hasError()) {
            Throwable t = response.getError();
            if (t instanceof MongoException) {
                throw (MongoException)t;
            }
            throw MongoException.fromThrowable(t);
        }
        return response.getResult();
    }

    /**
     * Queues the buffers for asynchronous writing and triggers processing of the write queue.
     *
     * @param byteBuffers   the buffers to write
     * @param lastRequestId the request id reported in the messages-sent event
     * @param callback      invoked when the write completes or fails; wrapped with
     *                      {@code errorHandlingCallback} before being queued
     */
    @Override
    public void sendMessageAsync(List<ByteBuf> byteBuffers, int lastRequestId, SingleResultCallback<Void> callback) {
        Assertions.notNull("open", stream);
        SingleResultCallback<Void> wrapped = ErrorHandlingResultCallback.errorHandlingCallback(callback, LOGGER);
        SendMessageAsync pending = new SendMessageAsync(byteBuffers, lastRequestId, wrapped);
        writeQueue.add(pending);
        processPendingWrites();
    }

    /**
     * Registers a callback for the reply to the given request id and triggers processing of
     * pending reads.
     *
     * @param responseTo the request id whose reply is wanted
     * @param callback   invoked with the reply buffers or an error; wrapped with
     *                   {@code errorHandlingCallback} before being registered
     */
    @Override
    public void receiveMessageAsync(int responseTo, SingleResultCallback<ResponseBuffers> callback) {
        Assertions.notNull("open", stream);
        SingleResultCallback<ResponseBuffers> wrapped = ErrorHandlingResultCallback.errorHandlingCallback(callback, LOGGER);
        readQueue.put(responseTo, wrapped);
        processPendingReads();
    }

    /** Returns the connection id carried by the current description. */
    private ConnectionId getId() {
        return description.getConnectionId();
    }

    /** Returns the server address carried by the current description. */
    private ServerAddress getServerAddress() {
        return description.getServerAddress();
    }

    /**
     * Asynchronously reads {@code numBytes} from the stream and hands the buffer to the callback.
     *
     * <p>If the connection is already closed the callback receives a
     * {@link MongoSocketClosedException} without touching the stream. If the read fails, the
     * connection is closed and the callback receives the translated read exception.
     *
     * @param numBytes number of bytes to read
     * @param callback receives the filled buffer or the error
     */
    private void fillAndFlipBuffer(int numBytes, final SingleResultCallback<ByteBuf> callback) {
        Assertions.notNull("open", stream);
        if (isClosed()) {
            MongoSocketClosedException closedError = new MongoSocketClosedException("Cannot read from a closed stream", getServerAddress());
            callback.onResult(null, closedError);
            return;
        }
        AsyncCompletionHandler<ByteBuf> readHandler = new AsyncCompletionHandler<ByteBuf>(){

            @Override
            public void completed(ByteBuf buffer) {
                callback.onResult(buffer, null);
            }

            @Override
            public void failed(Throwable t) {
                InternalStreamConnection.this.close();
                callback.onResult(null, InternalStreamConnection.this.translateReadException(t));
            }
        };
        stream.readAsync(numBytes, readHandler);
    }

    /**
     * Maps a failure raised while writing to a {@link MongoException}.
     *
     * @param e the original failure
     * @return {@code e} itself if it already is a {@link MongoException}; a
     *         {@link MongoSocketWriteException} for I/O errors; otherwise a
     *         {@link MongoInternalException} wrapping {@code e}
     */
    private MongoException translateWriteException(Throwable e) {
        MongoException translated;
        if (e instanceof MongoException) {
            translated = (MongoException)e;
        } else if (e instanceof IOException) {
            translated = new MongoSocketWriteException("Exception sending message", getServerAddress(), e);
        } else if (e instanceof InterruptedException) {
            translated = new MongoInternalException("Thread interrupted exception", e);
        } else {
            translated = new MongoInternalException("Unexpected exception", e);
        }
        return translated;
    }

    /**
     * Maps a failure raised while reading to a {@link MongoException}.
     *
     * <p>The order of the checks matters: {@link SocketTimeoutException} is a subclass of
     * {@link InterruptedIOException}, and both (as well as {@link ClosedByInterruptException})
     * are subclasses of {@link IOException}, so the most specific types are tested first.
     *
     * @param e the original failure
     * @return {@code e} itself if it already is a {@link MongoException}, otherwise the
     *         driver exception matching the kind of failure
     */
    private MongoException translateReadException(Throwable e) {
        MongoException translated;
        if (e instanceof MongoException) {
            translated = (MongoException)e;
        } else if (e instanceof SocketTimeoutException) {
            translated = new MongoSocketReadTimeoutException("Timeout while receiving message", getServerAddress(), e);
        } else if (e instanceof InterruptedIOException) {
            translated = new MongoInterruptedException("Interrupted while receiving message", (InterruptedIOException)e);
        } else if (e instanceof ClosedByInterruptException) {
            translated = new MongoInterruptedException("Interrupted while receiving message", (ClosedByInterruptException)e);
        } else if (e instanceof IOException) {
            translated = new MongoSocketReadException("Exception receiving message", getServerAddress(), e);
        } else if (e instanceof RuntimeException) {
            translated = new MongoInternalException("Unexpected runtime exception", e);
        } else if (e instanceof InterruptedException) {
            translated = new MongoInternalException("Interrupted exception", e);
        } else {
            translated = new MongoInternalException("Unexpected exception", e);
        }
        return translated;
    }

    /**
     * Synchronously reads one reply from the stream: a 36-byte header, followed by the body
     * when the header reports at least one returned document.
     *
     * @return the parsed header together with the body buffer, which is {@code null} when the
     *         header reports no returned documents
     * @throws IOException if reading from the stream fails
     */
    private ResponseBuffers receiveResponseBuffers() throws IOException {
        final int headerLength = 36;
        ByteBufferBsonInput headerInput = new ByteBufferBsonInput(stream.read(headerLength));
        ReplyHeader header;
        try {
            header = new ReplyHeader((BsonInput)headerInput);
        }
        finally {
            // The header input is closed whether or not parsing succeeded.
            headerInput.close();
        }
        ByteBuf body = header.getNumberReturned() > 0
                ? stream.read(header.getMessageLength() - headerLength)
                : null;
        return new ResponseBuffers(header, body);
    }

    /**
     * Obtains a buffer of the requested size from the underlying stream.
     *
     * @param size the requested buffer size
     * @return the buffer supplied by the stream
     */
    @Override
    public ByteBuf getBuffer(int size) {
        Assertions.notNull("open", stream);
        return stream.getBuffer(size);
    }

    /**
     * Sums {@code remaining()} over all the given buffers.
     *
     * @param byteBuffers the buffers to measure
     * @return the total of the buffers' remaining byte counts; 0 for an empty list
     */
    private int getTotalRemaining(List<ByteBuf> byteBuffers) {
        int total = 0;
        Iterator<ByteBuf> buffers = byteBuffers.iterator();
        while (buffers.hasNext()) {
            total += buffers.next().remaining();
        }
        return total;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processPendingReads() {
        block10: {
            if (this.reading.tryAcquire()) {
                this.processPendingResults();
                if (this.readQueue.isEmpty()) {
                    this.reading.release();
                    return;
                }
                if (this.isClosed()) {
                    Iterator<Map.Entry<Integer, SingleResultCallback<ResponseBuffers>>> it = this.readQueue.entrySet().iterator();
                    block5: while (true) {
                        while (it.hasNext()) {
                            Map.Entry<Integer, SingleResultCallback<ResponseBuffers>> pairs = it.next();
                            SingleResultCallback<ResponseBuffers> callback = pairs.getValue();
                            it.remove();
                            try {
                                callback.onResult(null, new MongoSocketClosedException("Cannot read from a closed stream", this.getServerAddress()));
                                continue block5;
                            }
                            catch (Throwable t) {
                                LOGGER.warn("Exception calling callback", t);
                            }
                        }
                        break block10;
                        {
                            continue block5;
                            break;
                        }
                        break;
                    }
                    finally {
                        this.reading.release();
                    }
                }
                this.fillAndFlipBuffer(36, ErrorHandlingResultCallback.errorHandlingCallback(new ResponseHeaderCallback(new SingleResultCallback<ResponseBuffers>(){

                    @Override
                    public void onResult(ResponseBuffers result, Throwable t) {
                        if (result == null) {
                            InternalStreamConnection.this.reading.release();
                            InternalStreamConnection.this.processUnknownFailedRead(t);
                        } else {
                            InternalStreamConnection.this.reading.release();
                            InternalStreamConnection.this.messages.put(result.getReplyHeader().getResponseTo(), new Response(result, t));
                        }
                        InternalStreamConnection.this.processPendingReads();
                    }
                }), LOGGER));
            }
        }
    }

    /**
     * Drives asynchronous writes. Does nothing if another write is already in progress, i.e. the
     * write permit cannot be acquired immediately.
     *
     * <p>With the permit held: if the queue is empty the permit is released; if the connection is
     * closed every queued write is failed with a {@link MongoSocketClosedException} and the
     * permit is released; otherwise the next queued message is written asynchronously. When that
     * write finishes, the permit is released, the message's callback is invoked and this method
     * runs again. A failed write also closes the connection.
     */
    private void processPendingWrites() {
        if (!writing.tryAcquire()) {
            return;
        }
        if (writeQueue.isEmpty()) {
            writing.release();
            return;
        }
        if (isClosed()) {
            try {
                // Entries are never null, so poll() returning null means the queue is drained.
                for (SendMessageAsync queued = writeQueue.poll(); queued != null; queued = writeQueue.poll()) {
                    ErrorHandlingResultCallback.errorHandlingCallback(queued.getCallback(), LOGGER).onResult(null, new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress()));
                }
            }
            finally {
                writing.release();
            }
            return;
        }
        final SendMessageAsync next = writeQueue.poll();
        AsyncCompletionHandler<Void> writeHandler = new AsyncCompletionHandler<Void>(){

            @Override
            public void completed(Void v) {
                InternalStreamConnection.this.writing.release();
                try {
                    int size = InternalStreamConnection.this.getTotalRemaining(next.getByteBuffers());
                    InternalStreamConnection.this.connectionListener.messagesSent(new ConnectionMessagesSentEvent(InternalStreamConnection.this.getId(), next.getMessageId(), size));
                }
                catch (Throwable t) {
                    LOGGER.warn("Exception when trying to signal messagesSent to the connectionListener", t);
                }
                ErrorHandlingResultCallback.errorHandlingCallback(next.getCallback(), LOGGER).onResult(null, null);
                InternalStreamConnection.this.processPendingWrites();
            }

            @Override
            public void failed(Throwable t) {
                InternalStreamConnection.this.writing.release();
                InternalStreamConnection.this.close();
                ErrorHandlingResultCallback.errorHandlingCallback(next.getCallback(), LOGGER).onResult(null, InternalStreamConnection.this.translateWriteException(t));
                InternalStreamConnection.this.processPendingWrites();
            }
        };
        stream.writeAsync(next.getByteBuffers(), writeHandler);
    }

    /**
     * Delivers every received reply for which an asynchronous reader is registered.
     *
     * <p>For each stored response whose request id has a callback in {@code readQueue}, the
     * callback is removed from the queue, invoked with either the error or the result, and the
     * response is removed from {@code messages}. Responses with no registered callback are left
     * in place. Exceptions thrown by a callback are logged and do not stop the iteration.
     */
    private void processPendingResults() {
        Iterator<Map.Entry<Integer, Response>> it = this.messages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Response> entry = it.next();
            SingleResultCallback<ResponseBuffers> callback = this.readQueue.remove(entry.getKey());
            if (callback == null) continue;
            Response response = entry.getValue();
            try {
                if (response.hasError()) {
                    callback.onResult(null, response.getError());
                } else {
                    callback.onResult(response.getResult(), null);
                }
            }
            catch (Throwable t) {
                LOGGER.warn("Exception calling callback", t);
            }
            it.remove();
        }
    }

    /**
     * Handles a read failure that cannot be tied to a particular request: delivers the replies
     * already received, closes the connection, then fails every remaining reader with {@code t}.
     * Exceptions thrown by a callback are logged and do not stop the iteration.
     *
     * @param t the failure to report to all remaining readers
     */
    private void processUnknownFailedRead(Throwable t) {
        processPendingResults();
        close();
        for (Iterator<Map.Entry<Integer, SingleResultCallback<ResponseBuffers>>> waiting = readQueue.entrySet().iterator(); waiting.hasNext();) {
            SingleResultCallback<ResponseBuffers> reader = waiting.next().getValue();
            waiting.remove();
            try {
                reader.onResult(null, t);
            }
            catch (Throwable tr) {
                LOGGER.warn("Exception calling callback", tr);
            }
        }
    }

    /**
     * Outcome of reading one reply: the reply buffers, an error, or (when a body read fails
     * after the header was parsed) both.
     */
    private static class Response {
        private final ResponseBuffers buffers;
        private final Throwable error;

        public Response(ResponseBuffers result, Throwable t) {
            buffers = result;
            error = t;
        }

        /** @return the reply buffers, possibly {@code null} */
        public ResponseBuffers getResult() {
            return buffers;
        }

        /** @return the recorded error, or {@code null} if there is none */
        public Throwable getError() {
            return error;
        }

        /** @return {@code true} if an error was recorded */
        public boolean hasError() {
            return error != null;
        }
    }

    /** A queued asynchronous write: the message plus the callback to notify when it is done. */
    private static class SendMessageAsync
    extends SendMessage {
        private final SingleResultCallback<Void> completionCallback;

        SendMessageAsync(List<ByteBuf> byteBuffers, int messageId, SingleResultCallback<Void> callback) {
            super(byteBuffers, messageId);
            completionCallback = callback;
        }

        /** @return the callback supplied at construction */
        public SingleResultCallback<Void> getCallback() {
            return completionCallback;
        }
    }

    /** Immutable holder for the buffers of an outgoing message and its message id. */
    private static class SendMessage {
        private final List<ByteBuf> buffers;
        private final int id;

        SendMessage(List<ByteBuf> byteBuffers, int messageId) {
            buffers = byteBuffers;
            id = messageId;
        }

        /** @return the buffers supplied at construction (not copied) */
        public List<ByteBuf> getByteBuffers() {
            return buffers;
        }

        /** @return the message id supplied at construction */
        public int getMessageId() {
            return id;
        }
    }

    /**
     * Receives the buffer holding a reply header during an asynchronous read, parses it and,
     * if the message is longer than the 36-byte header, starts the read of the body.
     */
    private class ResponseHeaderCallback
    implements SingleResultCallback<ByteBuf> {
        private final SingleResultCallback<ResponseBuffers> callback;

        public ResponseHeaderCallback(SingleResultCallback<ResponseBuffers> callback) {
            this.callback = callback;
        }

        /**
         * Handles the outcome of the header read.
         *
         * @param result the buffer containing the header, when the read succeeded
         * @param t      the read failure, or {@code null} on success
         */
        @Override
        public void onResult(ByteBuf result, Throwable t) {
            if (t != null) {
                callback.onResult(null, t);
                return;
            }
            ByteBufferBsonInput headerInput = new ByteBufferBsonInput(result);
            ReplyHeader header;
            try {
                header = new ReplyHeader((BsonInput)headerInput);
            }
            finally {
                // The header input is closed whether or not parsing succeeded.
                headerInput.close();
            }
            int bodyLength = header.getMessageLength() - 36;
            if (bodyLength == 0) {
                // Header-only message: there is nothing more to read.
                onSuccess(new ResponseBuffers(header, null));
            } else {
                fillAndFlipBuffer(bodyLength, new ResponseBodyCallback(header));
            }
        }

        /**
         * Notifies the listener that a message was received, then passes the buffers to the
         * wrapped callback. Exceptions from either are logged and swallowed.
         */
        private void onSuccess(ResponseBuffers responseBuffers) {
            if (responseBuffers == null) {
                callback.onResult(null, new MongoException("Unexpected empty response buffers"));
                return;
            }
            try {
                ReplyHeader header = responseBuffers.getReplyHeader();
                connectionListener.messageReceived(new ConnectionMessageReceivedEvent(getId(), header.getResponseTo(), header.getMessageLength()));
            }
            catch (Throwable t) {
                LOGGER.warn("Exception when trying to signal messageReceived to the connectionListener", t);
            }
            try {
                callback.onResult(responseBuffers, null);
            }
            catch (Throwable t) {
                LOGGER.warn("Exception calling callback", t);
            }
        }

        /** Receives the body buffer for a reply whose header has already been parsed. */
        private class ResponseBodyCallback
        implements SingleResultCallback<ByteBuf> {
            private final ReplyHeader replyHeader;

            public ResponseBodyCallback(ReplyHeader replyHeader) {
                this.replyHeader = replyHeader;
            }

            @Override
            public void onResult(ByteBuf result, Throwable t) {
                if (t == null) {
                    onSuccess(new ResponseBuffers(replyHeader, result));
                    return;
                }
                try {
                    // The header is still passed along with the error, so the receiver can
                    // tell which request the failed reply belonged to.
                    callback.onResult(new ResponseBuffers(replyHeader, result), t);
                }
                catch (Throwable tr) {
                    LOGGER.warn("Exception calling callback", tr);
                }
            }
        }
    }
}

