/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.websocket;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;

@Tags(value={"WebSocket", "publish", "send"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerSerially
@CapabilityDescription(value="Sends messages to a WebSocket remote endpoint using a WebSocket session that is established by either ListenWebSocket or ConnectWebSocket.")
@WritesAttributes(value={@WritesAttribute(attribute="websocket.controller.service.id", description="WebSocket Controller Service id."), @WritesAttribute(attribute="websocket.session.id", description="Established WebSocket session id."), @WritesAttribute(attribute="websocket.endpoint.id", description="WebSocket endpoint id."), @WritesAttribute(attribute="websocket.message.type", description="TEXT or BINARY."), @WritesAttribute(attribute="websocket.local.address", description="WebSocket server address."), @WritesAttribute(attribute="websocket.remote.address", description="WebSocket client address."), @WritesAttribute(attribute="websocket.failure.detail", description="Detail of the failure.")})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutWebSocket
extends AbstractProcessor {
    public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder().name("websocket-session-id").displayName("WebSocket Session Id").description("A NiFi Expression to retrieve the session id. If not specified, a message will be sent to all connected WebSocket peers for the WebSocket controller service endpoint.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${websocket.session.id}").build();
    public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder().name("websocket-controller-service-id").displayName("WebSocket ControllerService Id").description("A NiFi Expression to retrieve the id of a WebSocket ControllerService.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${websocket.controller.service.id}").build();
    public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder().name("websocket-endpoint-id").displayName("WebSocket Endpoint Id").description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${websocket.endpoint.id}").build();
    public static final PropertyDescriptor PROP_WS_MESSAGE_TYPE = new PropertyDescriptor.Builder().name("websocket-message-type").displayName("WebSocket Message Type").description("The type of message content: TEXT or BINARY").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).defaultValue(WebSocketMessage.Type.TEXT.toString()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are sent successfully to the destination are transferred to this relationship.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the destination are transferred to this relationship.").build();
    private static final List<PropertyDescriptor> descriptors;
    private static final Set<Relationship> relationships;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
        FlowFile flowfile = processSession.get();
        if (flowfile == null) {
            return;
        }
        String sessionId = context.getProperty(PROP_WS_SESSION_ID).evaluateAttributeExpressions(flowfile).getValue();
        String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID).evaluateAttributeExpressions(flowfile).getValue();
        String webSocketServiceEndpoint = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ENDPOINT).evaluateAttributeExpressions(flowfile).getValue();
        String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE).evaluateAttributeExpressions(flowfile).getValue();
        WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf((String)messageTypeStr);
        if (StringUtils.isEmpty((String)sessionId)) {
            this.getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients.");
        }
        if (StringUtils.isEmpty((String)webSocketServiceId) || StringUtils.isEmpty((String)webSocketServiceEndpoint)) {
            this.transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.");
            return;
        }
        ControllerService controllerService = context.getControllerServiceLookup().getControllerService(webSocketServiceId);
        if (controllerService == null) {
            this.transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found.");
            return;
        }
        if (!(controllerService instanceof WebSocketService)) {
            this.transferToFailure(processSession, flowfile, "The ControllerService found was not a WebSocket ControllerService but a " + controllerService.getClass().getName());
            return;
        }
        WebSocketService webSocketService = (WebSocketService)controllerService;
        byte[] messageContent = new byte[(int)flowfile.getSize()];
        long startSending = System.currentTimeMillis();
        AtomicReference transitUri = new AtomicReference();
        HashMap<String, String> attrs = new HashMap<String, String>();
        attrs.put("websocket.controller.service.id", webSocketService.getIdentifier());
        if (!StringUtils.isEmpty((String)sessionId)) {
            attrs.put("websocket.session.id", sessionId);
        }
        attrs.put("websocket.endpoint.id", webSocketServiceEndpoint);
        attrs.put("websocket.message.type", messageTypeStr);
        processSession.read(flowfile, in -> StreamUtils.fillBuffer((InputStream)in, (byte[])messageContent, (boolean)true));
        try {
            webSocketService.sendMessage(webSocketServiceEndpoint, sessionId, sender -> {
                switch (messageType) {
                    case TEXT: {
                        sender.sendString(new String(messageContent, "UTF-8"));
                        break;
                    }
                    case BINARY: {
                        sender.sendBinary(ByteBuffer.wrap(messageContent));
                    }
                }
                attrs.put("websocket.local.address", sender.getLocalAddress().toString());
                attrs.put("websocket.remote.address", sender.getRemoteAddress().toString());
                transitUri.set(sender.getTransitUri());
            });
            FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs);
            long transmissionMillis = System.currentTimeMillis() - startSending;
            processSession.getProvenanceReporter().send(updatedFlowFile, (String)transitUri.get(), transmissionMillis);
            processSession.transfer(updatedFlowFile, REL_SUCCESS);
        }
        catch (IOException | IllegalStateException | WebSocketConfigurationException e) {
            this.getLogger().error("Failed to send message via WebSocket", e);
            this.transferToFailure(processSession, flowfile, e.toString());
        }
    }

    private FlowFile transferToFailure(ProcessSession processSession, FlowFile flowfile, String value) {
        flowfile = processSession.putAttribute(flowfile, "websocket.failure.detail", value);
        processSession.transfer(flowfile, REL_FAILURE);
        return flowfile;
    }

    static {
        ArrayList<PropertyDescriptor> innerDescriptorsList = new ArrayList<PropertyDescriptor>();
        innerDescriptorsList.add(PROP_WS_SESSION_ID);
        innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ID);
        innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ENDPOINT);
        innerDescriptorsList.add(PROP_WS_MESSAGE_TYPE);
        descriptors = Collections.unmodifiableList(innerDescriptorsList);
        HashSet<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
        innerRelationshipsSet.add(REL_SUCCESS);
        innerRelationshipsSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
    }
}

