/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardRemoteGroupPort
extends RemoteGroupPort {
    private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L);
    private static final String CATEGORY = "Site to Site";
    private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
    private final RemoteProcessGroup remoteGroup;
    private final AtomicBoolean useCompression = new AtomicBoolean(false);
    private final AtomicReference<Integer> batchCount = new AtomicReference();
    private final AtomicReference<String> batchSize = new AtomicReference();
    private final AtomicReference<String> batchDuration = new AtomicReference();
    private final AtomicBoolean targetExists = new AtomicBoolean(true);
    private final AtomicBoolean targetRunning = new AtomicBoolean(true);
    private final SSLContext sslContext;
    private final TransferDirection transferDirection;
    private volatile String targetId;
    private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference();

    SiteToSiteClient getSiteToSiteClient() {
        return this.clientRef.get();
    }

    public StandardRemoteGroupPort(String id, String targetId, String name, RemoteProcessGroup remoteGroup, TransferDirection direction, ConnectableType type, SSLContext sslContext, ProcessScheduler scheduler) {
        super(id, name, type, scheduler);
        this.targetId = targetId;
        this.remoteGroup = remoteGroup;
        this.transferDirection = direction;
        this.sslContext = sslContext;
        this.setSchedulingPeriod("1 nanos");
    }

    public String getTargetIdentifier() {
        String target = this.targetId;
        return target == null ? this.getIdentifier() : target;
    }

    public void setTargetIdentifier(String targetId) {
        this.targetId = targetId;
    }

    public boolean isTargetRunning() {
        return this.targetRunning.get();
    }

    public void setTargetRunning(boolean targetRunning) {
        this.targetRunning.set(targetRunning);
    }

    public boolean isTriggerWhenEmpty() {
        return this.getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
    }

    public Resource getResource() {
        ResourceType resourceType = ConnectableType.REMOTE_INPUT_PORT.equals((Object)this.getConnectableType()) ? ResourceType.InputPort : ResourceType.OutputPort;
        return ResourceFactory.getComponentResource((ResourceType)resourceType, (String)this.getIdentifier(), (String)this.getName());
    }

    public Authorizable getParentAuthorizable() {
        return this.getRemoteProcessGroup();
    }

    public void shutdown() {
        super.shutdown();
        SiteToSiteClient client = this.getSiteToSiteClient();
        if (client != null) {
            try {
                client.close();
            }
            catch (IOException ioe) {
                logger.warn("Failed to properly shutdown Site-to-Site Client", (Throwable)ioe);
            }
        }
    }

    public void onSchedulingStart() {
        String batchDuration;
        String batchSize;
        super.onSchedulingStart();
        long penalizationMillis = FormatUtils.getTimeDuration((String)this.remoteGroup.getYieldDuration(), (TimeUnit)TimeUnit.MILLISECONDS);
        SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder().urls(SiteToSiteRestApiClient.parseClusterUrls((String)this.remoteGroup.getTargetUris())).portIdentifier(this.getTargetIdentifier()).sslContext(this.sslContext).useCompression(this.isUseCompression()).eventReporter(this.remoteGroup.getEventReporter()).stateManager(this.remoteGroup.getStateManager()).nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS).timeout((long)this.remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS).transportProtocol(this.remoteGroup.getTransportProtocol()).httpProxy(new HttpProxy(this.remoteGroup.getProxyHost(), this.remoteGroup.getProxyPort(), this.remoteGroup.getProxyUser(), this.remoteGroup.getProxyPassword())).localAddress(this.remoteGroup.getLocalAddress());
        Integer batchCount = this.getBatchCount();
        if (batchCount != null) {
            clientBuilder.requestBatchCount(batchCount.intValue());
        }
        if ((batchSize = this.getBatchSize()) != null && batchSize.length() > 0) {
            clientBuilder.requestBatchSize((long)DataUnit.parseDataSize((String)batchSize.trim(), (DataUnit)DataUnit.B).intValue());
        }
        if ((batchDuration = this.getBatchDuration()) != null && batchDuration.length() > 0) {
            clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration((String)batchDuration.trim(), (TimeUnit)TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
        this.clientRef.set(clientBuilder.build());
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        Transaction transaction;
        FlowFile firstFlowFile;
        if (!this.remoteGroup.isTransmitting()) {
            logger.debug("{} {} is not transmitting; will not send/receive", (Object)this, (Object)this.remoteGroup);
            return;
        }
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) {
            logger.debug("{} No data to send", (Object)this);
            return;
        }
        String url = this.getRemoteProcessGroup().getTargetUri();
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
            firstFlowFile = session.get();
            if (firstFlowFile == null) {
                return;
            }
        } else {
            firstFlowFile = null;
        }
        SiteToSiteClient client = this.getSiteToSiteClient();
        try {
            transaction = client.createTransaction(this.transferDirection);
        }
        catch (PortNotRunningException e) {
            context.yield();
            this.targetRunning.set(false);
            String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", new Object[]{this, url});
            logger.error(message);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            throw new ProcessException((Throwable)e);
        }
        catch (UnknownPortException e) {
            context.yield();
            this.targetExists.set(false);
            String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", new Object[]{this, url});
            logger.error(message);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            throw new ProcessException((Throwable)e);
        }
        catch (UnreachableClusterException e) {
            context.yield();
            String message = String.format("%s failed to communicate with %s due to %s", new Object[]{this, url, e.toString()});
            logger.error(message);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            throw new ProcessException((Throwable)e);
        }
        catch (IOException e) {
            String message = String.format("%s failed to communicate with %s due to %s", new Object[]{this, url, e.toString()});
            logger.error(message);
            if (logger.isDebugEnabled()) {
                logger.error("", (Throwable)e);
            }
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            throw new ProcessException((Throwable)e);
        }
        if (transaction == null) {
            logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", (Object)this);
            session.rollback();
            context.yield();
            return;
        }
        try {
            if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
                this.transferFlowFiles(transaction, context, session, firstFlowFile);
            } else {
                int numReceived = this.receiveFlowFiles(transaction, context, session);
                if (numReceived == 0) {
                    context.yield();
                }
            }
            session.commitAsync();
        }
        catch (Throwable t) {
            String message = String.format("%s failed to communicate with remote NiFi instance due to %s", new Object[]{this, t.toString()});
            if (t instanceof TransmissionDisabledException) {
                logger.debug(message, t);
            } else {
                logger.error("{} failed to communicate with remote NiFi instance", (Object)this, (Object)t);
                this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            }
            transaction.error();
            throw new ProcessException(t);
        }
    }

    public String getYieldPeriod() {
        return this.remoteGroup.getYieldDuration();
    }

    private int transferFlowFiles(Transaction transaction, ProcessContext context, ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
        Object flowFile = firstFlowFile;
        try {
            String userDn = transaction.getCommunicant().getDistinguishedName();
            long startSendingNanos = System.nanoTime();
            StopWatch stopWatch = new StopWatch(true);
            long bytesSent = 0L;
            SiteToSiteClientConfig siteToSiteClientConfig = this.getSiteToSiteClient().getConfig();
            long maxBatchBytes = siteToSiteClientConfig.getPreferredBatchSize();
            int maxBatchCount = siteToSiteClientConfig.getPreferredBatchCount();
            long preferredBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
            long maxBatchDuration = preferredBatchDuration > 0L ? preferredBatchDuration : BATCH_SEND_NANOS;
            HashSet<FlowFile> flowFilesSent = new HashSet<FlowFile>();
            boolean continueTransaction = true;
            while (continueTransaction) {
                long startNanos = System.nanoTime();
                FlowFile toWrap = flowFile;
                session.read(flowFile, in -> {
                    StandardDataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
                    transaction.send((DataPacket)dataPacket);
                });
                long transferNanos = System.nanoTime() - startNanos;
                long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
                flowFilesSent.add((FlowFile)flowFile);
                bytesSent += flowFile.getSize();
                logger.debug("{} Sent {} to {}", new Object[]{this, flowFile, transaction.getCommunicant().getUrl()});
                String transitUri = transaction.getCommunicant().createTransitUri(flowFile.getAttribute(CoreAttributes.UUID.key()));
                flowFile = session.putAttribute(flowFile, SiteToSiteAttributes.S2S_PORT_ID.key(), this.getTargetIdentifier());
                session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
                session.remove(flowFile);
                long sendingNanos = System.nanoTime() - startSendingNanos;
                flowFile = maxBatchCount > 0 && flowFilesSent.size() >= maxBatchCount ? null : (maxBatchBytes > 0L && bytesSent >= maxBatchBytes ? null : (sendingNanos >= maxBatchDuration ? null : session.get()));
                continueTransaction = flowFile != null;
            }
            transaction.confirm();
            stopWatch.stop();
            String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
            long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            String dataSize = FormatUtils.formatDataSize((double)bytesSent);
            transaction.complete();
            session.commitAsync();
            Object flowFileDescription = flowFilesSent.size() < 20 ? ((Object)flowFilesSent).toString() : flowFilesSent.size() + " FlowFiles";
            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
            return flowFilesSent.size();
        }
        catch (Exception e) {
            session.rollback();
            throw e;
        }
    }

    private int receiveFlowFiles(Transaction transaction, ProcessContext context, ProcessSession session) throws IOException {
        String userDn = transaction.getCommunicant().getDistinguishedName();
        StopWatch stopWatch = new StopWatch(true);
        HashSet<FlowFile> flowFilesReceived = new HashSet<FlowFile>();
        long bytesReceived = 0L;
        while (true) {
            long start = System.nanoTime();
            DataPacket dataPacket = transaction.receive();
            if (dataPacket == null) break;
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
            Communicant communicant = transaction.getCommunicant();
            String host = StringUtils.isEmpty((String)communicant.getHost()) ? "unknown" : communicant.getHost();
            String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort());
            HashMap<String, Object> attributes = new HashMap<String, Object>(2);
            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
            attributes.put(SiteToSiteAttributes.S2S_PORT_ID.key(), this.getTargetIdentifier());
            flowFile = session.putAllAttributes(flowFile, attributes);
            flowFile = session.importFrom(dataPacket.getData(), flowFile);
            long receiveNanos = System.nanoTime() - start;
            flowFilesReceived.add(flowFile);
            String sourceFlowFileIdentifier = (String)dataPacket.getAttributes().get(CoreAttributes.UUID.key());
            if (sourceFlowFileIdentifier == null) {
                sourceFlowFileIdentifier = "<Unknown Identifier>";
            }
            String transitUri = transaction.getCommunicant().createTransitUri(sourceFlowFileIdentifier);
            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
            session.transfer(flowFile, Relationship.ANONYMOUS);
            bytesReceived += dataPacket.getSize();
        }
        transaction.confirm();
        long numBytesReceived = bytesReceived;
        session.commitAsync(() -> {
            try {
                transaction.complete();
            }
            catch (Exception e) {
                logger.error("Successfully received {} FlowFiles ({}) from {} and committed session but failed to notify sender that transaction was complete. This could result in data duplication.", new Object[]{flowFilesReceived.size(), flowFilesReceived, userDn});
            }
            if (!flowFilesReceived.isEmpty()) {
                stopWatch.stop();
                Object flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
                String uploadDataRate = stopWatch.calculateDataRate(numBytesReceived);
                long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                String dataSize = FormatUtils.formatDataSize((double)numBytesReceived);
                logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
            }
        });
        return flowFilesReceived.size();
    }

    public boolean getTargetExists() {
        return this.targetExists.get();
    }

    public boolean isValid() {
        if (!this.targetExists.get()) {
            return false;
        }
        if (this.getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && this.getConnections(Relationship.ANONYMOUS).isEmpty()) {
            return false;
        }
        boolean groupValid = this.remoteGroup.validate().stream().allMatch(result -> result.isValid());
        return groupValid;
    }

    public Collection<ValidationResult> getValidationErrors() {
        ArrayList<ValidationResult> validationErrors = new ArrayList<ValidationResult>();
        if (this.getScheduledState() == ScheduledState.STOPPED) {
            ValidationResult error = null;
            if (!this.targetExists.get()) {
                error = new ValidationResult.Builder().explanation(String.format("Remote instance indicates that port '%s' no longer exists.", this.getName())).subject(String.format("Remote port '%s'", this.getName())).valid(false).build();
            } else if (this.getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && this.getConnections(Relationship.ANONYMOUS).isEmpty()) {
                error = new ValidationResult.Builder().explanation(String.format("Port '%s' has no outbound connections", this.getName())).subject(String.format("Remote port '%s'", this.getName())).valid(false).build();
            }
            if (error != null) {
                validationErrors.add(error);
            }
        }
        return validationErrors;
    }

    public void verifyCanStart() {
        super.verifyCanStart();
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && this.getIncomingConnections().isEmpty()) {
            throw new IllegalStateException("Port " + this.getName() + " has no incoming connections");
        }
        Optional<ValidationResult> resultOption = this.remoteGroup.validate().stream().filter(result -> !result.isValid()).findFirst();
        if (resultOption.isPresent()) {
            throw new IllegalStateException("Remote Process Group is not valid: " + resultOption.get().toString());
        }
    }

    public void setUseCompression(boolean useCompression) {
        this.useCompression.set(useCompression);
    }

    public boolean isUseCompression() {
        return this.useCompression.get();
    }

    public Integer getBatchCount() {
        return this.batchCount.get();
    }

    public void setBatchCount(Integer batchCount) {
        this.batchCount.set(batchCount);
    }

    public String getBatchSize() {
        return this.batchSize.get();
    }

    public void setBatchSize(String batchSize) {
        this.batchSize.set(batchSize);
    }

    public String getBatchDuration() {
        return this.batchDuration.get();
    }

    public void setBatchDuration(String batchDuration) {
        this.batchDuration.set(batchDuration);
    }

    public String toString() {
        return "RemoteGroupPort[name=" + this.getName() + ",targets=" + this.remoteGroup.getTargetUris() + "]";
    }

    public RemoteProcessGroup getRemoteProcessGroup() {
        return this.remoteGroup;
    }

    public TransferDirection getTransferDirection() {
        return this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ? TransferDirection.SEND : TransferDirection.RECEIVE;
    }

    public void setTargetExists(boolean exists) {
        this.targetExists.set(exists);
    }

    public void removeConnection(Connection connection) throws IllegalArgumentException, IllegalStateException {
        super.removeConnection(connection);
        if (!this.getTargetExists() && !this.hasIncomingConnection() && this.getConnections().isEmpty()) {
            this.remoteGroup.removeNonExistentPort((RemoteGroupPort)this);
        }
    }

    public SchedulingStrategy getSchedulingStrategy() {
        return SchedulingStrategy.TIMER_DRIVEN;
    }

    public boolean isSideEffectFree() {
        return false;
    }

    public String getComponentType() {
        return "RemoteGroupPort";
    }
}

