/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.streaming.AbstractStreamSession;
import org.apache.cassandra.streaming.IStreamCallback;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamInSession
extends AbstractStreamSession {
    private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
    private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap();
    private final Set<PendingFile> files = new NonBlockingHashSet();
    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
    private PendingFile current;
    private Socket socket;
    private volatile int retries;
    private static final AtomicInteger sessionIdCounter = new AtomicInteger(0);

    private static long nextSessionId() {
        return 0L + (long)sessionIdCounter.incrementAndGet();
    }

    private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback) {
        super(null, context, callback);
    }

    public static StreamInSession create(InetAddress host, IStreamCallback callback) {
        Pair<InetAddress, Long> context = Pair.create(host, StreamInSession.nextSessionId());
        StreamInSession session = new StreamInSession(context, callback);
        sessions.put(context, session);
        return session;
    }

    public static StreamInSession get(InetAddress host, long sessionId) {
        StreamInSession possibleNew;
        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
        StreamInSession session = (StreamInSession)sessions.get(context);
        if (session == null && (session = sessions.putIfAbsent(context, possibleNew = new StreamInSession(context, null))) == null) {
            session = possibleNew;
        }
        return session;
    }

    public void setCurrentFile(PendingFile file) {
        this.current = file;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    public void addFiles(Collection<PendingFile> files) {
        for (PendingFile file : files) {
            if (logger.isDebugEnabled()) {
                logger.debug("Adding file {} to Stream Request queue", (Object)file.getFilename());
            }
            this.files.add(file);
        }
    }

    public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("Finished {} (from {}). Sending ack to {}", new Object[]{remoteFile, this.getHost(), this});
        }
        assert (reader != null);
        this.readers.add(reader);
        this.files.remove(remoteFile);
        if (remoteFile.equals(this.current)) {
            this.current = null;
        }
        StreamReply reply = new StreamReply(remoteFile.getFilename(), this.getSessionId(), StreamReply.Status.FILE_FINISHED);
        this.sendMessage(reply.createMessage());
        logger.debug("ack {} sent for {}", (Object)reply, (Object)remoteFile);
    }

    public void retry(PendingFile remoteFile) {
        ++this.retries;
        if (this.retries > DatabaseDescriptor.getMaxStreamingRetries()) {
            logger.error(String.format("Failed streaming session %d from %s while receiving %s", this.getSessionId(), this.getHost().toString(), this.current), (Throwable)new IllegalStateException("Too many retries for " + remoteFile));
            this.close(false);
            return;
        }
        StreamReply reply = new StreamReply(remoteFile.getFilename(), this.getSessionId(), StreamReply.Status.FILE_RETRY);
        logger.info("Streaming of file {} for {} failed: requesting a retry.", (Object)remoteFile, (Object)this);
        try {
            this.sendMessage(reply.createMessage());
        }
        catch (IOException e) {
            logger.error("Sending retry message failed, closing session.", (Throwable)e);
            this.close(false);
        }
    }

    public void sendMessage(MessageOut<StreamReply> message) throws IOException {
        DataOutputStream out = new DataOutputStream(this.socket.getOutputStream());
        OutboundTcpConnection.write(message, String.valueOf(this.getSessionId()), System.currentTimeMillis(), out, MessagingService.instance().getVersion(this.getHost()));
        out.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closeIfFinished() throws IOException {
        if (this.files.isEmpty()) {
            HashMap cfstores = new HashMap();
            try {
                for (SSTableReader sSTableReader : this.readers) {
                    assert (sSTableReader.getTableName().equals(this.table));
                    if (!sSTableReader.acquireReference()) {
                        throw new AssertionError((Object)"We shouldn't fail acquiring a reference on a sstable that has just been transferred");
                    }
                    ColumnFamilyStore cfs = Table.open(sSTableReader.getTableName()).getColumnFamilyStore(sSTableReader.getColumnFamilyName());
                    if (!cfstores.containsKey(cfs)) {
                        cfstores.put(cfs, new ArrayList());
                    }
                    ((List)cfstores.get(cfs)).add(sSTableReader);
                }
                for (Map.Entry entry : cfstores.entrySet()) {
                    if (entry.getKey() == null) continue;
                    ((ColumnFamilyStore)entry.getKey()).addSSTables((Collection)entry.getValue());
                    ((ColumnFamilyStore)entry.getKey()).indexManager.maybeBuildSecondaryIndexes((Collection)entry.getValue(), ((ColumnFamilyStore)entry.getKey()).indexManager.allIndexesNames());
                }
            }
            finally {
                for (List list : cfstores.values()) {
                    SSTableReader.releaseReferences(list);
                }
            }
            StreamReply reply = new StreamReply("", this.getSessionId(), StreamReply.Status.SESSION_FINISHED);
            logger.info("Finished streaming session {} from {}", (Object)this.getSessionId(), (Object)this.getHost());
            try {
                if (this.socket != null) {
                    OutboundTcpConnection.write(reply.createMessage(), ((Long)this.context.right).toString(), System.currentTimeMillis(), new DataOutputStream(this.socket.getOutputStream()), MessagingService.instance().getVersion(this.getHost()));
                } else {
                    logger.debug("No socket to reply to {} with!", (Object)this.getHost());
                }
            }
            finally {
                if (this.socket != null) {
                    this.socket.close();
                }
            }
            this.close(true);
        }
    }

    @Override
    protected void closeInternal(boolean success) {
        sessions.remove(this.context);
        if (!success && FailureDetector.instance.isAlive(this.getHost())) {
            StreamReply reply = new StreamReply("", this.getSessionId(), StreamReply.Status.SESSION_FAILURE);
            MessagingService.instance().sendOneWay(reply.createMessage(), this.getHost());
        }
    }

    public static Set<InetAddress> getSources() {
        HashSet<InetAddress> set = new HashSet<InetAddress>();
        for (StreamInSession session : sessions.values()) {
            set.add(session.getHost());
        }
        return set;
    }

    public static Set<PendingFile> getIncomingFiles(InetAddress host) {
        HashSet<PendingFile> set = new HashSet<PendingFile>();
        for (Map.Entry entry : sessions.entrySet()) {
            if (!((InetAddress)((Pair)entry.getKey()).left).equals(host)) continue;
            StreamInSession session = (StreamInSession)entry.getValue();
            if (session.current != null) {
                set.add(session.current);
            }
            set.addAll(session.files);
        }
        return set;
    }
}

