/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamHook;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.PrepareAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamSession
implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
    public static volatile MessageStateSink sink = MessageStateSink.NONE;
    private final StreamOperation streamOperation;
    public final InetAddressAndPort peer;
    private final OutboundConnectionSettings template;
    private final int index;
    private StreamResultFuture streamResult;
    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
    @VisibleForTesting
    protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap();
    private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<TableId, StreamReceiveTask>();
    private final StreamingMetrics metrics;
    final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<String, Set<Range<Token>>>();
    private final boolean isFollower;
    private final NettyStreamingMessageSender messageSender;
    private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<ChannelId, Channel>();
    private boolean maybeCompleted = false;
    private Future<?> closeFuture;
    private final UUID pendingRepair;
    private final PreviewKind previewKind;
    private volatile State state = State.INITIALIZED;

    public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory, boolean isFollower, int index, UUID pendingRepair, PreviewKind previewKind) {
        this.streamOperation = streamOperation;
        this.peer = peer;
        this.template = new OutboundConnectionSettings(peer);
        this.isFollower = isFollower;
        this.index = index;
        this.messageSender = new NettyStreamingMessageSender(this, this.template, factory, 12, previewKind.isPreview());
        this.metrics = StreamingMetrics.get(peer);
        this.pendingRepair = pendingRepair;
        this.previewKind = previewKind;
        logger.debug("Creating stream session to {} as {}", (Object)this.template, (Object)(isFollower ? "follower" : "initiator"));
    }

    public boolean isFollower() {
        return this.isFollower;
    }

    public UUID planId() {
        return this.streamResult == null ? null : this.streamResult.planId;
    }

    public int sessionIndex() {
        return this.index;
    }

    public StreamOperation streamOperation() {
        return this.streamResult == null ? null : this.streamResult.streamOperation;
    }

    public StreamOperation getStreamOperation() {
        return this.streamOperation;
    }

    public UUID getPendingRepair() {
        return this.pendingRepair;
    }

    public boolean isPreview() {
        return this.previewKind.isPreview();
    }

    public PreviewKind getPreviewKind() {
        return this.previewKind;
    }

    public StreamReceiver getAggregator(TableId tableId) {
        assert (this.receivers.containsKey(tableId)) : "Missing tableId " + tableId;
        return this.receivers.get(tableId).getReceiver();
    }

    public void init(StreamResultFuture streamResult) {
        this.streamResult = streamResult;
        StreamHook.instance.reportStreamFuture(this, streamResult);
    }

    public synchronized boolean attachInbound(Channel channel, boolean isControlChannel) {
        this.failIfFinished();
        if (!this.messageSender.hasControlChannel() && isControlChannel) {
            this.messageSender.injectControlMessageChannel(channel);
        }
        channel.closeFuture().addListener(ignored -> this.onChannelClose(channel));
        return this.channels.putIfAbsent(channel.id(), channel) == null;
    }

    public synchronized boolean attachOutbound(Channel channel) {
        this.failIfFinished();
        channel.closeFuture().addListener(ignored -> this.onChannelClose(channel));
        return this.channels.putIfAbsent(channel.id(), channel) == null;
    }

    private void onChannelClose(Channel channel) {
        if (this.channels.remove(channel.id()) != null && this.channels.isEmpty()) {
            this.messageSender.close();
        }
    }

    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", (Object)this.planId());
            this.closeSession(State.COMPLETE);
            return;
        }
        try {
            logger.info("[Stream #{}] Starting streaming to {}{}", new Object[]{this.planId(), this.peer, this.template.connectTo == null ? "" : " through " + this.template.connectTo});
            this.messageSender.initialize();
            this.onInitializationComplete();
        }
        catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            this.onError(e);
        }
    }

    public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies) {
        assert (Iterables.all((Iterable)fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges)) : fullRanges.toString();
        assert (Iterables.all((Iterable)transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges)) : transientRanges.toString();
        this.requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
    }

    synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables) {
        this.failIfFinished();
        Collection<ColumnFamilyStore> stores = this.getColumnFamilyStores(keyspace, columnFamilies);
        if (flushTables) {
            this.flushSSTables(stores);
        }
        RangesAtEndpoint unwrappedRanges = replicas.unwrap();
        List<OutgoingStream> streams = this.getOutgoingStreamsForRanges(unwrappedRanges, stores, this.pendingRepair, this.previewKind);
        this.addTransferStreams(streams);
        Set<Range<Token>> toBeUpdated = this.transferredRangesPerKeyspace.get(keyspace);
        if (toBeUpdated == null) {
            toBeUpdated = new HashSet<Range<Token>>();
        }
        toBeUpdated.addAll(replicas.ranges());
        this.transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
    }

    private void failIfFinished() {
        if (this.state().isFinalState()) {
            throw new RuntimeException(String.format("Stream %s is finished with state %s", this.planId(), this.state().name()));
        }
    }

    private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies) {
        HashSet<ColumnFamilyStore> stores = new HashSet<ColumnFamilyStore>();
        if (columnFamilies.isEmpty()) {
            stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
        } else {
            for (String cf : columnFamilies) {
                stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
            }
        }
        return stores;
    }

    @VisibleForTesting
    public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) {
        ArrayList<OutgoingStream> streams = new ArrayList<OutgoingStream>();
        try {
            for (ColumnFamilyStore cfs : stores) {
                streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind));
            }
        }
        catch (Throwable t) {
            streams.forEach(OutgoingStream::finish);
            throw t;
        }
        return streams;
    }

    synchronized void addTransferStreams(Collection<OutgoingStream> streams) {
        this.failIfFinished();
        for (OutgoingStream stream : streams) {
            StreamTransferTask newTask;
            TableId tableId = stream.getTableId();
            StreamTransferTask task = this.transfers.get(tableId);
            if (task == null && (task = this.transfers.putIfAbsent(tableId, newTask = new StreamTransferTask(this, tableId))) == null) {
                task = newTask;
            }
            task.addTransferStream(stream);
        }
    }

    private synchronized Future<?> closeSession(State finalState) {
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        this.state(finalState);
        ArrayList futures = new ArrayList();
        if (finalState == State.FAILED || finalState == State.ABORTED) {
            futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
        }
        if (!this.isFollower || this.state != State.COMPLETE) {
            logger.debug("[Stream #{}] Will close attached channels {}", (Object)this.planId(), this.channels);
            this.channels.values().forEach(channel -> futures.add((Future<?>)channel.close()));
        }
        sink.onClose(this.peer);
        this.streamResult.handleSessionComplete(this);
        this.closeFuture = FBUtilities.allOf(futures);
        return this.closeFuture;
    }

    private void abortTasks() {
        try {
            this.receivers.values().forEach(StreamReceiveTask::abort);
            this.transfers.values().forEach(StreamTransferTask::abort);
        }
        catch (Exception e) {
            logger.warn("[Stream #{}] failed to abort some streaming tasks", (Object)this.planId(), (Object)e);
        }
    }

    public void state(State newState) {
        if (logger.isTraceEnabled()) {
            logger.trace("[Stream #{}] Changing session state from {} to {}", new Object[]{this.planId(), this.state, newState});
        }
        sink.recordState(this.peer, newState);
        this.state = newState;
    }

    public State state() {
        return this.state;
    }

    public NettyStreamingMessageSender getMessageSender() {
        return this.messageSender;
    }

    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    public synchronized void messageReceived(StreamMessage message) {
        if (message.type != StreamMessage.Type.KEEP_ALIVE) {
            this.failIfFinished();
        }
        sink.recordMessage(this.peer, message.type);
        switch (message.type) {
            case STREAM_INIT: {
                break;
            }
            case PREPARE_SYN: {
                PrepareSynMessage msg = (PrepareSynMessage)message;
                this.prepare(msg.requests, msg.summaries);
                break;
            }
            case PREPARE_SYNACK: {
                this.prepareSynAck((PrepareSynAckMessage)message);
                break;
            }
            case PREPARE_ACK: {
                this.prepareAck((PrepareAckMessage)message);
                break;
            }
            case STREAM: {
                this.receive((IncomingStreamMessage)message);
                break;
            }
            case RECEIVED: {
                ReceivedMessage received = (ReceivedMessage)message;
                this.received(received.tableId, received.sequenceNumber);
                break;
            }
            case COMPLETE: {
                this.complete();
                break;
            }
            case KEEP_ALIVE: {
                break;
            }
            case SESSION_FAILED: {
                this.sessionFailed();
                break;
            }
            default: {
                throw new AssertionError((Object)("unhandled StreamMessage type: " + message.getClass().getName()));
            }
        }
    }

    public void onInitializationComplete() {
        this.state(State.PREPARING);
        PrepareSynMessage prepare = new PrepareSynMessage();
        prepare.requests.addAll(this.requests);
        for (StreamTransferTask task : this.transfers.values()) {
            prepare.summaries.add(task.getSummary());
        }
        this.messageSender.sendMessage(prepare);
    }

    public synchronized Future<?> onError(Throwable e) {
        boolean isEofException = e instanceof EOFException;
        if (isEofException) {
            if (this.state.finalState) {
                logger.debug("[Stream #{}] Socket closed after session completed with state {}", (Object)this.planId(), (Object)this.state);
                return null;
            }
            logger.error("[Stream #{}] Socket closed before session completion, peer {} is probably down.", new Object[]{this.planId(), this.peer.getHostAddressAndPort(), e});
            return this.closeSession(State.FAILED);
        }
        this.logError(e);
        if (this.messageSender.connected()) {
            this.messageSender.sendMessage(new SessionFailedMessage());
        }
        return this.closeSession(State.FAILED);
    }

    private void logError(Throwable e) {
        if (e instanceof SocketTimeoutException) {
            logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? If not, maybe try increasing streaming_keep_alive_period_in_secs.", new Object[]{this.planId(), this.peer.getHostAddressAndPort(), this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddressAndPort(), 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(), e});
        } else {
            logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", new Object[]{this.planId(), this.peer.getHostAddressAndPort(), this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddressAndPort(), e});
        }
    }

    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        this.state(State.PREPARING);
        ScheduledExecutors.nonPeriodicTasks.execute(() -> {
            try {
                this.prepareAsync(requests, summaries);
            }
            catch (Exception e) {
                this.onError(e);
            }
        });
    }

    private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        for (StreamRequest request : requests) {
            this.addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true);
        }
        for (StreamSummary summary : summaries) {
            this.prepareReceiving(summary);
        }
        PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
        if (!this.peer.equals(FBUtilities.getBroadcastAddressAndPort())) {
            for (StreamTransferTask task : this.transfers.values()) {
                prepareSynAck.summaries.add(task.getSummary());
            }
        }
        this.messageSender.sendMessage(prepareSynAck);
        this.streamResult.handleSessionPrepared(this);
        if (this.isPreview()) {
            this.completePreview();
        } else {
            this.maybeCompleted();
        }
    }

    private void prepareSynAck(PrepareSynAckMessage msg) {
        if (!msg.summaries.isEmpty()) {
            for (StreamSummary summary : msg.summaries) {
                this.prepareReceiving(summary);
            }
            if (!this.isPreview()) {
                this.messageSender.sendMessage(new PrepareAckMessage());
            }
        }
        if (this.isPreview()) {
            this.completePreview();
        } else {
            this.startStreamingFiles(true);
        }
    }

    private void prepareAck(PrepareAckMessage msg) {
        if (this.isPreview()) {
            throw new RuntimeException(String.format("[Stream #%s] Cannot receive PrepareAckMessage for preview session", this.planId()));
        }
        this.startStreamingFiles(true);
    }

    public void streamSent(OutgoingStreamMessage message) {
        StreamTransferTask task;
        long headerSize = message.stream.getEstimatedSize();
        StreamingMetrics.totalOutgoingBytes.inc(headerSize);
        this.metrics.outgoingBytes.inc(headerSize);
        if (StreamOperation.REPAIR == this.getStreamOperation()) {
            StreamingMetrics.totalOutgoingRepairBytes.inc(headerSize);
            StreamingMetrics.totalOutgoingRepairSSTables.inc((long)message.stream.getNumFiles());
        }
        if ((task = this.transfers.get(message.header.tableId)) != null) {
            task.scheduleTimeout(message.header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receive(IncomingStreamMessage message) {
        if (this.isPreview()) {
            throw new RuntimeException(String.format("[Stream #%s] Cannot receive files for preview session", this.planId()));
        }
        long headerSize = message.stream.getSize();
        StreamingMetrics.totalIncomingBytes.inc(headerSize);
        this.metrics.incomingBytes.inc(headerSize);
        this.messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
        StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
        long receivedStartNanos = System.nanoTime();
        try {
            this.receivers.get(message.header.tableId).received(message.stream);
        }
        catch (Throwable throwable) {
            long latencyNanos = System.nanoTime() - receivedStartNanos;
            this.metrics.incomingProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
            long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
            int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
            if (timeout > 0 && latencyMs > (long)timeout) {
                NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES, "The time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms).\nThe streaming connection might be closed due to tcp user timeout.\nTry to increase the internode_streaming_tcp_user_timeout_in_ms or set it to 0 to use system defaults.", latencyMs, message, timeout);
            }
            throw throwable;
        }
        long latencyNanos = System.nanoTime() - receivedStartNanos;
        this.metrics.incomingProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
        long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
        int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
        if (timeout > 0 && latencyMs > (long)timeout) {
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES, "The time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms).\nThe streaming connection might be closed due to tcp user timeout.\nTry to increase the internode_streaming_tcp_user_timeout_in_ms or set it to 0 to use system defaults.", latencyMs, message, timeout);
        }
    }

    public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) {
        ProgressInfo progress = new ProgressInfo(this.peer, this.index, filename, direction, bytes, total);
        this.streamResult.handleProgress(progress);
    }

    public void received(TableId tableId, int sequenceNumber) {
        this.transfers.get(tableId).complete(sequenceNumber);
    }

    public synchronized void complete() {
        logger.debug("[Stream #{}] handling Complete message, state = {}", (Object)this.planId(), (Object)this.state);
        if (!this.isFollower) {
            if (this.state == State.WAIT_COMPLETE) {
                this.closeSession(State.COMPLETE);
            } else {
                this.state(State.WAIT_COMPLETE);
            }
        } else {
            throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", this.planId()));
        }
    }

    private synchronized boolean maybeCompleted() {
        if (!this.receivers.isEmpty() || !this.transfers.isEmpty()) {
            return false;
        }
        if (this.maybeCompleted) {
            return true;
        }
        this.maybeCompleted = true;
        if (!this.isFollower) {
            if (this.state == State.WAIT_COMPLETE) {
                this.closeSession(State.COMPLETE);
            } else {
                this.state(State.WAIT_COMPLETE);
            }
        } else {
            this.messageSender.sendMessage(new CompleteMessage());
            this.closeSession(State.COMPLETE);
        }
        return true;
    }

    public synchronized void sessionFailed() {
        logger.error("[Stream #{}] Remote peer {} failed stream session.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    public SessionInfo getSessionInfo() {
        ArrayList receivingSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.receivers.values()) {
            receivingSummaries.add(streamTask.getSummary());
        }
        ArrayList transferSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.transfers.values()) {
            transferSummaries.add(streamTask.getSummary());
        }
        return new SessionInfo(this.peer, this.index, this.template.connectTo == null ? this.peer : this.template.connectTo, receivingSummaries, transferSummaries, this.state);
    }

    public synchronized void taskCompleted(StreamReceiveTask completedTask) {
        this.receivers.remove(completedTask.tableId);
        this.maybeCompleted();
    }

    public synchronized void taskCompleted(StreamTransferTask completedTask) {
        this.transfers.remove(completedTask.tableId);
        this.maybeCompleted();
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState epState) {
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    private void completePreview() {
        try {
            this.state(State.WAIT_COMPLETE);
            this.closeSession(State.COMPLETE);
        }
        finally {
            for (StreamTask task : Iterables.concat(this.receivers.values(), this.transfers.values())) {
                task.abort();
            }
        }
    }

    private void flushSSTables(Iterable<ColumnFamilyStore> stores) {
        ArrayList<ListenableFuture<CommitLogPosition>> flushes = new ArrayList<ListenableFuture<CommitLogPosition>>();
        for (ColumnFamilyStore cfs : stores) {
            flushes.add(cfs.forceFlush());
        }
        FBUtilities.waitOnFutures(flushes);
    }

    @VisibleForTesting
    public synchronized void prepareReceiving(StreamSummary summary) {
        this.failIfFinished();
        if (summary.files > 0) {
            this.receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
        }
    }

    private void startStreamingFiles(boolean notifyPrepared) {
        if (notifyPrepared) {
            this.streamResult.handleSessionPrepared(this);
        }
        this.state(State.STREAMING);
        for (StreamTransferTask task : this.transfers.values()) {
            Collection<OutgoingStreamMessage> messages = task.getFileMessages();
            if (!messages.isEmpty()) {
                for (OutgoingStreamMessage ofm : messages) {
                    ofm.header.addSessionInfo(this);
                    this.messageSender.sendMessage(ofm);
                }
                continue;
            }
            this.taskCompleted(task);
        }
        this.maybeCompleted();
    }

    @VisibleForTesting
    public int getNumRequests() {
        return this.requests.size();
    }

    @VisibleForTesting
    public int getNumTransfers() {
        return this.transferredRangesPerKeyspace.size();
    }

    public synchronized void abort() {
        logger.info("[Stream #{}] Aborting stream session with peer {}...", (Object)this.planId(), (Object)this.peer);
        if (this.getMessageSender().connected()) {
            this.getMessageSender().sendMessage(new SessionFailedMessage());
        }
        try {
            this.closeSession(State.ABORTED);
        }
        catch (Exception e) {
            logger.error("[Stream #{}] Error aborting stream session with peer {}", (Object)this.planId(), (Object)this.peer);
        }
    }

    @VisibleForTesting
    public static interface MessageStateSink {
        public static final MessageStateSink NONE = new MessageStateSink(){

            @Override
            public void recordState(InetAddressAndPort from, State state) {
            }

            @Override
            public void recordMessage(InetAddressAndPort from, StreamMessage.Type message) {
            }

            @Override
            public void onClose(InetAddressAndPort from) {
            }
        };

        public void recordState(InetAddressAndPort var1, State var2);

        public void recordMessage(InetAddressAndPort var1, StreamMessage.Type var2);

        public void onClose(InetAddressAndPort var1);
    }

    public static enum State {
        INITIALIZED(false),
        PREPARING(false),
        STREAMING(false),
        WAIT_COMPLETE(false),
        COMPLETE(true),
        FAILED(true),
        ABORTED(true);

        private final boolean finalState;

        private State(boolean finalState) {
            this.finalState = finalState;
        }

        public boolean isFinalState() {
            return this.finalState;
        }
    }
}

