/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.protocol.socket;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.socket.Response;
import org.apache.nifi.remote.protocol.socket.ResponseCode;
import org.apache.nifi.remote.protocol.socket.SocketClientTransactionCompletion;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketClientTransaction
implements Transaction {
    private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
    private final long creationNanoTime = System.nanoTime();
    private final CRC32 crc = new CRC32();
    private final int protocolVersion;
    private final FlowFileCodec codec;
    private final DataInputStream dis;
    private final DataOutputStream dos;
    private final TransferDirection direction;
    private final boolean compress;
    private final Peer peer;
    private final int penaltyMillis;
    private final String destinationId;
    private final EventReporter eventReporter;
    private boolean dataAvailable = false;
    private int transfers = 0;
    private long contentBytes = 0L;
    private Transaction.TransactionState state;

    SocketClientTransaction(int protocolVersion, String destinationId, Peer peer, FlowFileCodec codec, TransferDirection direction, boolean useCompression, int penaltyMillis, EventReporter eventReporter) throws IOException {
        this.protocolVersion = protocolVersion;
        this.destinationId = destinationId;
        this.peer = peer;
        this.codec = codec;
        this.direction = direction;
        this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
        this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
        this.compress = useCompression;
        this.state = Transaction.TransactionState.TRANSACTION_STARTED;
        this.penaltyMillis = penaltyMillis;
        this.eventReporter = eventReporter;
        this.initialize();
    }

    private void initialize() throws IOException {
        try {
            if (this.direction == TransferDirection.RECEIVE) {
                RequestType.RECEIVE_FLOWFILES.writeRequestType(this.dos);
                this.dos.flush();
                Response dataAvailableCode = Response.read(this.dis);
                switch (dataAvailableCode.getCode()) {
                    case MORE_DATA: {
                        logger.debug("{} {} Indicates that data is available", (Object)this, (Object)this.peer);
                        this.dataAvailable = true;
                        break;
                    }
                    case NO_MORE_DATA: {
                        logger.debug("{} No data available from {}", (Object)this.peer);
                        this.dataAvailable = false;
                        return;
                    }
                    default: {
                        throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
                    }
                }
            } else {
                RequestType.SEND_FLOWFILES.writeRequestType(this.dos);
                this.dos.flush();
            }
        }
        catch (Exception e) {
            this.error();
            throw e;
        }
    }

    @Override
    public DataPacket receive() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot receive data from " + this.peer + " because Transaction State is " + (Object)((Object)this.state));
                }
                if (this.direction == TransferDirection.SEND) {
                    throw new IllegalStateException("Attempting to receive data from " + this.peer + " but started a SEND Transaction");
                }
                if (!this.dataAvailable) {
                    return null;
                }
                if (this.transfers > 0) {
                    Response dataAvailableCode = Response.read(this.dis);
                    switch (dataAvailableCode.getCode()) {
                        case CONTINUE_TRANSACTION: {
                            logger.debug("{} {} Indicates Transaction should continue", (Object)this, (Object)this.peer);
                            this.dataAvailable = true;
                            break;
                        }
                        case FINISH_TRANSACTION: {
                            logger.debug("{} {} Indicates Transaction should finish", (Object)this, (Object)this.peer);
                            this.dataAvailable = false;
                            break;
                        }
                        default: {
                            throw new ProtocolException("Got unexpected response from " + this.peer + " when asking for data: " + dataAvailableCode);
                        }
                    }
                }
                if (!this.dataAvailable) {
                    return null;
                }
                logger.debug("{} Receiving data from {}", (Object)this, (Object)this.peer);
                Object dataIn = this.compress ? new CompressionInputStream((InputStream)this.dis) : this.dis;
                DataPacket packet = this.codec.decode(new CheckedInputStream((InputStream)dataIn, this.crc));
                if (packet == null) {
                    this.dataAvailable = false;
                } else {
                    ++this.transfers;
                    this.contentBytes += packet.getSize();
                }
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
                return packet;
            }
            catch (IOException ioe) {
                throw new IOException("Failed to receive data from " + this.peer + " due to " + ioe, ioe);
            }
        }
        catch (Exception e) {
            this.error();
            throw e;
        }
    }

    @Override
    public void send(byte[] content, Map<String, String> attributes) throws IOException {
        this.send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
    }

    @Override
    public void send(DataPacket dataPacket) throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot send data to " + this.peer + " because Transaction State is " + (Object)((Object)this.state));
                }
                if (this.direction == TransferDirection.RECEIVE) {
                    throw new IllegalStateException("Attempting to send data to " + this.peer + " but started a RECEIVE Transaction");
                }
                if (this.transfers > 0) {
                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(this.dos);
                }
                logger.debug("{} Sending data to {}", (Object)this, (Object)this.peer);
                Object dataOut = this.compress ? new CompressionOutputStream((OutputStream)this.dos) : this.dos;
                CheckedOutputStream out = new CheckedOutputStream((OutputStream)dataOut, this.crc);
                this.codec.encode(dataPacket, out);
                if (this.compress) {
                    ((OutputStream)out).close();
                }
                ++this.transfers;
                this.contentBytes += dataPacket.getSize();
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
            }
            catch (IOException ioe) {
                throw new IOException("Failed to send data to " + this.peer + " due to " + ioe, ioe);
            }
        }
        catch (Exception e) {
            this.error();
            throw e;
        }
    }

    @Override
    public void cancel(String explanation) throws IOException {
        if (this.state == Transaction.TransactionState.TRANSACTION_CANCELED || this.state == Transaction.TransactionState.TRANSACTION_COMPLETED || this.state == Transaction.TransactionState.ERROR) {
            throw new IllegalStateException("Cannot cancel transaction because state is already " + (Object)((Object)this.state));
        }
        try {
            ResponseCode.CANCEL_TRANSACTION.writeResponse(this.dos, explanation == null ? "<No explanation given>" : explanation);
            this.state = Transaction.TransactionState.TRANSACTION_CANCELED;
        }
        catch (IOException ioe) {
            this.error();
            throw new IOException("Failed to send 'cancel transaction' message to " + this.peer + " due to " + ioe, ioe);
        }
    }

    @Override
    public TransactionCompletion complete() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.TRANSACTION_CONFIRMED) {
                    throw new IllegalStateException("Cannot complete transaction with " + this.peer + " because state is " + (Object)((Object)this.state) + "; Transaction can only be completed when state is " + (Object)((Object)Transaction.TransactionState.TRANSACTION_CONFIRMED));
                }
                boolean backoff = false;
                if (this.direction == TransferDirection.RECEIVE) {
                    if (this.transfers == 0) {
                        this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                        return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - this.creationNanoTime);
                    }
                    logger.debug("{} Sending TRANSACTION_FINISHED to {}", (Object)this, (Object)this.peer);
                    ResponseCode.TRANSACTION_FINISHED.writeResponse(this.dos);
                    this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                } else {
                    Response transactionResponse;
                    try {
                        transactionResponse = Response.read(this.dis);
                    }
                    catch (IOException e) {
                        throw new IOException(this + " Failed to receive a response from " + this.peer + " when expecting a TransactionFinished Indicator. " + "It is unknown whether or not the peer successfully received/processed the data.", e);
                    }
                    logger.debug("{} Received {} from {}", new Object[]{this, transactionResponse, this.peer});
                    if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                        this.peer.penalize(this.destinationId, this.penaltyMillis);
                        backoff = true;
                    } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
                        throw new ProtocolException("After sending data to " + this.peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse);
                    }
                    this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                }
                return new SocketClientTransactionCompletion(backoff, this.transfers, this.contentBytes, System.nanoTime() - this.creationNanoTime);
            }
            catch (IOException ioe) {
                throw new IOException("Failed to complete transaction with " + this.peer + " due to " + ioe, ioe);
            }
        }
        catch (Exception e) {
            this.error();
            throw e;
        }
    }

    @Override
    public void confirm() throws IOException {
        block18: {
            try {
                try {
                    if (this.state == Transaction.TransactionState.TRANSACTION_STARTED && !this.dataAvailable && this.direction == TransferDirection.RECEIVE) {
                        this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                        return;
                    }
                    if (this.state != Transaction.TransactionState.DATA_EXCHANGED) {
                        throw new IllegalStateException("Cannot confirm Transaction because state is " + (Object)((Object)this.state) + "; Transaction can only be confirmed when state is " + (Object)((Object)Transaction.TransactionState.DATA_EXCHANGED));
                    }
                    if (this.direction == TransferDirection.RECEIVE) {
                        Response confirmTransactionResponse;
                        if (this.dataAvailable) {
                            throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
                        }
                        logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", (Object)this, (Object)this.peer);
                        String calculatedCRC = String.valueOf(this.crc.getValue());
                        ResponseCode.CONFIRM_TRANSACTION.writeResponse(this.dos, calculatedCRC);
                        try {
                            confirmTransactionResponse = Response.read(this.dis);
                        }
                        catch (IOException ioe) {
                            logger.error("Failed to receive response code from {} when expecting confirmation of transaction", (Object)this.peer);
                            if (this.eventReporter != null) {
                                this.eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + this.peer + " when expecting confirmation of transaction");
                            }
                            throw ioe;
                        }
                        logger.trace("{} Received {} from {}", new Object[]{this, confirmTransactionResponse, this.peer});
                        switch (confirmTransactionResponse.getCode()) {
                            case CONFIRM_TRANSACTION: {
                                break;
                            }
                            case BAD_CHECKSUM: {
                                throw new IOException(this + " Received a BadChecksum response from peer " + this.peer);
                            }
                            default: {
                                throw new ProtocolException(this + " Received unexpected Response from peer " + this.peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
                            }
                        }
                        this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                        break block18;
                    }
                    logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", (Object)this, (Object)this.peer);
                    ResponseCode.FINISH_TRANSACTION.writeResponse(this.dos);
                    String calculatedCRC = String.valueOf(this.crc.getValue());
                    Response transactionConfirmationResponse = Response.read(this.dis);
                    if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) {
                        logger.trace("{} Received {} from {}", new Object[]{this, transactionConfirmationResponse, this.peer});
                        String receivedCRC = transactionConfirmationResponse.getMessage();
                        if (this.protocolVersion > 3 && !receivedCRC.equals(calculatedCRC)) {
                            ResponseCode.BAD_CHECKSUM.writeResponse(this.dos);
                            throw new IOException(this + " Sent data to peer " + this.peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
                        }
                    } else {
                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + this.peer + " but received " + transactionConfirmationResponse);
                    }
                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(this.dos, "");
                    this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                }
                catch (IOException ioe) {
                    throw new IOException("Failed to confirm transaction with " + this.peer + " due to " + ioe, ioe);
                }
            }
            catch (Exception e) {
                this.error();
                throw e;
            }
        }
    }

    @Override
    public void error() {
        this.state = Transaction.TransactionState.ERROR;
    }

    @Override
    public Transaction.TransactionState getState() {
        return this.state;
    }

    @Override
    public Communicant getCommunicant() {
        return this.peer;
    }

    public String toString() {
        return "SocketClientTransaction[Url=" + this.peer.getUrl() + ", TransferDirection=" + (Object)((Object)this.direction) + ", State=" + (Object)((Object)this.state) + "]";
    }
}

