/*
 * Decompiled with CFR 0.152.
 */
package com.signalfx.signalflow;

import com.signalfx.signalflow.Channel;
import com.signalfx.signalflow.ChannelMessage;
import com.signalfx.signalflow.ComputationAbortedException;
import com.signalfx.signalflow.ComputationFailedException;
import com.signalfx.signalflow.SignalFlowException;
import com.signalfx.signalflow.SignalFlowTransport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * Client-side view of a SignalFlow computation, exposed as a one-shot iterator of
 * {@link ChannelMessage}s.
 *
 * <p>On construction the computation opens a {@link Channel} through the supplied
 * {@link SignalFlowTransport}, either by executing the program or by attaching to it. While
 * iterating it keeps track of the computation id, state, resolution and per-timeseries metadata,
 * and merges consecutive data messages that share a logical timestamp into a single message
 * before handing it to the caller.
 *
 * <p>If the channel runs out of messages before an end-of-channel message is seen, the channel is
 * closed and re-opened (executions resume from the last yielded logical timestamp).
 *
 * <p>{@link #iterator()} returns this same instance, so the stream can only be consumed once. The
 * class performs no synchronization of its own.
 */
public class Computation
implements Iterable<ChannelMessage>,
Iterator<ChannelMessage> {
    protected SignalFlowTransport transport;
    protected String program;
    protected Map<String, String> params;
    protected boolean isAttachedChannel;
    /** Properties of each known timeseries, keyed by timeseries id. */
    private Map<String, Map<String, Object>> metadata = new HashMap<String, Map<String, Object>>();
    /** Handle reported by the job-start message; {@code null} until that message is seen. */
    private String id;
    private Channel channel;
    /** Message buffered for the next call to {@link #next()}, or {@code null} if none is ready. */
    private ChannelMessage nextMessage;
    private State state = State.STATE_UNKNOWN;
    /** Logical timestamp of the last data batch yielded; {@code -1} until one has been yielded. */
    private long lastLogicalTimestampMs = -1L;
    private long resolution;
    /** Number of data messages that make up one batch, once {@link #batchCountDetected} is set. */
    private int expectedBatches;
    private boolean batchCountDetected;
    /** Number of data messages merged into {@link #currentBatchMessage} so far. */
    private int currentBatchCount;
    /** Data batch currently being accumulated, or {@code null} if none is pending. */
    private ChannelMessage.DataMessage currentBatchMessage;

    /**
     * Creates the computation and immediately opens its channel.
     *
     * @param transport transport used to execute or attach to the program
     * @param program program text (or identifier) handed to the transport
     * @param params request parameters; {@code null} is treated as "no parameters" when executing
     * @param attach {@code true} to attach via {@code transport.attach}, {@code false} to start
     *     via {@code transport.execute}
     */
    public Computation(SignalFlowTransport transport, String program, Map<String, String> params, boolean attach) {
        this.transport = transport;
        this.program = program;
        this.params = params;
        this.isAttachedChannel = attach;
        this.channel = this.isAttachedChannel ? this.attach() : this.execute();
    }

    /**
     * @return the handle taken from the job-start message, or {@code null} if none was received yet
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return the value of {@code resolutionMs} from the last {@code JOB_RUNNING_RESOLUTION} info
     *     message, or {@code 0} if none was received yet
     */
    public long getResolution() {
        return this.resolution;
    }

    /**
     * @return the current state as derived from the messages processed so far
     */
    public State getState() {
        return this.state;
    }

    /**
     * @return the logical timestamp of the last data batch yielded, or {@code -1} if no batch has
     *     been yielded yet
     */
    public long getLastLogicalTimestampMs() {
        return this.lastLogicalTimestampMs;
    }

    /**
     * @return a new, naturally sorted list of the timeseries ids for which metadata is currently
     *     held
     */
    public Collection<String> getKnownTSIDs() {
        ArrayList<String> list = new ArrayList<String>(this.metadata.keySet());
        Collections.sort(list);
        return list;
    }

    /**
     * @param tsid timeseries id
     * @return the properties recorded for {@code tsid}, or {@code null} if the id is unknown (never
     *     announced, or already expired)
     */
    public Map<String, Object> getMetadata(String tsid) {
        return this.metadata.get(tsid);
    }

    /**
     * Returns this computation itself; the message stream can therefore be iterated only once.
     */
    @Override
    public Iterator<ChannelMessage> iterator() {
        return this;
    }

    /**
     * Reads from the channel until a message is ready to be returned, the computation completes,
     * or the channel is closed.
     *
     * @return {@code true} if {@link #next()} will return a message
     * @throws ComputationAbortedException if a channel-abort message is received
     * @throws ComputationFailedException if an error message is received
     */
    @Override
    public boolean hasNext() throws ComputationAbortedException, ComputationFailedException, SignalFlowException {
        return this.advance();
    }

    /**
     * Returns the next message of the stream, reading from the channel if none is buffered.
     *
     * @throws NoSuchElementException if the computation completed or the channel was closed
     *     without producing another message
     * @throws ComputationAbortedException if a channel-abort message is received
     * @throws ComputationFailedException if an error message is received
     */
    @Override
    public ChannelMessage next() throws ComputationAbortedException, ComputationFailedException, SignalFlowException, NoSuchElementException {
        if (!this.advance()) {
            throw new NoSuchElementException("no more stream messages");
        }
        ChannelMessage message = this.nextMessage;
        this.nextMessage = null;
        return message;
    }

    /**
     * Always throws; messages cannot be removed from the stream.
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("remove not supported");
    }

    /**
     * Closes the underlying channel and discards any message already buffered for {@link #next()}.
     */
    public void close() {
        this.channel.close();
        this.nextMessage = null;
    }

    /**
     * Shared implementation of {@link #hasNext()} and {@link #next()}: makes sure a message is
     * buffered if one can still be obtained.
     *
     * @return {@code true} if a message is buffered in {@link #nextMessage}
     */
    private boolean advance() throws ComputationAbortedException, ComputationFailedException, SignalFlowException {
        while (this.state != State.STATE_COMPLETED && !this.channel.isClosed && this.nextMessage == null) {
            this.parseNext();
        }
        return this.nextMessage != null;
    }

    /**
     * Starts the program through the transport. When a data batch has already been yielded, a
     * {@code start} parameter carrying its logical timestamp is added to a copy of the parameters.
     */
    private Channel execute() throws SignalFlowException {
        HashMap<String, String> requestParams = new HashMap<String, String>();
        // Tolerate callers that passed no parameter map at all.
        if (this.params != null) {
            requestParams.putAll(this.params);
        }
        if (this.lastLogicalTimestampMs >= 0L) {
            requestParams.put("start", Long.toString(this.lastLogicalTimestampMs));
        }
        return this.transport.execute(this.program, requestParams);
    }

    /**
     * Attaches to the program through the transport, passing the parameters through unchanged.
     */
    private Channel attach() throws SignalFlowException {
        return this.transport.attach(this.program, this.params);
    }

    /**
     * Consumes channel messages, updating the tracked state, until one message is ready to be
     * yielded (stored in {@link #nextMessage}) or the computation completes. Re-opens the channel
     * whenever it has no more messages but the computation has not completed.
     */
    private void parseNext() throws ComputationAbortedException, ComputationFailedException, SignalFlowException {
        this.nextMessage = null;

        // A batch started while flushing the previous one may already be complete (single-message
        // batches); hand it out before reading anything else from the channel.
        if (this.isCurrentBatchComplete()) {
            this.setNextDataMessageToYield();
            return;
        }

        while (this.state != State.STATE_COMPLETED) {
            if (!this.channel.hasNext()) {
                // The channel ended without an end-of-channel message: reconnect and carry on.
                this.channel.close();
                this.channel = this.isAttachedChannel ? this.attach() : this.execute();
                continue;
            }

            ChannelMessage message = this.channel.next();
            switch (message.channelMessageType) {
                case STREAM_START: {
                    this.state = State.STATE_STREAM_STARTED;
                    break;
                }
                case JOB_START: {
                    this.state = State.STATE_COMPUTATION_STARTED;
                    this.nextMessage = message;
                    this.id = ((ChannelMessage.JobStartMessage)message).getHandle();
                    break;
                }
                case JOB_PROGRESS: {
                    this.nextMessage = message;
                    break;
                }
                case CHANNEL_ABORT: {
                    this.state = State.STATE_ABORTED;
                    ChannelMessage.ChannelAbortMessage abortMessage = (ChannelMessage.ChannelAbortMessage)message;
                    throw new ComputationAbortedException(abortMessage.getAbortInfo());
                }
                case END_OF_CHANNEL: {
                    this.state = State.STATE_COMPLETED;
                    // Do not lose data that was still being accumulated when the stream ended;
                    // it becomes the final message of the iteration.
                    if (this.currentBatchMessage != null) {
                        this.setNextDataMessageToYield();
                    }
                    break;
                }
                case METADATA_MESSAGE: {
                    ChannelMessage.MetadataMessage metadataMessage = (ChannelMessage.MetadataMessage)message;
                    this.metadata.put(metadataMessage.getTsId(), metadataMessage.getProperties());
                    this.nextMessage = message;
                    break;
                }
                case EXPIRED_TSID_MESSAGE: {
                    ChannelMessage.ExpiredTsIdMessage expiredTsIdMessage = (ChannelMessage.ExpiredTsIdMessage)message;
                    this.metadata.remove(expiredTsIdMessage.getTsId());
                    this.nextMessage = message;
                    break;
                }
                case INFO_MESSAGE: {
                    this.handleInfoMessage((ChannelMessage.InfoMessage)message);
                    break;
                }
                case DATA_MESSAGE: {
                    this.handleDataMessage((ChannelMessage.DataMessage)message);
                    break;
                }
                case EVENT_MESSAGE: {
                    this.nextMessage = message;
                    break;
                }
                case ERROR_MESSAGE: {
                    ChannelMessage.ErrorMessage errorMessage = (ChannelMessage.ErrorMessage)message;
                    throw new ComputationFailedException(errorMessage.getErrors());
                }
            }

            if (this.nextMessage != null) {
                return;
            }
        }
    }

    /**
     * Records the resolution carried by a {@code JOB_RUNNING_RESOLUTION} info message and treats
     * any info message as the end of the batch being accumulated. Info messages themselves are
     * not yielded to the caller.
     */
    private void handleInfoMessage(ChannelMessage.InfoMessage infoMessage) {
        Object messageCode = infoMessage.getMessage().get("messageCode");
        if ("JOB_RUNNING_RESOLUTION".equals(messageCode)) {
            // Accept any Map implementation and skip a malformed payload instead of failing the
            // whole stream with a ClassCastException/NullPointerException.
            Object contents = infoMessage.getMessage().get("contents");
            if (contents instanceof Map) {
                Object resolutionMs = ((Map<?, ?>)contents).get("resolutionMs");
                if (resolutionMs instanceof Number) {
                    this.resolution = ((Number)resolutionMs).longValue();
                }
            }
        }
        this.batchCountDetected = true;
        if (this.currentBatchMessage != null) {
            this.setNextDataMessageToYield();
        }
    }

    /**
     * Merges a data message into the batch being accumulated and yields the batch once it is
     * complete.
     *
     * <p>Until the batch size is known, every message with the pending batch's logical timestamp
     * counts towards {@link #expectedBatches}. A message carrying a different logical timestamp
     * fixes the batch size, yields the pending batch and becomes the first message of the next
     * batch.
     */
    private void handleDataMessage(ChannelMessage.DataMessage dataMessage) {
        this.state = State.STATE_DATA_RECEIVED;

        boolean startsNewTimestamp = this.currentBatchMessage != null
                && !(dataMessage.getLogicalTimestampMs() == this.currentBatchMessage.getLogicalTimestampMs());
        if (startsNewTimestamp) {
            // This message belongs to the next batch, so it must neither be counted towards the
            // size of the pending batch nor be thrown away.
            this.batchCountDetected = true;
            this.setNextDataMessageToYield();
            this.currentBatchMessage = dataMessage;
            this.currentBatchCount = 1;
            return;
        }

        if (!this.batchCountDetected) {
            ++this.expectedBatches;
        }
        if (this.currentBatchMessage == null) {
            this.currentBatchMessage = dataMessage;
            this.currentBatchCount = 1;
        } else {
            this.currentBatchMessage.addData(dataMessage.getData());
            ++this.currentBatchCount;
        }
        if (this.isCurrentBatchComplete()) {
            this.setNextDataMessageToYield();
        }
    }

    /**
     * @return {@code true} if the batch size is known and the pending batch has reached it
     */
    private boolean isCurrentBatchComplete() {
        return this.batchCountDetected
                && this.currentBatchMessage != null
                && this.currentBatchCount == this.expectedBatches;
    }

    /**
     * Moves the pending batch into {@link #nextMessage}, records its logical timestamp as the last
     * one yielded and resets the accumulator. Must only be called while a batch is pending.
     */
    private void setNextDataMessageToYield() {
        ChannelMessage.DataMessage yieldMessage = this.currentBatchMessage;
        this.currentBatchMessage = null;
        this.currentBatchCount = 0;
        this.lastLogicalTimestampMs = yieldMessage.getLogicalTimestampMs();
        this.nextMessage = yieldMessage;
    }

    /**
     * Lifecycle of a computation as observed from the messages processed so far.
     */
    public static enum State {
        /** Initial state, before any state-changing message has been processed. */
        STATE_UNKNOWN,
        /** A stream-start message was processed. */
        STATE_STREAM_STARTED,
        /** A job-start message was processed. */
        STATE_COMPUTATION_STARTED,
        /** At least one data message was processed. */
        STATE_DATA_RECEIVED,
        /** An end-of-channel message was processed; no further messages are read. */
        STATE_COMPLETED,
        /** A channel-abort message was processed. */
        STATE_ABORTED;

    }
}

