/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.documentapi.messagebus;

import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorDataQueue;
import com.yahoo.documentapi.VisitorIterator;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.DocumentReply;
import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.messagebus.DestinationSession;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.Routable;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.routing.RoutingTable;
import com.yahoo.protect.Process;
import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vdslib.state.ClusterState;
import java.util.Arrays;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageBusVisitorSession
implements VisitorSession {
    /** Logger shared by all visitor session instances. */
    private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName());
    /** Process-wide counter; supplies the numeric part of each generated session name. */
    private static AtomicLong sessionCounter = new AtomicLong(0L);
    /** Visitor parameters given at construction; route and handlers may be filled in by this class. */
    private final VisitorParameters params;
    /** Sends CreateVisitor messages; its replies arrive via the handler from createReplyHandler(). */
    private final Sender sender;
    /** Receives messages via the handler from createMessageHandler(), and is used to send replies. */
    private final Receiver receiver;
    /** Runs and schedules the reply, message and send-create-visitors tasks. */
    private final AsyncTaskExecutor taskExecutor;
    /** Pairs the visitor iterator with its progress token; the token doubles as the session lock. */
    private final VisitingProgress progress;
    /** Accumulates the statistics reported by each CreateVisitorReply. */
    private final VisitorStatistics statistics;
    /** Name of the form "visitor-&lt;id&gt;-&lt;currentTimeMillis&gt;". */
    private final String sessionName = MessageBusVisitorSession.createSessionName();
    /** Remote data handler if no local handler is set, otherwise the receiver's connection spec. */
    private final String dataDestination;
    /** Source of monotonic time used for session timeout computations. */
    private final Clock clock;
    /** Current session state; only changed through transitionTo(). */
    private StateDescription state;
    /** Incremented for every generated visitor id. */
    private long visitorCounter = 0L;
    /** Monotonic timestamp taken in start(); base for elapsedTimeMillis(). */
    private long startTimeNanos = 0L;
    /** True while a scheduled SendCreateVisitorsTask has not yet started running. */
    private boolean scheduledSendCreateVisitors = false;
    /** Set (under completionMonitor) once markSessionCompleted() has run. */
    private boolean done = false;
    /** Set (under completionMonitor) once destroy() has begun waiting for completion. */
    private boolean destroying = false;
    /** Monitor guarding done/destroying; notified when the session completes. */
    private final Object completionMonitor = new Object();
    /** Session trace; non-empty reply trace roots are added as children of its root. */
    private Trace trace;
    /** Number of accepted CreateVisitor messages whose reply has not yet been handled. */
    private int pendingMessageCount = 0;

    /** Atomically increments the process-wide session counter and returns the new value. */
    private static long getNextSessionId() {
        final long nextId = sessionCounter.incrementAndGet();
        return nextId;
    }

    /**
     * Builds a session name of the form {@code visitor-<id>-<millis>}, where the id comes
     * from the session counter and millis is the current wall-clock time.
     */
    private static String createSessionName() {
        final long sessionId = getNextSessionId();
        final long createdAtMillis = System.currentTimeMillis();
        return "visitor-" + sessionId + '-' + createdAtMillis;
    }

    /**
     * Creates a session that measures time with a {@code RealClock}; otherwise identical to
     * the constructor taking an explicit clock.
     *
     * @throws ParseException propagated from the delegated constructor
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters,
                                    AsyncTaskExecutor taskExecutor,
                                    SenderFactory senderFactory,
                                    ReceiverFactory receiverFactory,
                                    RoutingTable routingTable) throws ParseException {
        this(visitorParameters, taskExecutor, senderFactory, receiverFactory, routingTable, new RealClock());
    }

    /**
     * Creates a visitor session in the NOT_STARTED state. No CreateVisitor messages are sent
     * until {@link #start()} is called.
     *
     * @param visitorParameters parameters for the visit; its route and handlers may be set here
     * @param taskExecutor      executor used for reply, message and send tasks
     * @param senderFactory     creates the sender, given this session's reply handler
     * @param receiverFactory   creates the receiver, given this session's message handler and name
     * @param routingTable      searched for a storage cluster route when none is specified
     * @param clock             monotonic time source
     * @throws ParseException           declared for the creation of the visiting progress
     * @throws IllegalArgumentException if no route is given and the routing table does not
     *                                  contain exactly one storage cluster route
     * @throws IllegalStateException    if no data destination can be determined
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters, AsyncTaskExecutor taskExecutor, SenderFactory senderFactory, ReceiverFactory receiverFactory, RoutingTable routingTable, Clock clock) throws ParseException {
        this.params = visitorParameters;
        // The route must be resolved before the sender is created, since the sender is given the params.
        this.initializeRoute(routingTable);
        this.sender = senderFactory.createSender(this.createReplyHandler(), this.params);
        this.receiver = receiverFactory.createReceiver(this.createMessageHandler(), this.sessionName);
        this.taskExecutor = taskExecutor;
        this.progress = this.createVisitingProgress(this.params);
        this.statistics = new VisitorStatistics();
        this.state = new StateDescription(State.NOT_STARTED);
        this.clock = clock;
        // May install a default local data handler, which affects the data destination chosen below.
        this.initializeHandlers();
        this.trace = new Trace(visitorParameters.getTraceLevel());
        // Without a local data handler, data goes to the remote handler; otherwise to our own receiver.
        this.dataDestination = this.params.getLocalDataHandler() == null ? this.params.getRemoteDataHandler() : this.receiver.getConnectionSpec();
        this.validateSessionParameters();
        // A progress token that is already finished means there is nothing left to visit.
        if (this.progress.getIterator().isDone()) {
            this.markSessionCompleted();
        }
    }

    /**
     * Convenience factory that wires a session to a message bus instance: the executor wraps
     * the given scheduled executor service, the sender and receiver factories are backed by
     * the bus, and the routing table is the one registered for the document protocol.
     *
     * @throws ParseException propagated from the session constructor
     */
    public static MessageBusVisitorSession createForMessageBus(MessageBus mbus, ScheduledExecutorService scheduledExecutorService, VisitorParameters params) throws ParseException {
        return new MessageBusVisitorSession(
                params,
                new ThreadAsyncTaskExecutor(scheduledExecutorService),
                new MessageBusSenderFactory(mbus),
                new MessageBusReceiverFactory(mbus),
                mbus.getRoutingTable(DocumentProtocol.NAME));
    }

    /**
     * Verifies that a data destination was determined during construction.
     *
     * @throws IllegalStateException if the data destination is null
     */
    private void validateSessionParameters() {
        if (dataDestination != null) {
            return;
        }
        throw new IllegalStateException("No data destination specified");
    }

    /**
     * Starts the session: records the start time and, unless the progress token shows the
     * visit is already finished, moves to the WORKING state and submits the first
     * send-create-visitors task. Runs while holding the progress token lock.
     */
    public void start() {
        synchronized (progress.getToken()) {
            startTimeNanos = clock.monotonicNanoTime();
            final boolean alreadyFinished = progress.getIterator().isDone();
            if (!alreadyFinished) {
                transitionTo(new StateDescription(State.WORKING));
                taskExecutor.submitTask(new SendCreateVisitorsTask(computeBoundedMessageTimeoutMillis(0L)));
                return;
            }
            log.log(Level.FINE, () -> sessionName + ": progress token indicates session is done before it could even start; no-op");
        }
    }

    /** Replaces the current state with {@code newState}, unless the current state reports failure. */
    private void updateStateUnlessAlreadyFailed(StateDescription newState) {
        if (state.failed()) {
            return;
        }
        state = newState;
    }

    /**
     * Attempts to move the session to {@code newState}.
     *
     * <ul>
     *   <li>WORKING: always applied; asserts that the session has not been started.</li>
     *   <li>ABORTED: always applied, regardless of the current state.</li>
     *   <li>COMPLETED, FAILED, TIMED_OUT: applied only if the current state is not already
     *       a failed one, so the first failure is preserved.</li>
     *   <li>Any other target state terminates the process via {@code Process.logAndDie}.</li>
     * </ul>
     *
     * @param newState the requested target state
     * @return the state the session is in after the attempt, which may differ from
     *         {@code newState} when the transition was suppressed
     */
    private StateDescription transitionTo(StateDescription newState) {
        log.log(Level.FINE, () -> this.sessionName + ": attempting transition to state " + newState);
        switch (newState.getState()) {
            case WORKING: {
                assert (this.state.getState() == State.NOT_STARTED);
                this.state = newState;
                break;
            }
            case ABORTED: {
                // Aborting overrides any earlier state, including failures.
                this.state = newState;
                break;
            }
            case COMPLETED: 
            case FAILED: 
            case TIMED_OUT: {
                this.updateStateUnlessAlreadyFailed(newState);
                break;
            }
            default: {
                Process.logAndDie((String)("Invalid target transition state: " + newState));
            }
        }
        log.log(Level.FINE, () -> "Session '" + this.sessionName + "' is now in state " + this.state);
        return this.state;
    }

    /**
     * Creates the handler given to the sender. Each reply is handed to the task executor;
     * if the executor rejects the task, the session is failed and, unless already done,
     * marked completed while holding the progress token lock.
     */
    private ReplyHandler createReplyHandler() {
        return reply -> {
            try {
                taskExecutor.submitTask(new HandleReplyTask(reply));
            } catch (RejectedExecutionException rejected) {
                final String warning = "Visitor session '" + sessionName + "': failed to submit reply task to executor service! Session cannot reliably continue; terminating it early.";
                log.log(Level.WARNING, warning, rejected);
                synchronized (progress.getToken()) {
                    final String failure = "Failed to submit reply task to executor service: " + rejected.getMessage();
                    transitionTo(new StateDescription(State.FAILED, failure));
                    if (!done) {
                        markSessionCompleted();
                    }
                }
            }
        };
    }

    /**
     * Creates the handler given to the receiver. Each incoming message is handed to the
     * task executor; if the executor rejects the task, the message is answered immediately
     * with an error reply (code 151004) stating that the session has been aborted.
     */
    private MessageHandler createMessageHandler() {
        return message -> {
            try {
                taskExecutor.submitTask(new HandleMessageTask(message));
            } catch (RejectedExecutionException rejected) {
                final DocumentMessage documentMessage = (DocumentMessage) message;
                final DocumentReply abortedReply = documentMessage.createReply();
                message.swapState(abortedReply);
                abortedReply.addError(new Error(151004, "Visitor session has been aborted"));
                receiver.reply(abortedReply);
            }
        };
    }

    /**
     * Ensures the parameters carry a route. When no route is set, or the route has no hops,
     * the single storage cluster route found in the routing table is used.
     */
    private void initializeRoute(RoutingTable routingTable) {
        final boolean routeSpecified = params.getRoute() != null && params.getRoute().hasHops();
        if (routeSpecified) {
            return;
        }
        params.setRoute(getClusterRoute(routingTable));
        log.log(Level.FINE, () -> "No route specified; resolved implicit storage cluster: " + params.getRoute().toString());
    }

    /**
     * Finds the one route in the routing table whose name starts with
     * {@code "storage/cluster."}.
     *
     * @return the name of that route
     * @throws IllegalArgumentException if there is no such route, or more than one
     */
    private String getClusterRoute(RoutingTable routingTable) throws IllegalArgumentException {
        String clusterRoute = null;
        for (RoutingTable.RouteIterator routes = routingTable.getRouteIterator(); routes.isValid(); routes.next()) {
            final String candidate = routes.getName();
            if (!candidate.startsWith("storage/cluster.")) {
                continue;
            }
            if (clusterRoute != null) {
                throw new IllegalArgumentException("There are multiple storage clusters in your application, please specify which one to visit.");
            }
            clusterRoute = candidate;
        }
        if (clusterRoute == null) {
            throw new IllegalArgumentException("No storage cluster found in your application.");
        }
        return clusterRoute;
    }

    /**
     * Prepares the data and control handlers in the parameters. An existing local data
     * handler is reset and bound to this session; if there is neither a local nor a remote
     * data handler, a {@code VisitorDataQueue} is installed and bound. An existing control
     * handler is reset, otherwise a default one is installed; either way it is bound to
     * this session.
     */
    private void initializeHandlers() {
        if (params.getLocalDataHandler() == null) {
            if (params.getRemoteDataHandler() == null) {
                params.setLocalDataHandler(new VisitorDataQueue());
                params.getLocalDataHandler().setSession(this);
            }
        } else {
            params.getLocalDataHandler().reset();
            params.getLocalDataHandler().setSession(this);
        }

        if (params.getControlHandler() == null) {
            params.setControlHandler(new VisitorControlHandler());
        } else {
            params.getControlHandler().reset();
        }
        params.getControlHandler().setSession(this);
    }

    /**
     * Builds the visiting progress for this session. The resume token from the parameters
     * is used when present, otherwise a fresh token. The iterator is created from the
     * explicit bucket set when one is given and non-empty, and from the document selection
     * otherwise.
     *
     * @throws ParseException declared for iterator creation from the document selection
     */
    private VisitingProgress createVisitingProgress(VisitorParameters params) throws ParseException {
        final ProgressToken token;
        if (params.getResumeToken() != null) {
            token = params.getResumeToken();
        } else {
            token = new ProgressToken();
        }

        final boolean explicitBucketsGiven = params.getBucketsToVisit() != null && !params.getBucketsToVisit().isEmpty();
        if (!explicitBucketsGiven) {
            final BucketIdFactory idFactory = new BucketIdFactory();
            return new VisitingProgress(
                    VisitorIterator.createFromDocumentSelection(params.getDocumentSelection(), idFactory, 1, token),
                    token);
        }

        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "parameters specify explicit bucket set to visit; using it rather than document selection (" + params.getBucketsToVisit().size() + " buckets given)");
        }
        return new VisitingProgress(
                VisitorIterator.createFromExplicitBucketSet(params.getBucketsToVisit(), 1, token),
                token);
    }

    /** Marks the session completed if visiting is finished; otherwise tries to schedule more visitors. */
    private void continueVisiting() {
        if (!visitingCompleted()) {
            scheduleSendCreateVisitorsIfApplicable();
            return;
        }
        markSessionCompleted();
    }

    /**
     * Finishes the session: notifies the local data handler (if any), settles the final
     * state (FAILED when the token contains failed buckets, then COMPLETED unless a failure
     * is already recorded), notifies the control handler with the resulting completion code
     * and description, and finally sets the done flag and wakes all threads waiting on the
     * completion monitor.
     */
    private void markSessionCompleted() {
        log.log(Level.FINE, () -> "Visitor session '" + sessionName + "' has completed");
        if (params.getLocalDataHandler() != null) {
            params.getLocalDataHandler().onDone();
        }

        final ProgressToken token = progress.getToken();
        if (token.containsFailedBuckets()) {
            transitionTo(new StateDescription(State.FAILED, token.getFirstErrorMsg()));
        }
        // Has no effect on the state if a failure has already been recorded.
        transitionTo(new StateDescription(State.COMPLETED));
        params.getControlHandler().onDone(state.toCompletionCode(), state.getDescription());

        synchronized (completionMonitor) {
            done = true;
            completionMonitor.notifyAll();
        }
    }

    /**
     * Logs an exception thrown while processing an incoming message and adds a matching
     * error to the reply. When buckets are skipped on fatal errors the error code is 251008
     * and the session state is left alone; otherwise the session is failed and the error
     * code is 250000.
     *
     * @param reply the reply that will carry the error
     * @param e     the exception that was caught
     * @param what  short description of what was being processed
     */
    private void handleMessageProcessingException(Reply reply, Exception e, String what) {
        final String errorDesc = formatProcessingException(e, what);
        log.log(Level.SEVERE, formatIdentifyingVisitorErrorString(errorDesc), e);

        final int errorCode;
        synchronized (progress.getToken()) {
            if (params.skipBucketsOnFatalErrors()) {
                errorCode = 251008;
            } else {
                errorCode = 250000;
                transitionTo(new StateDescription(State.FAILED, errorDesc));
            }
        }
        reply.addError(new Error(errorCode, errorDesc));
    }

    /** Describes an exception (class name and message) together with what was being processed. */
    private String formatProcessingException(Exception e, String whileProcessing) {
        final String exceptionType = e.getClass().getName();
        final String exceptionMessage = e.getMessage();
        return String.format("Got exception of type %s with message '%s' while processing %s",
                exceptionType, exceptionMessage, whileProcessing);
    }

    /** Prefixes {@code details} with the session name and the document selection of this visit. */
    private String formatIdentifyingVisitorErrorString(String details) {
        final Object[] formatArgs = {sessionName, params.getDocumentSelection(), details};
        return String.format("Visitor %s (selection '%s'): %s", formatArgs);
    }

    /**
     * Handles a VisitorInfoMessage: forwards any error text to the control handler, then,
     * under the progress token lock, either reports progress (session still running) or
     * adds a "shut down" error to the reply (session done). The reply is always sent,
     * including when processing throws.
     */
    private void handleVisitorInfoMessage(VisitorInfoMessage msg) {
        final DocumentReply reply = msg.createReply();
        msg.swapState(reply);
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + sessionName + ": Received VisitorInfo with " + msg.getFinishedBuckets().size() + " finished buckets");
        }
        try {
            final boolean carriesError = msg.getErrorMessage().length() > 0;
            if (carriesError) {
                params.getControlHandler().onVisitorError(msg.getErrorMessage());
            }
            synchronized (progress.getToken()) {
                if (isDone()) {
                    reply.addError(new Error(250000, "Visitor has been shut down"));
                } else {
                    params.getControlHandler().onProgress(progress.getToken());
                }
            }
        } catch (Exception e) {
            handleMessageProcessingException(reply, e, "VisitorInfoMessage");
        } finally {
            receiver.reply(reply);
        }
    }

    /**
     * Hands a document message to the local data handler together with an ack token that
     * wraps the reply; the reply is not sent here on the success path. If there is no local
     * data handler, or the handler throws, an error is added and the reply is sent
     * immediately.
     */
    private void handleDocumentMessage(DocumentMessage msg) {
        final DocumentReply reply = msg.createReply();
        msg.swapState(reply);

        if (params.getLocalDataHandler() != null) {
            try {
                params.getLocalDataHandler().onMessage(msg, new AckToken(reply));
            } catch (Exception e) {
                handleMessageProcessingException(reply, e, "DocumentMessage");
                receiver.reply(reply);
            }
            return;
        }

        log.log(Level.SEVERE, sessionName + ": Got visitor data back to client with no local data destination.");
        reply.addError(new Error(250000, "Visitor data with no local data destination"));
        receiver.reply(reply);
    }

    /**
     * Tells whether the first error of the reply should be treated as fatal. Error codes
     * 151002, 151009 and 200009 are never fatal here; for every other code the error's own
     * {@code isFatal()} decides.
     */
    private boolean isFatalError(Reply reply) {
        final Error firstError = reply.getError(0);
        final int code = firstError.getCode();
        if (code == 151002 || code == 151009 || code == 200009) {
            return false;
        }
        return firstError.isFatal();
    }

    /** Returns false when the first error of the reply has code 151009 or 151012, true otherwise. */
    private boolean shouldReportError(Reply reply) {
        final int code = reply.getError(0).getCode();
        return code != 151009 && code != 151012;
    }

    /** Formats an error as {@code <protocol error name>: <error message>}. */
    private static String getErrorMessage(Error r) {
        final String errorName = DocumentProtocol.getErrorName(r.getCode());
        return errorName + ": " + r.getMessage();
    }

    /** Tells whether the first error of the reply carries the given error code. */
    private static boolean isErrorOfType(Reply reply, int errorCode) {
        final int firstErrorCode = reply.getError(0).getCode();
        return firstErrorCode == errorCode;
    }

    /** Passes an error message on to the control handler. */
    private void reportVisitorError(String message) {
        params.getControlHandler().onVisitorError(message);
    }

    /**
     * Handles a reply that carries at least one error. Only the first error is inspected.
     *
     * <p>The bucket progress is first reset to what the originating message carried. A fatal
     * error then either marks the bucket as failed (when buckets are skipped on fatal
     * errors) or fails the whole session and returns. For all remaining cases, a reply with
     * error code 151002 is treated as a wrong-distribution reply; anything else is
     * reported to the control handler where applicable and followed by an attempt to
     * schedule new visitors after a 100 ms delay.
     *
     * @param reply an error reply whose message is a CreateVisitorMessage
     */
    private void handleErrorReply(Reply reply) {
        CreateVisitorMessage msg = (CreateVisitorMessage)reply.getMessage();
        // The message's bucket list holds the superbucket first and its sub-progress second.
        BucketId bucket = msg.getBuckets().get(0);
        BucketId subProgress = msg.getBuckets().get(1);
        // Restore the progress the bucket had when the message was sent.
        this.progress.getIterator().update(bucket, subProgress);
        String message = MessageBusVisitorSession.getErrorMessage(reply.getError(0));
        log.log(Level.FINE, () -> this.sessionName + ": received error reply for bucket " + bucket + " with message '" + message + "'");
        if (this.isFatalError(reply)) {
            if (this.params.skipBucketsOnFatalErrors()) {
                this.markBucketProgressAsFailed(bucket, subProgress, message);
            } else {
                this.reportVisitorError(message);
                this.transitionTo(new StateDescription(State.FAILED, message));
                // Session has failed; nothing more is scheduled from here.
                return;
            }
        }
        if (MessageBusVisitorSession.isErrorOfType(reply, 151002)) {
            this.handleWrongDistributionReply((WrongDistributionReply)reply);
        } else {
            if (this.shouldReportError(reply)) {
                this.reportVisitorError(message);
            }
            // Delay the next attempt by 100 ms; scheduling is skipped if not applicable.
            this.scheduleSendCreateVisitorsIfApplicable(100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Records the bucket as failed in the progress token, along with its sub-progress and
     * the error message, then updates the iterator so the bucket counts as finished.
     */
    private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) {
        progress.getToken().addFailedBucket(bucket, subProgress, message);
        progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
    }

    /**
     * Tells whether a hit limit has been reached: either the first-pass limit against the
     * documents returned, or the total limit against first-pass plus second-pass documents.
     * A limit of -1 disables the corresponding check.
     */
    private boolean enoughHitsReceived() {
        return (params.getMaxFirstPassHits() != -1L
                        && statistics.getDocumentsReturned() >= params.getMaxFirstPassHits())
                || (params.getMaxTotalHits() != -1L
                        && statistics.getDocumentsReturned() + statistics.getSecondPassDocumentsReturned() >= params.getMaxTotalHits());
    }

    /**
     * Visiting is complete when no replies are pending and the iterator is done, the
     * session has failed, or enough hits have been received.
     */
    private boolean visitingCompleted() {
        if (pendingMessageCount != 0) {
            return false;
        }
        return progress.getIterator().isDone() || state.failed() || enoughHitsReceived();
    }

    /**
     * Returns the per-message timeout: the configured timeout, but at least 1 ms, or
     * 300000 ms when the configured timeout is infinite (negative).
     */
    private long messageTimeoutMillis() {
        if (isInfiniteTimeout(params.getTimeoutMs())) {
            return 300000L;
        }
        return Math.max(1L, params.getTimeoutMs());
    }

    /** Returns the session timeout from the visitor parameters, in milliseconds. */
    private long sessionTimeoutMillis() {
        return params.getSessionTimeoutMs();
    }

    /** Returns the milliseconds elapsed on the session clock since the recorded start time. */
    private long elapsedTimeMillis() {
        final long elapsedNanos = clock.monotonicNanoTime() - startTimeNanos;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    /** A negative timeout value means "no timeout". */
    private static boolean isInfiniteTimeout(long timeoutMillis) {
        return 0L > timeoutMillis;
    }

    /**
     * Returns the message timeout, capped by the time remaining of the session timeout
     * (at least 1 ms). With an infinite session timeout the message timeout is returned as is.
     *
     * @param elapsedMs milliseconds already spent in the session
     */
    private long computeBoundedMessageTimeoutMillis(long elapsedMs) {
        final long messageTimeout = messageTimeoutMillis();
        if (isInfiniteTimeout(sessionTimeoutMillis())) {
            return messageTimeout;
        }
        final long remainingSessionMillis = Math.max(1L, sessionTimeoutMillis() - elapsedMs);
        return Math.min(remainingSessionMillis, messageTimeout);
    }

    /**
     * Schedules a send-create-visitors task after the given delay, if applicable. When the
     * session timeout has expired the session instead moves to TIMED_OUT and is marked
     * completed if visiting is finished. Nothing is scheduled when
     * {@code mayScheduleCreateVisitorsTask()} says no.
     */
    private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
        final long elapsedMillis = elapsedTimeMillis();
        final boolean sessionExpired = !isInfiniteTimeout(sessionTimeoutMillis()) && elapsedMillis >= sessionTimeoutMillis();
        if (sessionExpired) {
            transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", sessionTimeoutMillis())));
            if (visitingCompleted()) {
                markSessionCompleted();
            }
            return;
        }
        if (mayScheduleCreateVisitorsTask()) {
            final long boundedTimeoutMillis = computeBoundedMessageTimeoutMillis(elapsedMillis);
            taskExecutor.scheduleTask(new SendCreateVisitorsTask(boundedTimeoutMillis), delay, unit);
            scheduledSendCreateVisitors = true;
        }
    }

    /**
     * A new send task may be scheduled only if none is already scheduled, the iterator has
     * more buckets, the session has not failed, and the hit limits have not been reached.
     */
    private boolean mayScheduleCreateVisitorsTask() {
        if (scheduledSendCreateVisitors) {
            return false;
        }
        if (!progress.getIterator().hasNext()) {
            return false;
        }
        if (state.failed()) {
            return false;
        }
        return !enoughHitsReceived();
    }

    /** Same as the two-argument overload, with a delay of zero milliseconds. */
    private void scheduleSendCreateVisitorsIfApplicable() {
        scheduleSendCreateVisitorsIfApplicable(0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a successful CreateVisitorReply: updates the bucket progress, reports
     * progress and accumulated statistics to the control handler, attaches the reply trace
     * (while the session trace has fewer than 1000 children), and optionally increases the
     * max-buckets-per-visitor setting when the reply returned fewer documents than half the
     * first-pass hit limit. The increased value is kept within 1..128.
     */
    private void handleCreateVisitorReply(CreateVisitorReply reply) {
        final CreateVisitorMessage request = (CreateVisitorMessage) reply.getMessage();
        final BucketId superbucket = request.getBuckets().get(0);
        final BucketId subBucketProgress = reply.getLastBucket();
        log.log(Level.FINE, () -> sessionName + ": received CreateVisitorReply for bucket " + superbucket + " with progress " + subBucketProgress);

        progress.getIterator().update(superbucket, subBucketProgress);
        params.getControlHandler().onProgress(progress.getToken());
        statistics.add(reply.getVisitorStatistics());
        params.getControlHandler().onVisitorStatistics(statistics);

        final boolean replyHasTrace = !reply.getTrace().getRoot().isEmpty();
        if (replyHasTrace && trace.getRoot().getNumChildren() < 1000) {
            trace.getRoot().addChild(reply.getTrace().getRoot());
        }

        if (!params.getDynamicallyIncreaseMaxBucketsPerVisitor()) {
            return;
        }
        if ((double) reply.getVisitorStatistics().getDocumentsReturned() < (double) params.getMaxFirstPassHits() / 2.0) {
            final int scaled = (int) ((float) params.getMaxBucketsPerVisitor() * params.getDynamicMaxBucketsIncreaseFactor());
            final int newMaxBuckets = Math.max(Math.min(scaled, 128), 1);
            params.setMaxBucketsPerVisitor(newMaxBuckets);
            log.log(Level.FINE, () -> sessionName + ": increasing max buckets per visitor to " + params.getMaxBucketsPerVisitor());
        }
    }

    /**
     * Handles a wrong-distribution reply by parsing the cluster state string it carries
     * and, if the distribution bit count differs from the one the iterator uses, updating
     * the iterator.
     *
     * <p>If the cluster state cannot be parsed the session is failed. The causing exception
     * is logged together with the message so the parse problem can be diagnosed.
     *
     * @param reply reply carrying the new system state string
     */
    private void handleWrongDistributionReply(WrongDistributionReply reply) {
        try {
            ClusterState newState = new ClusterState(reply.getSystemState());
            int stateBits = newState.getDistributionBitCount();
            if (stateBits != progress.getIterator().getDistributionBitCount()) {
                log.log(Level.FINE, () -> "System state changed; now at " + stateBits + " distribution bits");
                progress.getIterator().setDistributionBitCount(stateBits);
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.log(Level.SEVERE, "Failed to parse new system state string: " + reply.getSystemState(), e);
            transitionTo(new StateDescription(State.FAILED, "Failed to parse cluster state '" + reply.getSystemState() + "'"));
        }
    }

    /** Returns the generated name of this session. */
    public String getSessionName() {
        return sessionName;
    }

    /** Returns the done flag, read while holding the progress token lock. */
    @Override
    public boolean isDone() {
        synchronized (progress.getToken()) {
            return done;
        }
    }

    /** Returns the progress token of this session. */
    @Override
    public ProgressToken getProgress() {
        return progress.getToken();
    }

    /** Returns the trace object of this session. */
    @Override
    public Trace getTrace() {
        return trace;
    }

    /**
     * Delegates to the control handler's {@code waitUntilDone}.
     *
     * @param timeoutMs timeout passed on to the control handler
     * @return whatever the control handler returns
     * @throws InterruptedException propagated from the control handler
     */
    @Override
    public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
        return params.getControlHandler().waitUntilDone(timeoutMs);
    }

    /** Acknowledges a message by sending the reply wrapped in the given ack token. */
    @Override
    public void ack(AckToken token) {
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + sessionName + ": Sending ack " + token.ackObject);
        }
        final Reply ackReply = (Reply) token.ackObject;
        receiver.reply(ackReply);
    }

    /** Moves the session to the ABORTED state while holding the progress token lock. */
    @Override
    public void abort() {
        synchronized (progress.getToken()) {
            transitionTo(new StateDescription(State.ABORTED, "Visitor aborted by user"));
        }
    }

    /**
     * Returns the next response from the local data handler.
     *
     * @throws IllegalStateException if the session has no local data handler
     */
    @Override
    public VisitorResponse getNext() {
        if (params.getLocalDataHandler() != null) {
            return params.getLocalDataHandler().getNext();
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /**
     * Returns the next response from the local data handler, passing the timeout on to it.
     *
     * @throws IllegalStateException if the session has no local data handler
     * @throws InterruptedException  propagated from the local data handler
     */
    @Override
    public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException {
        if (params.getLocalDataHandler() != null) {
            return params.getLocalDataHandler().getNext(timeoutMilliseconds);
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /** Returns the destroying flag, read while holding the completion monitor. */
    public boolean isDestroying() {
        synchronized (completionMonitor) {
            return destroying;
        }
    }

    /**
     * Synchronously destroys the session.
     *
     * <p>If the session is not yet done it is moved to the ABORTED state. The calling
     * thread then waits on the completion monitor until the session is marked done, after
     * which the sender and the receiver are destroyed.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * communication interfaces are still destroyed. The thread's interrupt status is
     * restored before returning, so callers can observe the interruption. The sender and
     * receiver are destroyed independently: a failure in one does not prevent the other
     * from being destroyed.
     */
    @Override
    public void destroy() {
        log.log(Level.FINE, () -> sessionName + ": synchronous destroy() called");
        boolean interrupted = false;
        try {
            // Lock order: progress token first, then completion monitor.
            synchronized (progress.getToken()) {
                synchronized (completionMonitor) {
                    if (!done) {
                        transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion"));
                    }
                }
            }
            synchronized (completionMonitor) {
                assert (!destroying) : "Attempted to destroy VisitorSession more than once";
                destroying = true;
                while (!done) {
                    completionMonitor.wait();
                }
            }
        } catch (InterruptedException e) {
            log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed");
            interrupted = true;
        } finally {
            try {
                sender.destroy();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e);
            }
            try {
                receiver.destroy();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e);
            }
            log.log(Level.FINE, () -> sessionName + ": synchronous destroy() done");
            if (interrupted) {
                // Restore the flag only after cleanup, so the destroy calls above run uninterrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    private class HandleMessageTask
    implements Runnable {
        private Message message;

        private HandleMessageTask(Message message) {
            this.message = message;
        }

        @Override
        public void run() {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Visitor session " + MessageBusVisitorSession.this.sessionName + ": Received message " + this.message);
            }
            try {
                if (this.message instanceof VisitorInfoMessage) {
                    MessageBusVisitorSession.this.handleVisitorInfoMessage((VisitorInfoMessage)this.message);
                } else {
                    MessageBusVisitorSession.this.handleDocumentMessage((DocumentMessage)this.message);
                }
            }
            catch (Throwable t) {
                Process.logAndDie((String)"Caught unhandled error when processing message", (Throwable)t);
            }
        }
    }

    /**
     * Task that processes one reply to a previously sent message on the task executor.
     * All work happens while holding the progress token lock.
     */
    private class HandleReplyTask
    implements Runnable {
        private Reply reply;

        HandleReplyTask(Reply reply) {
            this.reply = reply;
        }

        /**
         * Decrements the pending message count and dispatches the reply: error replies go
         * to the error handler, CreateVisitorReplies to the regular handler, and any other
         * reply type fails the session. An exception during handling also fails the
         * session; any other throwable terminates the process. In every case the session
         * afterwards either completes or tries to schedule more visitors.
         */
        @Override
        public void run() {
            ProgressToken progressToken = MessageBusVisitorSession.this.progress.getToken();
            synchronized (progressToken) {
                try {
                    assert (MessageBusVisitorSession.this.pendingMessageCount > 0);
                    // This reply is no longer outstanding.
                    --MessageBusVisitorSession.this.pendingMessageCount;
                    if (this.reply.hasErrors()) {
                        MessageBusVisitorSession.this.handleErrorReply(this.reply);
                    } else if (this.reply instanceof CreateVisitorReply) {
                        MessageBusVisitorSession.this.handleCreateVisitorReply((CreateVisitorReply)this.reply);
                    } else {
                        String msg = "Received reply we do not know how to handle: " + this.reply.getClass().getName();
                        log.log(Level.SEVERE, msg);
                        MessageBusVisitorSession.this.transitionTo(new StateDescription(State.FAILED, msg));
                    }
                }
                catch (Exception e) {
                    String msg = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while processing reply in visitor session";
                    log.log(Level.WARNING, msg, e);
                    MessageBusVisitorSession.this.transitionTo(new StateDescription(State.FAILED, msg));
                }
                catch (Throwable t) {
                    Process.logAndDie((String)"Caught unhandled error when running reply task", (Throwable)t);
                }
                finally {
                    // Runs after every outcome above, so completion is detected even on failure.
                    MessageBusVisitorSession.this.continueVisiting();
                }
            }
        }
    }

    /**
     * Task that asks the progress iterator for buckets and sends one
     * CreateVisitor message per bucket, stopping when the iterator has no more
     * buckets or the sender does not accept a message.
     */
    private class SendCreateVisitorsTask implements Runnable {
        private final long messageTimeoutMs;

        /**
         * @param messageTimeoutMs value passed to {@code setTimeRemaining} on every
         *                         message created by this task
         */
        SendCreateVisitorsTask(long messageTimeoutMs) {
            this.messageTimeoutMs = messageTimeoutMs;
        }

        /**
         * Increments the session's visitor counter and builds an id of the form
         * {@code <sessionName>-<counter>}.
         *
         * @return the id to use for the next CreateVisitor message
         */
        private String getNextVisitorId() {
            ++visitorCounter;
            return sessionName + "-" + visitorCounter;
        }

        /**
         * Builds a CreateVisitor message for the given bucket, copying the visiting
         * options from the session's visitor parameters.
         *
         * @param bucket bucket whose superbucket and progress ids are put on the message
         * @return a message with retries disabled
         */
        private CreateVisitorMessage createMessage(VisitorIterator.BucketProgress bucket) {
            CreateVisitorMessage msg = new CreateVisitorMessage(
                    params.getVisitorLibrary(),
                    getNextVisitorId(),
                    receiver.getConnectionSpec(),
                    dataDestination);
            msg.getTrace().setLevel(params.getTraceLevel());
            msg.setTimeRemaining(messageTimeoutMs);
            msg.setBuckets(Arrays.asList(bucket.getSuperbucket(), bucket.getProgress()));
            msg.setDocumentSelection(params.getDocumentSelection());
            msg.setBucketSpace(params.getBucketSpace());
            msg.setFromTimestamp(params.getFromTimestamp());
            msg.setToTimestamp(params.getToTimestamp());
            msg.setMaxPendingReplyCount(params.getMaxPending());
            msg.setFieldSet(params.fieldSet());
            msg.setVisitInconsistentBuckets(params.visitInconsistentBuckets());
            msg.setVisitRemoves(params.visitRemoves());
            msg.setParameters(params.getLibraryParameters());
            msg.setRoute(params.getRoute());
            msg.setMaxBucketsPerVisitor(params.getMaxBucketsPerVisitor());
            msg.setLoadType(params.getLoadType());
            msg.setPriority(params.getPriority());
            msg.setRetryEnabled(false);
            return msg;
        }

        /**
         * Sends CreateVisitor messages while holding the progress token's monitor.
         *
         * <p>Clears the "send scheduled" flag first and does nothing further if the
         * session is already done. An {@link Exception} moves the session to
         * {@code FAILED}; any other {@link Throwable} is handed to
         * {@code Process.logAndDie}.
         */
        @Override
        public void run() {
            // The whole send loop runs under the progress token's monitor.
            synchronized (progress.getToken()) {
                try {
                    scheduledSendCreateVisitors = false;
                    if (done) {
                        return;
                    }
                    while (progress.getIterator().hasNext()) {
                        VisitorIterator.BucketProgress bucket = progress.getIterator().getNext();
                        Result result = sender.send(createMessage(bucket));
                        if (!result.isAccepted()) {
                            // Not accepted: give the bucket back to the iterator and stop sending.
                            progress.getIterator().update(bucket.getSuperbucket(), bucket.getProgress());
                            break;
                        }
                        log.log(Level.FINE, () -> sessionName + ": sent CreateVisitor for bucket " + bucket.getSuperbucket() + " with progress " + bucket.getProgress());
                        ++pendingMessageCount;
                    }
                } catch (Exception e) {
                    String msg = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while attempting to send visitors";
                    // Pass the throwable along so the stack trace ends up in the log.
                    log.log(Level.WARNING, msg, e);
                    transitionTo(new StateDescription(State.FAILED, msg));
                    // NOTE(review): presumably needed because a failed send produces no
                    // reply task that would otherwise drive the session forward - confirm.
                    continueVisiting();
                } catch (Throwable t) {
                    Process.logAndDie("Caught unhandled error when trying to send visitors", t);
                }
            }
        }
    }

    /**
     * {@link Clock} implementation that reads {@link System#nanoTime()}.
     */
    public static class RealClock implements Clock {

        /** @return the current value of {@link System#nanoTime()} */
        @Override
        public long monotonicNanoTime() {
            return System.nanoTime();
        }
    }

    /**
     * {@link AsyncTaskExecutor} that forwards every task to a
     * {@link ScheduledExecutorService} supplied at construction time.
     */
    public static class ThreadAsyncTaskExecutor implements AsyncTaskExecutor {

        private final ScheduledExecutorService executor;

        /**
         * @param executor the executor service that will run submitted and scheduled tasks
         */
        public ThreadAsyncTaskExecutor(ScheduledExecutorService executor) {
            this.executor = executor;
        }

        /** Submits the task to the executor; the returned future is not kept. */
        @Override
        public void submitTask(Runnable task) {
            executor.submit(task);
        }

        /** Schedules the task on the executor after the given delay; the returned future is not kept. */
        @Override
        public void scheduleTask(Runnable task, long delay, TimeUnit unit) {
            executor.schedule(task, delay, unit);
        }
    }

    /**
     * {@link ReceiverFactory} that creates receivers backed by destination
     * sessions on a given {@link MessageBus}.
     */
    public static class MessageBusReceiverFactory implements ReceiverFactory {

        private final MessageBus messageBus;

        /**
         * @param messageBus the message bus on which destination sessions are created
         */
        public MessageBusReceiverFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds destination session parameters with the given name and handler,
         * and with {@code setBroadcastName(false)}.
         */
        private DestinationSessionParams createDestinationParams(MessageHandler messageHandler, String visitorName) {
            DestinationSessionParams sessionParams = new DestinationSessionParams();
            sessionParams.setName(visitorName);
            sessionParams.setBroadcastName(false);
            sessionParams.setMessageHandler(messageHandler);
            return sessionParams;
        }

        /**
         * Creates a destination session named {@code sessionName} and wraps it in a
         * {@link MessageBusReceiver}.
         */
        @Override
        public Receiver createReceiver(MessageHandler messageHandler, String sessionName) {
            DestinationSession session =
                    messageBus.createDestinationSession(createDestinationParams(messageHandler, sessionName));
            return new MessageBusReceiver(session);
        }
    }

    /**
     * {@link Receiver} that delegates every call to a wrapped
     * {@link DestinationSession}.
     */
    public static class MessageBusReceiver implements Receiver {

        private final DestinationSession destinationSession;

        /**
         * @param destinationSession the session all calls are forwarded to
         */
        public MessageBusReceiver(DestinationSession destinationSession) {
            this.destinationSession = destinationSession;
        }

        /** Forwards the reply to the destination session. */
        @Override
        public void reply(Reply reply) {
            destinationSession.reply(reply);
        }

        /** Destroys the destination session. */
        @Override
        public void destroy() {
            destinationSession.destroy();
        }

        /** @return the connection spec reported by the destination session */
        @Override
        public String getConnectionSpec() {
            return destinationSession.getConnectionSpec();
        }
    }

    /**
     * {@link SenderFactory} that creates senders backed by source sessions on a
     * given {@link MessageBus}.
     */
    public static class MessageBusSenderFactory implements SenderFactory {

        private final MessageBus messageBus;

        /**
         * @param messageBus the message bus on which source sessions are created
         */
        public MessageBusSenderFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds source session parameters using the throttle policy from the
         * visitor parameters, or a new {@link DynamicThrottlePolicy} when none is set.
         */
        private SourceSessionParams createSourceSessionParams(VisitorParameters visitorParameters) {
            SourceSessionParams sessionParams = new SourceSessionParams();
            if (visitorParameters.getThrottlePolicy() == null) {
                sessionParams.setThrottlePolicy((ThrottlePolicy) new DynamicThrottlePolicy());
            } else {
                sessionParams.setThrottlePolicy(visitorParameters.getThrottlePolicy());
            }
            return sessionParams;
        }

        /**
         * Sets the message bus max pending count to 0, then creates a source
         * session and wraps it in a {@link MessageBusSender}.
         */
        @Override
        public Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters) {
            // NOTE(review): this changes a setting on the MessageBus instance itself,
            // not just on the new session - confirm that is intended for shared buses.
            messageBus.setMaxPendingCount(0);
            SourceSession session =
                    messageBus.createSourceSession(replyHandler, createSourceSessionParams(visitorParameters));
            return new MessageBusSender(session);
        }
    }

    /**
     * {@link Sender} that delegates every call to a wrapped {@link SourceSession}.
     */
    public static class MessageBusSender implements Sender {

        private final SourceSession sourceSession;

        /**
         * @param sourceSession the session all calls are forwarded to
         */
        public MessageBusSender(SourceSession sourceSession) {
            this.sourceSession = sourceSession;
        }

        /** @return the result of sending the message through the source session */
        @Override
        public Result send(Message msg) {
            return sourceSession.send(msg);
        }

        /** @return the pending count reported by the source session */
        @Override
        public int getPendingCount() {
            return sourceSession.getPendingCount();
        }

        /** Destroys the source session. */
        @Override
        public void destroy() {
            sourceSession.destroy();
        }
    }

    /**
     * Immutable pair of a session {@link State} and a free-text description.
     */
    public class StateDescription {

        private final State state;
        private final String description;

        /**
         * @param state       the state being described
         * @param description text accompanying the state
         */
        public StateDescription(State state, String description) {
            this.state = state;
            this.description = description;
        }

        /**
         * Creates a description with an empty text.
         *
         * @param state the state being described
         */
        public StateDescription(State state) {
            this(state, "");
        }

        public State getState() {
            return state;
        }

        public String getDescription() {
            return description;
        }

        /**
         * Maps the state to a completion code.
         *
         * @return SUCCESS for COMPLETED, ABORTED for ABORTED, FAILURE for FAILED
         *         and TIMEOUT for TIMED_OUT
         * @throws IllegalStateException for any other state
         */
        VisitorControlHandler.CompletionCode toCompletionCode() {
            switch (state) {
                case COMPLETED:
                    return VisitorControlHandler.CompletionCode.SUCCESS;
                case ABORTED:
                    return VisitorControlHandler.CompletionCode.ABORTED;
                case FAILED:
                    return VisitorControlHandler.CompletionCode.FAILURE;
                case TIMED_OUT:
                    return VisitorControlHandler.CompletionCode.TIMEOUT;
                default:
                    throw new IllegalStateException("Current state did not have a valid value: " + state);
            }
        }

        /** @return whether the state is flagged as a failure state */
        public boolean failed() {
            return state.isFailure();
        }

        /** @return the state and the description, separated by {@code ": "} */
        @Override
        public String toString() {
            return state + ": " + description;
        }
    }

    /**
     * Session states, each flagged as a failure state or not.
     */
    public enum State {
        NOT_STARTED(false),
        WORKING(false),
        COMPLETED(false),
        ABORTED(true),
        FAILED(true),
        TIMED_OUT(true);

        private final boolean failure;

        State(boolean failure) {
            this.failure = failure;
        }

        /** @return {@code true} for ABORTED, FAILED and TIMED_OUT; {@code false} otherwise */
        public boolean isFailure() {
            return failure;
        }
    }

    /**
     * Immutable holder pairing a {@link VisitorIterator} with a
     * {@link ProgressToken}.
     */
    public static class VisitingProgress {

        private final VisitorIterator iterator;
        private final ProgressToken token;

        /**
         * @param iterator the iterator to hold
         * @param token    the progress token to hold
         */
        public VisitingProgress(VisitorIterator iterator, ProgressToken token) {
            this.iterator = iterator;
            this.token = token;
        }

        /** @return the iterator given at construction */
        public VisitorIterator getIterator() {
            return iterator;
        }

        /** @return the progress token given at construction */
        public ProgressToken getToken() {
            return token;
        }
    }

    /**
     * Time source abstraction; {@code RealClock} is the implementation in this file.
     */
    public interface Clock {

        /** @return a time reading in nanoseconds */
        long monotonicNanoTime();
    }

    /**
     * Abstraction over running tasks either as soon as possible or after a delay.
     */
    public interface AsyncTaskExecutor {

        /**
         * Hands a task over for execution.
         *
         * @param task the task to run
         */
        void submitTask(Runnable task);

        /**
         * Hands a task over for execution after a delay.
         *
         * @param task  the task to run
         * @param delay how long to wait before running it
         * @param unit  the unit of {@code delay}
         */
        void scheduleTask(Runnable task, long delay, TimeUnit unit);
    }

    /**
     * Factory for {@link Receiver} instances.
     */
    public interface ReceiverFactory {

        /**
         * Creates a receiver.
         *
         * @param messageHandler the handler for the new receiver
         * @param sessionName    the session name for the new receiver
         * @return the created receiver
         */
        Receiver createReceiver(MessageHandler messageHandler, String sessionName);
    }

    public static interface Receiver {
        public void reply(Reply var1);

        public void destroy();

        public String getConnectionSpec();
    }

    /**
     * Factory for {@link Sender} instances.
     */
    public interface SenderFactory {

        /**
         * Creates a sender.
         *
         * @param replyHandler      the reply handler for the new sender
         * @param visitorParameters the visitor parameters to configure the sender from
         * @return the created sender
         */
        Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters);
    }

    /**
     * Sending side of the session: can send messages, report a pending count,
     * and be destroyed.
     */
    public interface Sender {

        /**
         * Sends a message.
         *
         * @param msg the message to send
         * @return the result of the send attempt
         */
        Result send(Message msg);

        /** @return the pending count of this sender */
        int getPendingCount();

        /** Destroys this sender. */
        void destroy();
    }
}

