/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.base.Throwables;
import com.ning.compress.lzf.LZFInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamInSession;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives one streamed file from a peer over a socket and writes it into a
 * local SSTable through an {@link SSTableWriter}, reporting the outcome to the
 * owning {@link StreamInSession}.
 *
 * <p>An instance is bound to a single {@link StreamHeader}. When the header
 * carries no file ({@code header.file == null}) {@link #read()} transfers no
 * data and only asks the session to close if it is finished.
 */
public class IncomingStreamReader {
    private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
    /** Local counterpart of {@link #remoteFile}; {@code null} when the header carries no file. */
    protected final PendingFile localFile;
    /** File described by the stream header; may be {@code null}. */
    protected final PendingFile remoteFile;
    protected final StreamInSession session;
    /** Decompressing view of the socket input; {@code null} when there is no file to read. */
    private final InputStream underliningStream;
    private final StreamingMetrics metrics;

    /**
     * Binds the reader to the session named in {@code header} and prepares the
     * input stream used to receive {@code header.file}.
     *
     * @param header stream header describing the session, the pending files and the current file
     * @param socket connected socket the file is received on; its read timeout is set to
     *               {@link DatabaseDescriptor#getStreamingSocketTimeout()}
     * @throws IOException if the header has a file but no pending files and no session exists
     *                     for {@code header.sessionId} (a {@code SESSION_FAILURE} reply is
     *                     written to the socket first), or if socket I/O fails
     */
    public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException {
        socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
        InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();

        // A header with a file but without a pending-file list is only accepted when
        // the session already exists; otherwise the sender is told the session failed.
        if (header.pendingFiles.isEmpty() && header.file != null && !StreamInSession.hasSession(header.sessionId)) {
            StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
            OutboundTcpConnection.write(reply.createMessage(), header.sessionId.toString(), System.currentTimeMillis(), new DataOutputStream(socket.getOutputStream()), MessagingService.instance().getVersion(host));
            throw new IOException("Session " + header.sessionId + " already closed.");
        }

        this.session = StreamInSession.get(host, header.sessionId);
        this.session.setSocket(socket);
        this.session.addFiles(header.pendingFiles);
        this.session.setCurrentFile(header.file);
        this.session.setTable(header.table);

        this.remoteFile = header.file;
        this.localFile = this.remoteFile != null ? StreamIn.getContextMapping(this.remoteFile) : null;

        // Pick the stream wrapper from the file's compression metadata: files without
        // compression info are read through LZF, the rest through CompressedInputStream.
        if (this.remoteFile == null) {
            this.underliningStream = null;
        } else if (this.remoteFile.compressionInfo == null) {
            this.underliningStream = new LZFInputStream(socket.getInputStream());
        } else {
            this.underliningStream = new CompressedInputStream(socket.getInputStream(), this.remoteFile.compressionInfo);
        }
        this.metrics = StreamingMetrics.get(host);
    }

    /**
     * Receives the current file, if any, hands the finished writer to the
     * session, and finally asks the session to close if it is finished.
     *
     * @throws IOException if reading the stream fails; {@link #retry()} is invoked before
     *                     the exception is rethrown
     * @throws RuntimeException any unchecked failure from streaming is rethrown after
     *                          {@code session.close(false)} has been called
     */
    public void read() throws IOException {
        if (this.remoteFile != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Receiving stream");
                logger.debug("Creating file for {} with {} estimated keys", this.localFile.getFilename(), this.remoteFile.estimatedKeys);
            }
            assert this.remoteFile.estimatedKeys > 0L;
            DataInputStream dis = new DataInputStream(this.underliningStream);
            try {
                SSTableWriter writer = this.streamIn(dis, this.localFile, this.remoteFile);
                this.session.finished(this.remoteFile, writer);
            }
            catch (IOException ex) {
                // I/O failures are treated as retryable.
                this.retry();
                throw ex;
            }
            catch (RuntimeException e) {
                // Unexpected failure: close the session (presumably marking it failed - see StreamInSession.close).
                this.session.close(false);
                throw e;
            }
        }
        this.session.closeIfFinished();
    }

    /**
     * Reads every section of {@code localFile} from {@code input}, row by row,
     * and appends the rows to a new {@link SSTableWriter}.
     *
     * <p>For each row the key and row size are read first. If the key is present
     * in the row cache, the transfer type is {@link OperationType#AES} and the row
     * size does not exceed {@link DatabaseDescriptor#getInMemoryCompactionLimit()},
     * the row is materialised and the cache is updated with it; otherwise the row
     * is copied from the stream and the cached row is invalidated.
     *
     * @param input      source of the row data
     * @param localFile  local file descriptor; supplies the target filename and the sections to read
     * @param remoteFile remote file descriptor; supplies estimated keys, compression info,
     *                   operation type, and receives progress updates
     * @return the writer holding the received rows
     * @throws IOException if reading or writing fails; the writer is aborted first
     */
    private SSTableWriter streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException {
        ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
        SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
        CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE);
        try {
            BytesReadTracker in = new BytesReadTracker(input);
            long totalBytesRead = 0L;
            for (Pair<Long, Long> section : localFile.sections) {
                long length = section.right - section.left;
                // For compressed transfers, move the stream to the start of this section.
                if (remoteFile.compressionInfo != null) {
                    ((CompressedInputStream)this.underliningStream).position(section.left);
                }
                long bytesRead = 0L;
                while (bytesRead < length) {
                    // Restart the per-row byte counter.
                    in.reset(0L);
                    DecoratedKey key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
                    long dataSize = SSTableReader.readRowSize(in, localFile.desc);
                    boolean refreshRowCache = cfs.containsCachedRow(key)
                            && remoteFile.type == OperationType.AES
                            && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit();
                    if (refreshRowCache) {
                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.getFilename(), key, 0L, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
                        PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
                        assert !row.isEmpty();
                        writer.append(row);
                        ColumnFamily cf = row.getFullColumnFamily();
                        cfs.maybeUpdateRowCache(key, cf);
                    } else {
                        writer.appendFromStream(key, cfs.metadata, dataSize, in);
                        cfs.invalidateCachedRow(key);
                    }

                    long rowBytes = in.getBytesRead();
                    bytesRead += rowBytes;
                    // Compressed transfers report the compressed byte count taken from the
                    // stream; uncompressed ones accumulate the bytes consumed for this row.
                    if (remoteFile.compressionInfo != null) {
                        remoteFile.progress = ((CompressedInputStream)this.underliningStream).getTotalCompressedBytesRead();
                    } else {
                        remoteFile.progress += rowBytes;
                    }
                    totalBytesRead += rowBytes;
                }
            }
            StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
            this.metrics.incomingBytes.inc(totalBytesRead);
            return writer;
        }
        catch (Throwable e) {
            // Discard the partially written SSTable before surfacing the failure.
            writer.abort();
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw Throwables.propagate(e);
        }
        finally {
            controller.close();
        }
    }

    /**
     * Asks the session to retry the current remote file and removes the local
     * file if it exists as a regular file.
     */
    private void retry() {
        this.session.retry(this.remoteFile);
        File orphan = new File(this.localFile.getFilename());
        if (orphan.isFile()) {
            FileUtils.deleteWithConfirm(orphan);
        }
    }
}

