/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.map;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.BitSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.map.AbstractChannelReplicator;
import net.openhft.chronicle.map.IdentifierListener;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.TcpReplicationConfig;
import net.openhft.chronicle.map.ThrottlingConfig;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.Bytes;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TcpReplicator
extends AbstractChannelReplicator
implements Closeable {
    /** Class-wide SLF4J logger, named after this class. */
    private static final Logger LOG = LoggerFactory.getLogger((String)TcpReplicator.class.getName());
    /** 0x100000 bytes (1 MiB); matches the receive buffer size requested for the listening socket in ServerConnector. */
    private static final int BUFFER_SIZE = 0x100000;
    /** Selection keys of connections, indexed by the remote identifier read during hand-shaking (128 slots). */
    private final SelectionKey[] selectionKeysStore = new SelectionKey[128];
    /** Collects requests to add interest op 4 (SelectionKey.OP_WRITE) to keys in selectionKeysStore; applied from the loop in process(). */
    private final KeyInterestUpdater opWriteUpdater = new KeyInterestUpdater(4, this.selectionKeysStore);
    /** Bit i is set while selectionKeysStore[i] should be visited by heartBeatMonitor. */
    private final BitSet activeKeys = new BitSet(this.selectionKeysStore.length);
    /** Local heartbeat interval in milliseconds, converted from the replication config. */
    private final long heartBeatIntervalMillis;
    /** Replica whose modifications are iterated and sent to remote nodes. */
    private final Replica replica;
    /** Identifier of the local replica, as returned by replica.identifier(). */
    private final byte localIdentifier;
    /** Added to the configured packet size to dimension the per-connection read and write buffers. */
    private final int maxEntrySizeBytes;
    /** Reads received entries out of the read buffer (readExternalEntry) and is handed to the writer's entry callback. */
    private final Replica.EntryExternalizable externalizable;
    /** TCP replication settings: server port, endpoints, packet size, heartbeat and throttling configuration. */
    private final TcpReplicationConfig replicationConfig;
    /** Timeout in milliseconds passed to selector.select(); starts as min(heartbeat interval, throttle bucket interval) and may be lowered during hand-shaking. */
    private long selectorTimeout;

    /**
     * Creates the replicator, derives the selector timeout from the configured
     * heartbeat and throttling intervals, and finally calls {@code start()}.
     *
     * @param replica           the replica whose changes are replicated
     * @param externalizable    used to read and write individual entries
     * @param replicationConfig TCP replication settings
     * @param maxEntrySizeBytes maximum entry size, used when sizing the socket buffers
     * @throws IOException declared by the constructor; propagated from the superclass or {@code start()}
     */
    TcpReplicator(@NotNull Replica replica, @NotNull Replica.EntryExternalizable externalizable, @NotNull TcpReplicationConfig replicationConfig, int maxEntrySizeBytes) throws IOException {
        super("TcpSocketReplicator-" + replica.identifier(), replicationConfig.throttlingConfig(), maxEntrySizeBytes);

        final ThrottlingConfig throttling = replicationConfig.throttlingConfig();
        final long bucketIntervalMillis = TimeUnit.MILLISECONDS.convert(throttling.bucketInterval(), throttling.bucketIntervalUnit());
        final long heartBeatMillis = TimeUnit.MILLISECONDS.convert(replicationConfig.heartBeatInterval(), replicationConfig.heartBeatIntervalUnit());

        this.heartBeatIntervalMillis = heartBeatMillis;
        // Wake up often enough to serve whichever periodic task is due first.
        this.selectorTimeout = Math.min(heartBeatMillis, bucketIntervalMillis);

        this.replica = replica;
        this.localIdentifier = replica.identifier();
        this.maxEntrySizeBytes = maxEntrySizeBytes;
        this.externalizable = externalizable;
        this.replicationConfig = replicationConfig;

        // Must stay last: every field above has to be assigned before the replicator starts.
        this.start();
    }

    /**
     * Selector loop of the replicator.
     *
     * <p>Opens the server connector and one client connector per configured endpoint,
     * then, while the selector is open, registers pending channels, waits for ready
     * keys (at most {@code selectorTimeout} ms), runs the throttle/heartbeat/OP_WRITE
     * housekeeping and dispatches every ready key to the matching handler.
     * A failure while handling one key closes that key's channel only; a failure
     * outside the per-key handling ends the loop. The replicator is closed on exit
     * unless it is already closed.
     */
    @Override
    void process() throws IOException {
        try {
            final AbstractChannelReplicator.Details serverDetails = new AbstractChannelReplicator.Details(
                    new InetSocketAddress(this.replicationConfig.serverPort()), this.localIdentifier);
            new ServerConnector(serverDetails).connect();

            for (InetSocketAddress endpoint : this.replicationConfig.endpoints()) {
                final AbstractChannelReplicator.Details endpointDetails = new AbstractChannelReplicator.Details(endpoint, this.localIdentifier);
                new ClientConnector(endpointDetails).connect();
            }

            while (this.selector.isOpen()) {
                this.registerPendingRegistrations();
                final int readyCount = this.selector.select(this.selectorTimeout);
                final long now = System.currentTimeMillis();

                // Housekeeping runs on every wake-up, even when no key is ready.
                this.checkThrottleInterval();
                this.heartBeatMonitor(now);
                this.opWriteUpdater.applyUpdates();

                if (readyCount == 0) {
                    continue;
                }

                final Set<SelectionKey> readyKeys = this.selector.selectedKeys();
                for (SelectionKey key : readyKeys) {
                    try {
                        if (key.isValid()) {
                            if (key.isAcceptable()) {
                                this.onAccept(key);
                            }
                            if (key.isConnectable()) {
                                this.onConnect(key);
                            }
                            if (key.isReadable()) {
                                this.onRead(key, now);
                            }
                            if (key.isWritable()) {
                                this.onWrite(key, now);
                            }
                        }
                    }
                    catch (CancelledKeyException | ClosedSelectorException | IOException | InterruptedException e) {
                        // Expected connection-level failures: drop this channel, keep the loop running.
                        if (!this.isClosed) {
                            this.quietClose(key, e);
                        }
                    }
                    catch (Exception e) {
                        LOG.info("", (Throwable)e);
                        if (!this.isClosed) {
                            this.closeEarlyAndQuietly(key.channel());
                        }
                    }
                }
                readyKeys.clear();
            }
        }
        catch (CancelledKeyException | ClosedSelectorException | ClosedChannelException | ConnectException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("", (Throwable)e);
            }
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
        }
        finally {
            if (!this.isClosed) {
                this.close();
            }
        }
    }

    /**
     * Visits every key flagged in {@code activeKeys}: keys that are no longer valid
     * (or whose channel is closed) are unflagged; for all others a heartbeat is
     * queued if one is due and the receipt of the remote heartbeat is checked.
     * Every failure is logged at debug level and never propagates.
     *
     * @param approxTime the current time in milliseconds, as sampled by the selector loop
     */
    void heartBeatMonitor(long approxTime) {
        for (int index = this.activeKeys.nextSetBit(0); index >= 0; index = this.activeKeys.nextSetBit(index + 1)) {
            try {
                final SelectionKey key = this.selectionKeysStore[index];
                if (!key.isValid() || !key.channel().isOpen()) {
                    this.activeKeys.clear(index);
                    continue;
                }

                try {
                    this.sendHeartbeatIfRequired(approxTime, key);
                }
                catch (Exception e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("", (Throwable)e);
                    }
                }

                // The receive check runs even when queuing our own heartbeat failed.
                try {
                    this.heartbeatCheckHasReceived(key, approxTime);
                }
                catch (Exception e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("", (Throwable)e);
                    }
                }
            }
            catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("", (Throwable)e);
                }
            }
        }
    }

    /**
     * Queues a heartbeat on the connection if hand-shaking has completed and nothing
     * has been sent for longer than the local heartbeat interval.
     *
     * @param approxTime the current time in milliseconds
     * @param key        the key whose attachment holds the connection state
     */
    private void sendHeartbeatIfRequired(long approxTime, @NotNull SelectionKey key) {
        final Attached attached = (Attached)key.attachment();
        if (!attached.isHandShakingComplete()) {
            return;
        }

        final TcpSocketChannelEntryWriter writer = attached.entryWriter;
        final boolean heartbeatDue = writer.lastSentTime + this.heartBeatIntervalMillis < approxTime;
        if (!heartbeatDue) {
            return;
        }

        writer.lastSentTime = approxTime;
        writer.writeHeartbeatToBuffer();
        this.enableOpWrite(key);
        if (LOG.isDebugEnabled()) {
            LOG.debug("sending heartbeat");
        }
    }

    /**
     * Adds {@code OP_WRITE} to the key's interest set, unless the key is currently
     * registered for {@code OP_CONNECT} or {@code OP_ACCEPT}.
     *
     * @param key the key to update
     */
    private void enableOpWrite(SelectionKey key) {
        final int currentOps = key.interestOps();
        final int connectOrAccept = SelectionKey.OP_CONNECT | SelectionKey.OP_ACCEPT;
        if ((currentOps & connectOrAccept) != 0) {
            return;
        }
        key.interestOps(currentOps | SelectionKey.OP_WRITE);
    }

    /**
     * Drops the connection when nothing has been received from the remote node for
     * longer than its (padded) heartbeat interval, and schedules a reconnect if the
     * configuration asks for it. Server-side attachments and connections that have
     * not finished hand-shaking are ignored.
     *
     * @param key               the key of the connection to check
     * @param approxTimeOutTime the current time in milliseconds
     * @throws ConnectException declared for callers; not thrown directly by this method
     */
    private void heartbeatCheckHasReceived(@NotNull SelectionKey key, long approxTimeOutTime) throws ConnectException {
        final Attached attached = (Attached)key.attachment();
        if (attached.isServer) {
            return;
        }
        if (!attached.isHandShakingComplete()) {
            return;
        }

        final SocketChannel channel = (SocketChannel)key.channel();
        final long deadline = attached.entryReader.lastHeartBeatReceived + attached.remoteHeartbeatInterval;
        if (approxTimeOutTime <= deadline) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("lost connection, attempting to reconnect. missed heartbeat from identifier=" + attached.remoteIdentifier);
        }
        this.activeKeys.clear(attached.remoteIdentifier);
        this.closeables.closeQuietly(channel.socket());
        if (this.replicationConfig.autoReconnectedUponDroppedConnection()) {
            attached.connector.connectLater();
        }
    }

    /**
     * Logs the given exception at debug level (if enabled) and closes the key's
     * channel via {@code closeEarlyAndQuietly}.
     *
     * @param key the key whose channel is closed
     * @param e   the failure that triggered the close
     */
    private void quietClose(@NotNull SelectionKey key, @NotNull Exception e) {
        final boolean debugEnabled = LOG.isDebugEnabled();
        if (debugEnabled) {
            LOG.debug("", (Throwable)e);
        }
        final SelectableChannel channel = key.channel();
        this.closeEarlyAndQuietly(channel);
    }

    /**
     * Completes an outgoing connection: finishes the connect, configures the socket,
     * creates the reader and writer, switches the key to read/write interest and
     * queues the local identifier as the first hand-shaking byte.
     *
     * @param key the connectable key
     * @throws IOException          if finishing the connection or configuring the socket fails;
     *                              on a {@link SocketException} the key is closed and, if configured,
     *                              a new connection attempt is started before rethrowing
     * @throws InterruptedException propagated from the reconnect attempt or throttling
     */
    private void onConnect(@NotNull SelectionKey key) throws IOException, InterruptedException {
        final SocketChannel channel = (SocketChannel)key.channel();
        final Attached attached = (Attached)key.attachment();

        final boolean connected;
        try {
            connected = channel.finishConnect();
        }
        catch (SocketException e) {
            this.quietClose(key, e);
            if (this.replicationConfig.autoReconnectedUponDroppedConnection()) {
                attached.connector.connect();
            }
            throw e;
        }
        if (!connected) {
            // Connection still in progress; the key will become connectable again.
            return;
        }

        attached.connector.setSuccessfullyConnected();
        if (LOG.isDebugEnabled()) {
            LOG.debug("successfully connected to {}, local-id={}", (Object)channel.socket().getInetAddress(), (Object)this.localIdentifier);
        }

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(true);
        channel.socket().setSoTimeout(0);
        channel.socket().setSoLinger(false, 0);

        attached.entryReader = new TcpSocketChannelEntryReader();
        attached.entryWriter = new TcpSocketChannelEntryWriter();

        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        this.throttle(channel);
        attached.entryWriter.identifierToBuffer(this.localIdentifier);
    }

    /**
     * Accepts an incoming connection on the server channel, configures the socket,
     * registers it with the selector for read/write and queues the local identifier
     * as the first hand-shaking byte.
     *
     * <p>Because the server channel is non-blocking, {@code accept()} may return
     * {@code null} when no connection is pending; that case is ignored. If setting up
     * the accepted channel fails, the accepted channel is closed before the exception
     * propagates, so it does not leak.
     *
     * @param key the acceptable key of the server channel
     * @throws IOException if accepting or configuring the new channel fails
     */
    private void onAccept(@NotNull SelectionKey key) throws IOException {
        final ServerSocketChannel server = (ServerSocketChannel)key.channel();
        final SocketChannel channel = server.accept();
        if (channel == null) {
            // Non-blocking accept with nothing pending (e.g. the peer gave up in the meantime).
            return;
        }

        boolean ready = false;
        try {
            channel.configureBlocking(false);
            channel.socket().setReuseAddress(true);
            channel.socket().setTcpNoDelay(true);
            channel.socket().setSoTimeout(0);
            channel.socket().setSoLinger(false, 0);

            final Attached attached = new Attached();
            channel.register(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, attached);
            this.throttle(channel);
            attached.entryReader = new TcpSocketChannelEntryReader();
            attached.entryWriter = new TcpSocketChannelEntryWriter();
            attached.isServer = true;
            attached.entryWriter.identifierToBuffer(this.localIdentifier);
            ready = true;
        }
        finally {
            if (!ready) {
                // Set-up failed half-way: release the accepted socket instead of leaking it.
                try {
                    channel.close();
                }
                catch (IOException closeFailure) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("", (Throwable)closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Forgets the connection attached to {@code key}: unflags its identifier in
     * {@code activeKeys}, empties its slot in {@code selectionKeysStore} and resets
     * the attachment's hand-shaking state.
     *
     * @param key the key whose attachment is reset
     */
    private void clearHandshaking(SelectionKey key) {
        final Attached attached = (Attached)key.attachment();
        final byte identifier = attached.remoteIdentifier;
        this.activeKeys.clear(identifier);
        this.selectionKeysStore[identifier] = null;
        attached.clearHandShaking();
    }

    /**
     * Advances the hand-shake on a connection as far as the received bytes allow.
     *
     * <p>The remote side sends, in order: its identifier (1 byte), its bootstrap
     * timestamp (8 bytes) and its heartbeat interval in milliseconds (8 bytes). Each
     * step returns early when its bytes have not arrived yet and is resumed on the
     * next read. Once all three are known the hand-shake is marked complete, the
     * modification iterator is told which entries are dirty, and any entries already
     * in the read buffer are processed.
     *
     * <p>The remote identifier is validated <em>before</em> the key is stored in
     * {@code selectionKeysStore}/{@code activeKeys}, so a rejected connection can
     * neither overwrite the slot of an established connection nor index the store
     * with a negative value. A non-positive remote heartbeat interval is rejected as
     * well, because it would otherwise become the selector timeout.
     *
     * @param key           the key of the connection being hand-shaken
     * @param socketChannel the connection's channel, used to look up the remote address
     * @throws IOException           if the remote address cannot be obtained or entry processing fails
     * @throws InterruptedException  propagated from entry processing
     * @throws IllegalStateException if the remote identifier is out of range, not unique or equal to
     *                               the local one, or if the remote heartbeat interval is not positive;
     *                               the selector loop reacts by closing the connection
     */
    private void doHandShaking(@NotNull SelectionKey key, SocketChannel socketChannel) throws IOException, InterruptedException {
        final Attached attached = (Attached)key.attachment();
        final TcpSocketChannelEntryWriter writer = attached.entryWriter;
        final TcpSocketChannelEntryReader reader = attached.entryReader;

        // Step 1: remote identifier (-128 means "not received yet").
        if (attached.remoteIdentifier == -128) {
            final byte remoteIdentifier = reader.identifierFromBuffer();
            if (remoteIdentifier == -128) {
                return;
            }
            if (remoteIdentifier < 0) {
                // Identifiers index selectionKeysStore, so negative values can never be valid.
                throw new IllegalStateException("dropping connection, as the remote-identifier is out of range, identifier=" + remoteIdentifier);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("server-connection id={}, remoteIdentifier={}", (Object)this.localIdentifier, (Object)remoteIdentifier);
            }
            final IdentifierListener identifierListener = this.replicationConfig.identifierListener;
            final SocketAddress remoteAddress = socketChannel.getRemoteAddress();
            if (identifierListener != null && !identifierListener.isIdentifierUnique(remoteIdentifier, remoteAddress) || remoteIdentifier == this.localIdentifier) {
                throw new IllegalStateException("dropping connection, as the remote-identifier is already being used, identifier=" + remoteIdentifier);
            }

            // Only a validated identifier may claim a slot in the key store.
            attached.remoteIdentifier = remoteIdentifier;
            this.selectionKeysStore[remoteIdentifier] = key;
            this.activeKeys.set(remoteIdentifier);

            attached.remoteModificationIterator = this.replica.acquireModificationIterator(remoteIdentifier, attached);
            writer.writeRemoteBootstrapTimestamp(this.replica.lastModificationTime(remoteIdentifier));
            writer.writeRemoteHeartbeatInterval(this.heartBeatIntervalMillis);
        }

        // Step 2: remote bootstrap timestamp (Long.MIN_VALUE means "not received yet").
        if (attached.remoteBootstrapTimestamp == Long.MIN_VALUE) {
            attached.remoteBootstrapTimestamp = reader.remoteBootstrapTimestamp();
            if (attached.remoteBootstrapTimestamp == Long.MIN_VALUE) {
                return;
            }
        }

        // Step 3: remote heartbeat interval.
        if (!attached.hasRemoteHeartbeatInterval) {
            final long value = reader.remoteHeartbeatIntervalFromBuffer();
            if (value == Long.MIN_VALUE) {
                return;
            }
            if (value <= 0L) {
                // select(0) would block indefinitely and select(<0) throws, so refuse the peer instead.
                throw new IllegalStateException("dropping connection, as the remote heartbeat interval is invalid, value=" + value);
            }
            // Allow 25% slack before a missing heartbeat counts as a lost connection.
            attached.remoteHeartbeatInterval = (long)((double)value * 1.25);
            this.selectorTimeout = Math.min(this.selectorTimeout, value);
            attached.hasRemoteHeartbeatInterval = true;
            attached.handShakingComplete = true;
            attached.remoteModificationIterator.dirtyEntries(attached.remoteBootstrapTimestamp);
            reader.entriesFromBuffer();
        }
    }

    /**
     * Fills the write buffer with pending modifications (once a modification iterator
     * exists) and flushes the buffer to the socket, reporting the number of bytes
     * written to the throttler.
     *
     * @param key        the writable key
     * @param approxTime the current time in milliseconds
     * @throws InterruptedException propagated from collecting entries
     * @throws IOException          if writing fails; the key is closed and, for client-side
     *                              connections, a reconnect is scheduled before rethrowing
     */
    private void onWrite(@NotNull SelectionKey key, long approxTime) throws InterruptedException, IOException {
        final SocketChannel socketChannel = (SocketChannel)key.channel();
        final Attached attached = (Attached)key.attachment();

        final Replica.ModificationIterator pendingChanges = attached.remoteModificationIterator;
        if (pendingChanges != null) {
            attached.entryWriter.entriesToBuffer(pendingChanges, key);
        }

        try {
            final int written = attached.entryWriter.writeBufferToSocket(socketChannel, approxTime);
            this.contemplateThrottleWrites(written);
        }
        catch (IOException e) {
            this.quietClose(key, e);
            final boolean clientSide = !attached.isServer;
            if (clientSide) {
                attached.connector.connectLater();
            }
            throw e;
        }
    }

    /**
     * Reads available bytes from the socket and either processes replicated entries
     * or continues the hand-shake, depending on the connection state. Any received
     * data counts as a sign of life and refreshes {@code lastHeartBeatReceived}.
     *
     * <p>A read result of {@code -1} means the remote side closed the connection.
     * This is reported as an {@link IOException} so that the selector loop closes the
     * channel; otherwise the socket would stay registered and perpetually readable.
     * For client-side connections a reconnect is scheduled first when the
     * configuration enables automatic reconnection.
     *
     * @param key        the readable key
     * @param approxTime the current time in milliseconds
     * @throws IOException          if reading fails or the remote side closed the connection
     * @throws InterruptedException propagated from entry processing or hand-shaking
     */
    private void onRead(SelectionKey key, long approxTime) throws IOException, InterruptedException {
        final SocketChannel socketChannel = (SocketChannel)key.channel();
        final Attached attached = (Attached)key.attachment();

        final int bytesRead;
        try {
            bytesRead = attached.entryReader.readSocketToBuffer(socketChannel);
        }
        catch (IOException e) {
            if (!attached.isServer) {
                attached.connector.connectLater();
            }
            throw e;
        }

        if (bytesRead < 0) {
            // End of stream: same policy as a missed heartbeat, just detected immediately.
            if (!attached.isServer && this.replicationConfig.autoReconnectedUponDroppedConnection()) {
                attached.connector.connectLater();
            }
            throw new IOException("connection closed by remote peer, remoteIdentifier=" + attached.remoteIdentifier);
        }
        if (bytesRead == 0) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("heartbeat or data received.");
        }
        attached.entryReader.lastHeartBeatReceived = approxTime;
        if (attached.isHandShakingComplete()) {
            attached.entryReader.entriesFromBuffer();
        } else {
            this.doHandShaking(key, socketChannel);
        }
    }

    /**
     * Per-connection reader. Socket data is read into the {@code ByteBuffer} {@code in}
     * and parsed through {@code out}, a {@code ByteBufferBytes} constructed over that
     * same buffer. {@code in.position()} marks the end of the received data and
     * {@code out.position()} marks how far it has been parsed.
     */
    private class TcpSocketChannelEntryReader {
        // Buffer that socket reads are written into.
        private final ByteBuffer in;
        // Parsing view constructed from `in`. NOTE(review): presumably shares memory with `in` - confirm against ByteBufferBytes.
        private final ByteBufferBytes out;
        // Time in ms when data was last received; starts at construction time and is refreshed by onRead.
        public long lastHeartBeatReceived = System.currentTimeMillis();
        // Size of the entry currently awaited; Integer.MIN_VALUE means its size prefix has not been read yet.
        private int sizeOfNextEntry = Integer.MIN_VALUE;

        /** Allocates a direct buffer of packetSize + maxEntrySizeBytes bytes and starts with nothing readable. */
        private TcpSocketChannelEntryReader() {
            this.in = ByteBuffer.allocateDirect(TcpReplicator.this.replicationConfig.packetSize() + TcpReplicator.this.maxEntrySizeBytes);
            this.out = new ByteBufferBytes(this.in);
            // Nothing has been received yet, so the parsing view exposes zero bytes.
            this.out.limit(0L);
            this.in.clear();
        }

        /**
         * Compacts the buffer if needed, reads from the socket and extends the
         * readable region of {@code out} to the end of the received data.
         *
         * @return the value returned by {@code SocketChannel.read}: bytes read, possibly 0, or -1 at end of stream
         */
        private int readSocketToBuffer(@NotNull SocketChannel socketChannel) throws IOException {
            this.compactBuffer();
            int len = socketChannel.read(this.in);
            this.out.limit((long)this.in.position());
            return len;
        }

        /**
         * Processes every complete entry currently in the buffer. Each entry is
         * prefixed by an unsigned 16-bit size; a size of 0 carries no entry (the
         * writer emits it as a heartbeat) and is skipped. Returns as soon as the next
         * size prefix or entry is not fully received; {@code sizeOfNextEntry} keeps the
         * pending size for the next call.
         */
        private void entriesFromBuffer() throws InterruptedException, IOException {
            while (true) {
                this.out.limit((long)this.in.position());
                if (this.sizeOfNextEntry == Integer.MIN_VALUE) {
                    if (this.out.remaining() < 2L) {
                        return;
                    }
                    int value = this.out.readUnsignedShort();
                    // Zero-length prefix: nothing to read, look for the next prefix.
                    if (value == 0) continue;
                    this.sizeOfNextEntry = value;
                }
                if (this.out.remaining() < (long)this.sizeOfNextEntry) {
                    return;
                }
                long nextEntryPos = this.out.position() + (long)this.sizeOfNextEntry;
                long limit = this.out.limit();
                // Restrict the view to exactly this entry while it is being read.
                this.out.limit(nextEntryPos);
                TcpReplicator.this.externalizable.readExternalEntry((Bytes)this.out);
                this.out.limit(limit);
                // Move to the start of the next entry regardless of how much was consumed.
                this.out.position(nextEntryPos);
                this.sizeOfNextEntry = Integer.MIN_VALUE;
            }
        }

        /**
         * Moves the unparsed bytes to the start of the buffer. Does nothing while the
         * buffer is empty or still has more than maxEntrySizeBytes of free space.
         */
        private void compactBuffer() {
            if (this.in.position() == 0 || this.in.remaining() > TcpReplicator.this.maxEntrySizeBytes) {
                return;
            }
            // Select the unparsed region [out.position(), in.position()) ...
            this.in.limit(this.in.position());
            this.in.position((int)this.out.position());
            // ... and shift it to index 0; parsing then restarts from the beginning.
            this.in.compact();
            this.out.position(0L);
        }

        /** Reads the remote identifier, or returns -128 if no byte is available yet. */
        byte identifierFromBuffer() {
            return this.out.remaining() >= 1L ? this.out.readByte() : (byte)-128;
        }

        /** Reads the remote bootstrap timestamp, or returns Long.MIN_VALUE if fewer than 8 bytes are available. */
        long remoteBootstrapTimestamp() {
            return this.out.remaining() >= 8L ? this.out.readLong() : Long.MIN_VALUE;
        }

        /** Reads the remote heartbeat interval, or returns Long.MIN_VALUE if fewer than 8 bytes are available. */
        public long remoteHeartbeatIntervalFromBuffer() {
            return this.out.remaining() >= 8L ? this.out.readLong() : Long.MIN_VALUE;
        }
    }

    /**
     * Per-connection writer. Outgoing data is appended through {@code in}, a
     * {@code ByteBufferBytes} constructed over the {@code ByteBuffer} {@code out},
     * and flushed to the socket from {@code out}.
     */
    private class TcpSocketChannelEntryWriter {
        private final ByteBuffer out;
        private final ByteBufferBytes in;
        private final AbstractChannelReplicator.EntryCallback entryCallback;
        private long lastSentTime;

        /** Allocates a direct buffer of packetSize + maxEntrySizeBytes bytes and the callback that serialises entries into it. */
        private TcpSocketChannelEntryWriter() {
            final int capacity = TcpReplicator.this.replicationConfig.packetSize() + TcpReplicator.this.maxEntrySizeBytes;
            this.out = ByteBuffer.allocateDirect(capacity);
            this.in = new ByteBufferBytes(this.out);
            this.entryCallback = new AbstractChannelReplicator.EntryCallback(TcpReplicator.this.externalizable, this.in);
        }

        /** Appends the local identifier as a single byte (first hand-shaking message). */
        void identifierToBuffer(int localIdentifier) {
            this.in.writeByte(localIdentifier);
        }

        /** Appends the bootstrap timestamp as a long. */
        void writeRemoteBootstrapTimestamp(long timeStampOfLastMessage) {
            this.in.writeLong(timeStampOfLastMessage);
        }

        /**
         * Pulls entries from the iterator into the buffer until the iterator has
         * nothing more or at most maxEntrySizeBytes of space remain. When the iterator
         * is exhausted, the buffer is empty and hand-shaking was complete on entry,
         * write interest is switched off.
         */
        void entriesToBuffer(@NotNull Replica.ModificationIterator modificationIterator, @NotNull SelectionKey selectionKey) throws InterruptedException, IOException {
            final SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
            final Attached attached = (Attached)selectionKey.attachment();
            final boolean handShakingComplete = attached.isHandShakingComplete();

            while (true) {
                final boolean entryWritten = modificationIterator.nextEntry(this.entryCallback, 0);
                if (!entryWritten) {
                    if (this.in.position() == 0L && handShakingComplete) {
                        this.disableWrite(socketChannel, attached);
                    }
                    return;
                }
                // Stop once another maximum-size entry might no longer fit.
                if (this.in.remaining() <= (long)TcpReplicator.this.maxEntrySizeBytes) {
                    return;
                }
            }
        }

        /**
         * Writes the buffered bytes to the socket and keeps whatever could not be
         * written at the start of the buffer for the next attempt.
         *
         * @return the number of bytes written, or 0 if the buffer was empty
         */
        private int writeBufferToSocket(@NotNull SocketChannel socketChannel, long approxTime) throws IOException {
            final long pending = this.in.position();
            if (pending == 0L) {
                return 0;
            }
            this.lastSentTime = approxTime;
            this.out.limit((int)pending);
            final int written = socketChannel.write(this.out);
            if (LOG.isDebugEnabled()) {
                LOG.debug("bytes-written=" + written);
            }
            if (this.out.remaining() > 0) {
                // Partial write: shift the unsent tail to the front and continue appending after it.
                this.out.compact();
                this.in.position((long)this.out.position());
                this.in.limit(this.in.capacity());
                this.out.clear();
            } else {
                this.out.clear();
                this.in.clear();
            }
            return written;
        }

        /** Appends a heartbeat, encoded as an unsigned short of value 0. */
        private void writeHeartbeatToBuffer() {
            this.in.writeUnsignedShort(0);
        }

        /** Appends the local heartbeat interval as a long. */
        private void writeRemoteHeartbeatInterval(long localHeartbeatInterval) {
            this.in.writeLong(localHeartbeatInterval);
        }

        /**
         * Removes {@code OP_WRITE} from the channel's key, provided the channel is
         * registered, hand-shaking is complete and the selector is open. Failures are
         * logged and swallowed.
         */
        public synchronized void disableWrite(@NotNull SocketChannel socketChannel, @NotNull Attached attached) {
            try {
                final SelectionKey key = socketChannel.keyFor(TcpReplicator.this.selector);
                if (key == null || !attached.isHandShakingComplete() || !TcpReplicator.this.selector.isOpen()) {
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Disabling OP_WRITE to remoteIdentifier=" + attached.remoteIdentifier + ", localIdentifier=" + TcpReplicator.this.localIdentifier);
                }
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
            catch (Exception e) {
                LOG.error("", (Throwable)e);
            }
        }
    }

    /**
     * Per-connection state attached to a selection key: the reader and writer, the
     * connector that created the connection (if any), and the hand-shaking progress.
     * It is also handed to the replica as the modification notifier, so that a change
     * requests write interest for this connection.
     */
    class Attached
    implements Replica.ModificationNotifier {
        public TcpSocketChannelEntryReader entryReader;
        public TcpSocketChannelEntryWriter entryWriter;
        /** Source of entries to send to the remote node; {@code null} until the remote identifier is known. */
        public Replica.ModificationIterator remoteModificationIterator;
        public AbstractChannelReplicator.AbstractConnector connector;
        /** {@code Long.MIN_VALUE} until received from the remote node. */
        public long remoteBootstrapTimestamp = Long.MIN_VALUE;
        /** {@code -128} until received from the remote node. */
        public byte remoteIdentifier = (byte)-128;
        public boolean hasRemoteHeartbeatInterval;
        /** {@code true} for connections accepted by the local server socket. */
        public boolean isServer;
        /**
         * Time in ms without received data after which the connection counts as lost.
         * Defaults to the local heartbeat interval (the same value
         * {@link #clearHandShaking()} resets it to) until the remote interval arrives.
         */
        public long remoteHeartbeatInterval = TcpReplicator.this.heartBeatIntervalMillis;
        public boolean handShakingComplete;

        Attached() {
        }

        /** @return {@code true} once identifier, bootstrap timestamp and heartbeat interval have been exchanged */
        boolean isHandShakingComplete() {
            return this.handShakingComplete;
        }

        /** Resets all hand-shaking state to its initial values. */
        void clearHandShaking() {
            this.handShakingComplete = false;
            this.remoteIdentifier = (byte)-128;
            this.remoteBootstrapTimestamp = Long.MIN_VALUE;
            this.remoteHeartbeatInterval = TcpReplicator.this.heartBeatIntervalMillis;
            this.hasRemoteHeartbeatInterval = false;
            this.remoteModificationIterator = null;
        }

        /** Requests write interest for this connection, once its remote identifier is known. */
        @Override
        public void onChange() {
            if (this.remoteIdentifier != -128) {
                TcpReplicator.this.opWriteUpdater.set(this.remoteIdentifier);
            }
        }
    }

    /**
     * Connector for an outgoing connection to one configured endpoint.
     */
    private class ClientConnector
    extends AbstractChannelReplicator.AbstractConnector {
        private final AbstractChannelReplicator.Details details;

        private ClientConnector(AbstractChannelReplicator.Details details) {
            super("TCP-ClientConnector-" + details.localIdentifier());
            this.details = details;
        }

        public String toString() {
            return "ClientConnector{" + this.details + '}';
        }

        /**
         * Opens a non-blocking socket channel, starts connecting to the endpoint and
         * queues the channel's {@code OP_CONNECT} registration for the selector loop.
         * If the address cannot be resolved a later retry is scheduled. When anything
         * fails before the registration is queued, the channel is closed again.
         *
         * @return the opened channel
         */
        @Override
        SelectableChannel doConnect() throws IOException, InterruptedException {
            final SocketChannel socketChannel = SocketChannel.open();
            boolean success = false;
            try {
                socketChannel.configureBlocking(false);
                socketChannel.socket().setReuseAddress(true);
                socketChannel.socket().setSoLinger(false, 0);
                socketChannel.socket().setSoTimeout(0);
                socketChannel.socket().setTcpNoDelay(true);
                try {
                    socketChannel.connect(this.details.address());
                }
                catch (UnresolvedAddressException e) {
                    this.connectLater();
                }
                // Short pause before the registration is queued.
                Thread.sleep(10L);
                TcpReplicator.this.addPendingRegistration(this.connectRegistration(socketChannel));
                TcpReplicator.this.selector.wakeup();
                success = true;
                return socketChannel;
            }
            finally {
                if (!success) {
                    this.closeAfterFailedConnect(socketChannel);
                }
            }
        }

        /** Builds the task that registers the channel for {@code OP_CONNECT} with a fresh attachment. */
        private Runnable connectRegistration(final SocketChannel socketChannel) {
            return new Runnable(){

                @Override
                public void run() {
                    final Attached attached = new Attached();
                    attached.connector = ClientConnector.this;
                    try {
                        socketChannel.register(TcpReplicator.this.selector, SelectionKey.OP_CONNECT, attached);
                    }
                    catch (ClosedChannelException e) {
                        // Only report the failure if the channel still claims to be open.
                        if (socketChannel.isOpen()) {
                            LOG.error("", (Throwable)e);
                        }
                    }
                }
            };
        }

        /** Closes the socket and then the channel, logging (not propagating) close failures. */
        private void closeAfterFailedConnect(SocketChannel socketChannel) {
            try {
                socketChannel.socket().close();
            }
            catch (Exception e) {
                LOG.error("", (Throwable)e);
            }
            try {
                socketChannel.close();
            }
            catch (IOException e) {
                LOG.error("", (Throwable)e);
            }
        }
    }

    /**
     * Connector for the local listening socket.
     */
    private class ServerConnector
    extends AbstractChannelReplicator.AbstractConnector {
        private final AbstractChannelReplicator.Details details;

        private ServerConnector(AbstractChannelReplicator.Details details) {
            super("TCP-ServerConnector-" + TcpReplicator.this.localIdentifier);
            this.details = details;
        }

        public String toString() {
            return "ServerConnector{" + this.details + '}';
        }

        /**
         * Opens a non-blocking server socket channel, binds it to the configured
         * address and queues its {@code OP_ACCEPT} registration for the selector loop.
         * If configuring or binding fails (for example because the port is in use),
         * the channel is closed before the exception propagates so that it does not
         * leak.
         *
         * @return the bound server channel
         * @throws IOException if the channel cannot be opened, configured or bound
         */
        @Override
        SelectableChannel doConnect() throws IOException, InterruptedException {
            final ServerSocketChannel serverChannel = ServerSocketChannel.open();
            boolean success = false;
            try {
                serverChannel.socket().setReceiveBufferSize(TcpReplicator.BUFFER_SIZE);
                serverChannel.configureBlocking(false);
                final ServerSocket serverSocket = serverChannel.socket();
                serverSocket.setReuseAddress(true);
                serverSocket.bind(this.details.address());
                // The registration itself is executed later by the selector loop.
                TcpReplicator.this.addPendingRegistration(new Runnable(){

                    @Override
                    public void run() {
                        final Attached attached = new Attached();
                        attached.connector = ServerConnector.this;
                        try {
                            serverChannel.register(TcpReplicator.this.selector, SelectionKey.OP_ACCEPT, attached);
                        }
                        catch (ClosedChannelException e) {
                            LOG.error("", (Throwable)e);
                        }
                    }
                });
                TcpReplicator.this.selector.wakeup();
                success = true;
                return serverChannel;
            }
            finally {
                if (!success) {
                    try {
                        serverChannel.close();
                    }
                    catch (IOException e) {
                        LOG.error("", (Throwable)e);
                    }
                }
            }
        }
    }

    /**
     * Records which keys (by index into a shared key array) need an interest op
     * added, and applies the recorded requests in one pass when asked to.
     */
    private static class KeyInterestUpdater {
        private final AtomicBoolean wasChanged = new AtomicBoolean();
        private final BitSet changeOfOpWriteRequired;
        private final SelectionKey[] selectionKeys;
        private final int op;

        /**
         * @param op            the interest op to add, e.g. {@code SelectionKey.OP_WRITE}
         * @param selectionKeys the key array the indexes passed to {@link #set(int)} refer to
         */
        KeyInterestUpdater(int op, SelectionKey[] selectionKeys) {
            this.op = op;
            this.selectionKeys = selectionKeys;
            this.changeOfOpWriteRequired = new BitSet(selectionKeys.length);
        }

        /**
         * Adds the op to every key requested since the last call and clears the
         * requests. Slots that are empty or hold a key that is no longer valid are
         * skipped; any other failure is logged at debug level.
         */
        public void applyUpdates() {
            if (!this.wasChanged.getAndSet(false)) {
                return;
            }
            for (int i = this.changeOfOpWriteRequired.nextSetBit(0); i >= 0; i = this.changeOfOpWriteRequired.nextSetBit(i + 1)) {
                this.changeOfOpWriteRequired.clear(i);
                final SelectionKey key = this.selectionKeys[i];
                if (key == null || !key.isValid()) {
                    // Connection is gone (or was never stored): nothing to update.
                    continue;
                }
                try {
                    key.interestOps(key.interestOps() | this.op);
                }
                catch (Exception e) {
                    // The key may still be cancelled between the validity check and the update.
                    LOG.debug("", (Throwable)e);
                }
            }
        }

        /**
         * Requests that the op is added to the key at {@code keyIndex} on the next
         * {@link #applyUpdates()}.
         */
        public void set(int keyIndex) {
            this.changeOfOpWriteRequired.set(keyIndex);
            this.wasChanged.lazySet(true);
        }
    }
}

