/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.cli.AbstractCmdConsume;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

@Parameters(commandDescription="Read messages from a specified topic")
public class CmdRead
extends AbstractCmdConsume {
    private static final Pattern MSG_ID_PATTERN = Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");
    @Parameter(description="TopicName", required=true)
    private List<String> mainOptions = new ArrayList<String>();
    @Parameter(names={"-m", "--start-message-id"}, description="Initial reader position, it can be 'latest', 'earliest' or '<ledgerId>:<entryId>'")
    private String startMessageId = "latest";
    @Parameter(names={"-i", "--start-message-id-inclusive"}, description="Whether to include the position specified by -m option.")
    private boolean startMessageIdInclusive = false;
    @Parameter(names={"-n", "--num-messages"}, description="Number of messages to read, 0 means to read forever.")
    private int numMessagesToRead = 1;
    @Parameter(names={"--hex"}, description="Display binary messages in hex.")
    private boolean displayHex = false;
    @Parameter(names={"--hide-content"}, description="Do not write the message to console.")
    private boolean hideContent = false;
    @Parameter(names={"-r", "--rate"}, description="Rate (in msg/sec) at which to read, value 0 means to read messages as fast as possible.")
    private double readRate = 0.0;
    @Parameter(names={"-q", "--queue-size"}, description="Reader receiver queue size.")
    private int receiverQueueSize = 0;
    @Parameter(names={"-mc", "--max_chunked_msg"}, description="Max pending chunk messages")
    private int maxPendingChunkedMessage = 0;
    @Parameter(names={"-ac", "--auto_ack_chunk_q_full"}, description="Auto ack for oldest message on queue is full")
    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
    @Parameter(names={"-ekv", "--encryption-key-value"}, description="The URI of private key to decrypt payload, for example file:///path/to/private.key or data:application/x-pem-file;base64,*****")
    private String encKeyValue;
    @Parameter(names={"-st", "--schema-type"}, description="Set a schema type on the reader, it can be 'bytes' or 'auto_consume'")
    private String schemaType = "bytes";
    @Parameter(names={"-pm", "--pool-messages"}, description="Use the pooled message", arity=1)
    private boolean poolMessages = true;

    public int run() throws PulsarClientException, IOException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (this.numMessagesToRead < 0) {
            throw new ParameterException("Number of messages should be zero or positive.");
        }
        String topic = this.mainOptions.get(0);
        if (this.serviceURL.startsWith("ws")) {
            return this.readFromWebSocket(topic);
        }
        return this.read(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int read(String topic) {
        int numMessagesRead = 0;
        int returnCode = 0;
        try (PulsarClient client = this.clientBuilder.build();){
            Schema schema;
            Schema schema2 = schema = this.poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
            if ("auto_consume".equals(this.schemaType)) {
                schema = Schema.AUTO_CONSUME();
            } else if (!"bytes".equals(this.schemaType)) {
                throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume'");
            }
            ReaderBuilder builder = client.newReader(schema).topic(topic).startMessageId(CmdRead.parseMessageId(this.startMessageId)).poolMessages(this.poolMessages);
            if (this.startMessageIdInclusive) {
                builder.startMessageIdInclusive();
            }
            if (this.maxPendingChunkedMessage > 0) {
                builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
            }
            if (this.receiverQueueSize > 0) {
                builder.receiverQueueSize(this.receiverQueueSize);
            }
            builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
            if (StringUtils.isNotBlank((CharSequence)this.encKeyValue)) {
                builder.defaultCryptoKeyReader(this.encKeyValue);
            }
            try (Reader reader = builder.create();){
                RateLimiter limiter;
                RateLimiter rateLimiter = limiter = this.readRate > 0.0 ? RateLimiter.create((double)this.readRate) : null;
                while (this.numMessagesToRead == 0 || numMessagesRead < this.numMessagesToRead) {
                    Message msg;
                    if (limiter != null) {
                        limiter.acquire();
                    }
                    if ((msg = reader.readNext(5, TimeUnit.SECONDS)) == null) {
                        LOG.debug("No message to read after waiting for 5 seconds.");
                        continue;
                    }
                    try {
                        ++numMessagesRead;
                        if (!this.hideContent) {
                            System.out.println("----- got message -----");
                            String output = this.interpretMessage(msg, this.displayHex);
                            System.out.println(output);
                            continue;
                        }
                        if (numMessagesRead % 1000 != 0) continue;
                        System.out.println("Received " + numMessagesRead + " messages");
                    }
                    finally {
                        msg.release();
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.error("Error while reading messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully read", (Object)numMessagesRead);
        }
        return returnCode;
    }

    @VisibleForTesting
    public String getWebSocketReadUri(String topic) {
        String msgIdQueryParam;
        String serviceURLWithoutTrailingSlash = this.serviceURL.substring(0, this.serviceURL.endsWith("/") ? this.serviceURL.length() - 1 : this.serviceURL.length());
        TopicName topicName = TopicName.get((String)topic);
        String wsTopic = topicName.isV2() ? String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName()) : String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName());
        if ("latest".equals(this.startMessageId) || "earliest".equals(this.startMessageId)) {
            msgIdQueryParam = this.startMessageId;
        } else {
            MessageId msgId = CmdRead.parseMessageId(this.startMessageId);
            msgIdQueryParam = Base64.getEncoder().encodeToString(msgId.toByteArray());
        }
        String uriFormat = "%s/ws" + (topicName.isV2() ? "/v2/" : "/") + "reader/%s?messageId=%s";
        return String.format(uriFormat, serviceURLWithoutTrailingSlash, wsTopic, msgIdQueryParam);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int readFromWebSocket(String topic) {
        int numMessagesRead = 0;
        int returnCode = 0;
        URI readerUri = URI.create(this.getWebSocketReadUri(topic));
        WebSocketClient readClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData(readerUri.getHost());
                if (authData.hasDataForHttp()) {
                    for (Map.Entry kv : authData.getHttpHeaders()) {
                        readRequest.setHeader((String)kv.getKey(), (String)kv.getValue());
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.error("Authentication plugin error: " + e.getMessage());
            return -1;
        }
        CompletableFuture<Void> connected = new CompletableFuture<Void>();
        AbstractCmdConsume.ConsumerSocket readerSocket = new AbstractCmdConsume.ConsumerSocket(connected);
        try {
            readClient.start();
        }
        catch (Exception e) {
            LOG.error("Failed to start websocket-client", (Throwable)e);
            return -1;
        }
        try {
            LOG.info("Trying to create websocket session..{}", (Object)readerUri);
            readClient.connect((Object)readerSocket, readerUri, readRequest);
            connected.get();
        }
        catch (Exception e) {
            LOG.error("Failed to create web-socket session", (Throwable)e);
            return -1;
        }
        try {
            RateLimiter limiter;
            RateLimiter rateLimiter = limiter = this.readRate > 0.0 ? RateLimiter.create((double)this.readRate) : null;
            while (this.numMessagesToRead == 0 || numMessagesRead < this.numMessagesToRead) {
                String msg;
                if (limiter != null) {
                    limiter.acquire();
                }
                if ((msg = readerSocket.receive(5L, TimeUnit.SECONDS)) == null) {
                    LOG.debug("No message to read after waiting for 5 seconds.");
                    continue;
                }
                try {
                    String output = CmdRead.interpretByteArray(this.displayHex, Base64.getDecoder().decode(msg));
                    System.out.println(output);
                }
                catch (Exception e) {
                    System.out.println(msg);
                }
                ++numMessagesRead;
            }
            readerSocket.awaitClose(2, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOG.error("Error while reading messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully read", (Object)numMessagesRead);
        }
        return returnCode;
    }

    @VisibleForTesting
    static MessageId parseMessageId(String msgIdStr) {
        MessageId msgId;
        if ("latest".equals(msgIdStr)) {
            msgId = MessageId.latest;
        } else if ("earliest".equals(msgIdStr)) {
            msgId = MessageId.earliest;
        } else {
            Matcher matcher = MSG_ID_PATTERN.matcher(msgIdStr);
            if (matcher.find()) {
                msgId = new MessageIdImpl(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)), -1);
            } else {
                throw new IllegalArgumentException("Message ID must be 'latest', 'earliest' or '<ledgerId>:<entryId>'");
            }
        }
        return msgId;
    }
}

