/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyInputStream;

/**
 * Thread that services a single inbound peer connection on an already-accepted {@link Socket}.
 *
 * <p>{@link #run()} validates the protocol magic, reads one header int and, depending on the
 * flags and version encoded in it, hands the connection to the stream handler, the legacy
 * message loop, or the modern message loop. The socket is always closed when {@code run()}
 * returns.
 *
 * <p>NOTE: this source was recovered with a decompiler, which flagged the try/catch structure of
 * {@code run()} as possibly differing from the original.
 */
public class IncomingTcpConnection extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);

    /**
     * Protocol version this node announces to peers and the newest version it accepts messages
     * for. Versions below this value are served by the legacy handler.
     * NOTE(review): presumably mirrors a constant in MessagingService that was inlined by the
     * compiler -- confirm and reference that constant instead if it is accessible.
     */
    private static final int CURRENT_VERSION = 6;

    private final Socket socket;

    /**
     * Address of the remote peer. Stays {@code null} until it has been learned from the modern
     * handshake or from the first legacy message; it is never set on stream connections.
     */
    public InetAddress from;

    /**
     * @param socket the accepted connection to service; must not be {@code null}
     */
    public IncomingTcpConnection(Socket socket) {
        assert socket != null;
        this.socket = socket;
    }

    /**
     * Reads the connection preamble (magic int followed by a header int) and dispatches to the
     * matching handler. {@link EOFException} and {@link IOException} end the connection quietly
     * (trace/debug log); in every case the socket is closed before returning.
     */
    @Override
    public void run() {
        try {
            // This stream wraps the socket without buffering, so nothing beyond what is
            // explicitly read here is consumed before a handler takes over.
            DataInputStream in = new DataInputStream(socket.getInputStream());
            MessagingService.validateMagic(in.readInt());
            int header = in.readInt();
            boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
            int version = MessagingService.getBits(header, 15, 8);
            logger.debug("Connection version {} from {}", version, socket.getInetAddress());

            if (isStream) {
                handleStream(in, version);
            } else if (version < CURRENT_VERSION) {
                handleLegacyVersion(version);
            } else {
                handleModernVersion(version, header);
            }
        } catch (EOFException e) {
            logger.trace("eof reading from socket; closing", e);
        } catch (IOException e) {
            logger.debug("IOException reading from socket; closing", e);
        } finally {
            close();
        }
    }

    /**
     * Handles a non-stream connection whose version is at least {@link #CURRENT_VERSION}.
     *
     * <p>Sends this node's version, reads the peer's maximum version and address, then switches
     * to a Snappy-decompressing or a buffered input stream depending on the compression flag in
     * {@code header}. If the peer's version is newer than ours the peer is registered with the
     * gossiper and the method returns; otherwise messages are read until an exception ends the
     * loop.
     *
     * @param version protocol version extracted from the connection header
     * @param header  raw connection header; only the compression flag is read from it here
     * @throws IOException on any socket read/write failure, including end of stream
     */
    private void handleModernVersion(int version, int header) throws IOException {
        // Announce our version before reading the peer's half of the handshake.
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        out.writeInt(CURRENT_VERSION);
        out.flush();

        // The handshake is read unbuffered so the stream can be re-wrapped below without
        // losing bytes to a discarded buffer.
        DataInputStream handshake = new DataInputStream(socket.getInputStream());
        int maxVersion = handshake.readInt();
        from = CompactEndpointSerializationHelper.deserialize(handshake);

        boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
        DataInputStream in;
        if (compressed) {
            logger.debug("Upgrading incoming connection to be compressed");
            in = new DataInputStream(new SnappyInputStream(socket.getInputStream()));
        } else {
            in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
        }

        logger.debug("Max version for {} is {}", from, maxVersion);
        if (version > CURRENT_VERSION) {
            Gossiper.instance.addSavedEndpoint(from);
            logger.info("Received messages from newer protocol version {}. Ignoring", version);
            return;
        }

        int negotiated = Math.min(CURRENT_VERSION, maxVersion);
        MessagingService.instance().setVersion(from, negotiated);
        logger.debug("set version for {} to {}", from, negotiated);

        // No normal exit: the loop ends when a read throws (EOF or another IOException).
        while (true) {
            MessagingService.validateMagic(in.readInt());
            receiveMessage(in, version);
        }
    }

    /**
     * Handles a non-stream connection whose version is below {@link #CURRENT_VERSION}.
     *
     * <p>The first message is read immediately and its sender becomes {@link #from}. The peer's
     * version is then recorded and, if it is higher than the previously recorded one, the
     * outbound connection pool for the peer is reset to that version. Every subsequent message
     * is preceded by its own magic and header ints, and the version is re-read from each header.
     *
     * @param version protocol version extracted from the connection header
     * @throws IOException on any socket read failure, including end of stream
     */
    private void handleLegacyVersion(int version) throws IOException {
        DataInputStream in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));

        // NOTE(review): receiveMessage returns null when MessageIn.read yields no message, so
        // `from` may be null below and is still passed to setVersion/getConnectionPool --
        // confirm those calls tolerate a null endpoint.
        from = receiveMessage(in, version);
        logger.debug("Version for {} is {}", from, version);

        // run() only routes versions below CURRENT_VERSION here, so this branch cannot be taken
        // from the current call site; it is kept as a guard for any other caller.
        if (version > CURRENT_VERSION) {
            Gossiper.instance.addSavedEndpoint(from);
            logger.info("Received messages from newer protocol version. Ignoring");
            return;
        }

        int lastVersion = MessagingService.instance().setVersion(from, version);
        logger.debug("set version for {} to {}", from, version);
        if (lastVersion < version) {
            logger.debug("breaking outbound connections to force version upgrade");
            MessagingService.instance().getConnectionPool(from).resetToNewerVersion(version);
        }

        // No normal exit: the loop ends when a read throws (EOF or another IOException).
        while (true) {
            MessagingService.validateMagic(in.readInt());
            int header = in.readInt();
            assert MessagingService.getBits(header, 3, 1) != 1 : "Non-stream connection cannot change to stream";
            int messageVersion = MessagingService.getBits(header, 15, 8);
            logger.trace("Version is now {}", messageVersion);
            receiveMessage(in, messageVersion);
        }
    }

    /**
     * Handles a connection flagged as a stream. Only {@link #CURRENT_VERSION} is accepted: the
     * length-prefixed stream header is read and deserialized, then the transfer is started. Any
     * other version is logged as an error and the method returns without reading further.
     *
     * @param input   stream positioned directly after the connection header int
     * @param version protocol version extracted from the connection header
     * @throws IOException if the header cannot be read, or if the peer sent a negative header
     *                     size
     */
    private void handleStream(DataInputStream input, int version) throws IOException {
        if (version != CURRENT_VERSION) {
            logger.error("Received stream using protocol version {} (my version {}). Terminating connection", version, CURRENT_VERSION);
            return;
        }

        int size = input.readInt();
        if (size < 0) {
            // The size comes straight off the wire. Without this check a negative value would
            // raise NegativeArraySizeException, which run() does not catch.
            throw new IOException("Invalid stream header size: " + size);
        }
        byte[] headerBytes = new byte[size];
        input.readFully(headerBytes);

        StreamHeader streamHeader = StreamHeader.serializer.deserialize(
                new DataInputStream(new FastByteArrayInputStream(headerBytes)), version);
        stream(streamHeader);
    }

    /**
     * Reads one message from {@code input} and, unless its version is newer than
     * {@link #CURRENT_VERSION}, passes it to the messaging service.
     *
     * @param input   stream positioned at the start of the message body (after magic/header)
     * @param version protocol version that governs the layout of this message
     * @return the sender recorded in the message, or {@code null} if {@code MessageIn.read}
     *         produced no message
     * @throws IOException on any read failure, including end of stream
     */
    private InetAddress receiveMessage(DataInputStream input, int version) throws IOException {
        if (version < CURRENT_VERSION) {
            // Older layouts carry an extra leading int; it is read and discarded.
            input.readInt();
        }
        String id = input.readUTF();

        long timestamp = System.currentTimeMillis();
        if (version >= CURRENT_VERSION) {
            // Always consumed to keep the stream aligned, even when it is not used.
            int partial = input.readInt();
            if (DatabaseDescriptor.hasCrossNodeTimeout()) {
                // Keep the high 32 bits of the local clock and take the low 32 bits from the
                // value on the wire (masked so the int is not sign-extended).
                timestamp = (timestamp & 0xFFFFFFFF00000000L) | (partial & 0xFFFFFFFFL);
            }
        }

        MessageIn message = MessageIn.read(input, version, id);
        if (message == null) {
            return null;
        }

        if (version <= CURRENT_VERSION) {
            MessagingService.instance().receive(message, id, timestamp);
        } else {
            logger.debug("Received connection from newer protocol version {}. Ignoring message", version);
        }
        return message.from;
    }

    /**
     * Resets the recorded version for the peer (when the peer is known) and closes the socket.
     * The socket is closed even if resetting the version throws; a failure to close is only
     * logged at debug level.
     */
    private void close() {
        try {
            if (from != null) {
                MessagingService.instance().resetVersion(from);
            }
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                logger.debug("error closing socket", e);
            }
        }
    }

    /**
     * Starts reading the streamed data described by {@code streamHeader}. The reader is handed
     * the socket itself rather than the stream the header was read from.
     *
     * @param streamHeader deserialized header describing the incoming stream
     * @throws IOException if the stream reader fails
     */
    private void stream(StreamHeader streamHeader) throws IOException {
        new IncomingStreamReader(streamHeader, socket).read();
    }
}

