/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.shaded.org.apache.kafka.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.Exit;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionalMessageCopier {
    private static final Logger log = LoggerFactory.getLogger(TransactionalMessageCopier.class);
    private static final DateFormat FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");

    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("transactional-message-copier").defaultHelp(true).description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages");
        parser.addArgument("--input-topic").action(Arguments.store()).required(true).type(String.class).metavar("INPUT-TOPIC").dest("inputTopic").help("Consume messages from this topic");
        parser.addArgument("--input-partition").action(Arguments.store()).required(true).type(Integer.class).metavar("INPUT-PARTITION").dest("inputPartition").help("Consume messages from this partition of the input topic.");
        parser.addArgument("--output-topic").action(Arguments.store()).required(true).type(String.class).metavar("OUTPUT-TOPIC").dest("outputTopic").help("Produce messages to this topic");
        parser.addArgument("--broker-list").action(Arguments.store()).required(true).type(String.class).metavar("HOST1:PORT1[,HOST2:PORT2[...]]").dest("brokerList").help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        parser.addArgument("--max-messages").action(Arguments.store()).required(false).setDefault((Object)-1).type(Integer.class).metavar("MAX-MESSAGES").dest("maxMessages").help("Process these many messages upto the end offset at the time this program was launched. If set to -1 we will just read to the end offset of the input partition (as of the time the program was launched).");
        parser.addArgument("--consumer-group").action(Arguments.store()).required(false).setDefault((Object)-1).type(String.class).metavar("CONSUMER-GROUP").dest("consumerGroup").help("The consumer group id to use for storing the consumer offsets.");
        parser.addArgument("--transaction-size").action(Arguments.store()).required(false).setDefault((Object)200).type(Integer.class).metavar("TRANSACTION-SIZE").dest("messagesPerTransaction").help("The number of messages to put in each transaction. Default is 200.");
        parser.addArgument("--transaction-timeout").action(Arguments.store()).required(false).setDefault((Object)60000).type(Integer.class).metavar("TRANSACTION-TIMEOUT").dest("transactionTimeout").help("The transaction timeout in milliseconds. Default is 60000(1 minute).");
        parser.addArgument("--transactional-id").action(Arguments.store()).required(true).type(String.class).metavar("TRANSACTIONAL-ID").dest("transactionalId").help("The transactionalId to assign to the producer");
        parser.addArgument("--enable-random-aborts").action(Arguments.storeTrue()).type(Boolean.class).metavar("ENABLE-RANDOM-ABORTS").dest("enableRandomAborts").help("Whether or not to enable random transaction aborts (for system testing)");
        parser.addArgument("--group-mode").action(Arguments.storeTrue()).type(Boolean.class).metavar("GROUP-MODE").dest("groupMode").help("Whether to let consumer subscribe to the input topic or do manual assign. If we do subscription based consumption, the input partition shall be ignored");
        parser.addArgument("--use-group-metadata").action(Arguments.storeTrue()).type(Boolean.class).metavar("USE-GROUP-METADATA").dest("useGroupMetadata").help("Whether to use the new transactional commit API with group metadata");
        return parser;
    }

    private static KafkaProducer<String, String> createProducer(Namespace parsedArgs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", parsedArgs.getString("brokerList"));
        props.put("transactional.id", parsedArgs.getString("transactionalId"));
        props.put("key.serializer", "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "512");
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("transaction.timeout.ms", parsedArgs.getInt("transactionTimeout"));
        return new KafkaProducer<String, String>(props);
    }

    private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs) {
        String consumerGroup = parsedArgs.getString("consumerGroup");
        String brokerList = parsedArgs.getString("brokerList");
        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
        Properties props = new Properties();
        props.put("group.id", consumerGroup);
        props.put("bootstrap.servers", brokerList);
        props.put("isolation.level", "read_committed");
        props.put("max.poll.records", numMessagesPerTransaction.toString());
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.interval.ms", "180000");
        props.put("heartbeat.interval.ms", "3000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) {
        return new ProducerRecord<String, String>(topic, record.partition(), record.key(), record.value());
    }

    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
        HashMap<TopicPartition, OffsetAndMetadata> positions = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition topicPartition : consumer.assignment()) {
            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
        }
        return positions;
    }

    private static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        consumer.assignment().forEach(tp -> {
            OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata)committed.get(tp);
            if (offsetAndMetadata != null) {
                consumer.seek((TopicPartition)tp, offsetAndMetadata.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        });
    }

    private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
        long currentPosition = consumer.position(partition);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(partition));
        if (endOffsets.containsKey(partition)) {
            return endOffsets.get(partition) - currentPosition;
        }
        return 0L;
    }

    private static String toJsonString(Map<String, Object> data) {
        String json;
        try {
            ObjectMapper mapper = new ObjectMapper();
            json = mapper.writeValueAsString(data);
        }
        catch (JsonProcessingException e) {
            json = "Bad data can't be written as json: " + e.getMessage();
        }
        return json;
    }

    private static synchronized String statusAsJson(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId, String stage) {
        HashMap<String, Object> statusData = new HashMap<String, Object>();
        statusData.put("progress", transactionalId);
        statusData.put("totalProcessed", totalProcessed);
        statusData.put("consumed", consumedSinceLastRebalanced);
        statusData.put("remaining", remaining);
        statusData.put("time", FORMAT.format(new Date()));
        statusData.put("stage", stage);
        return TransactionalMessageCopier.toJsonString(statusData);
    }

    private static synchronized String shutDownString(long totalProcessed, long consumedSinceLastRebalanced, long remaining, String transactionalId) {
        HashMap<String, Object> shutdownData = new HashMap<String, Object>();
        shutdownData.put("shutdown_complete", transactionalId);
        shutdownData.put("totalProcessed", totalProcessed);
        shutdownData.put("consumed", consumedSinceLastRebalanced);
        shutdownData.put("remaining", remaining);
        shutdownData.put("time", FORMAT.format(new Date()));
        return TransactionalMessageCopier.toJsonString(shutdownData);
    }

    private static void abortTransactionAndResetPosition(KafkaProducer<String, String> producer, KafkaConsumer<String, String> consumer) {
        producer.abortTransaction();
        TransactionalMessageCopier.resetToLastCommittedPositions(consumer);
    }

    public static void main(String[] args) {
        Namespace parsedArgs = TransactionalMessageCopier.argParser().parseArgsOrFail(args);
        try {
            TransactionalMessageCopier.runEventLoop(parsedArgs);
            Exit.exit(0);
        }
        catch (Exception e) {
            log.error("Shutting down after unexpected error in event loop", e);
            System.err.println("Shutting down after unexpected error " + e.getClass().getSimpleName() + ": " + e.getMessage() + " (see the log for additional detail)");
            Exit.exit(1);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void runEventLoop(Namespace parsedArgs) {
        final String transactionalId = parsedArgs.getString("transactionalId");
        String outputTopic = parsedArgs.getString("outputTopic");
        String consumerGroup = parsedArgs.getString("consumerGroup");
        KafkaProducer<String, String> producer = TransactionalMessageCopier.createProducer(parsedArgs);
        final KafkaConsumer<String, String> consumer = TransactionalMessageCopier.createConsumer(parsedArgs);
        final AtomicLong remainingMessages = new AtomicLong(parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : (long)parsedArgs.getInt("maxMessages").intValue());
        boolean groupMode = parsedArgs.getBoolean("groupMode");
        String topicName = parsedArgs.getString("inputTopic");
        final AtomicLong numMessagesProcessedSinceLastRebalance = new AtomicLong(0L);
        final AtomicLong totalMessageProcessed = new AtomicLong(0L);
        if (groupMode) {
            consumer.subscribe(Collections.singleton(topicName), new ConsumerRebalanceListener(){

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    remainingMessages.set(partitions.stream().mapToLong(partition -> TransactionalMessageCopier.messagesRemaining(consumer, partition)).sum());
                    numMessagesProcessedSinceLastRebalance.set(0L);
                    System.out.println(TransactionalMessageCopier.statusAsJson(totalMessageProcessed.get(), numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "RebalanceComplete"));
                }
            });
        } else {
            TopicPartition inputPartition = new TopicPartition(topicName, parsedArgs.getInt("inputPartition"));
            consumer.assign(Collections.singleton(inputPartition));
            remainingMessages.set(Math.min(TransactionalMessageCopier.messagesRemaining(consumer, inputPartition), remainingMessages.get()));
        }
        boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts");
        producer.initTransactions();
        AtomicBoolean isShuttingDown = new AtomicBoolean(false);
        Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
            isShuttingDown.set(true);
            consumer.wakeup();
            System.out.println(TransactionalMessageCopier.shutDownString(totalMessageProcessed.get(), numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId));
        });
        boolean useGroupMetadata = parsedArgs.getBoolean("useGroupMetadata");
        try {
            Random random = new Random();
            while (!isShuttingDown.get() && remainingMessages.get() > 0L) {
                System.out.println(TransactionalMessageCopier.statusAsJson(totalMessageProcessed.get(), numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId, "ProcessLoop"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200L));
                if (records.count() <= 0) continue;
                try {
                    producer.beginTransaction();
                    for (ConsumerRecord<String, String> consumerRecord : records) {
                        producer.send(TransactionalMessageCopier.producerRecordFromConsumerRecord(outputTopic, consumerRecord));
                    }
                    long messagesSentWithinCurrentTxn = records.count();
                    ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
                    producer.sendOffsetsToTransaction(TransactionalMessageCopier.consumerPositions(consumer), groupMetadata);
                    if (enableRandomAborts && random.nextInt() % 3 == 0) {
                        TransactionalMessageCopier.abortTransactionAndResetPosition(producer, consumer);
                        continue;
                    }
                    producer.commitTransaction();
                    remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
                    numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
                    totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
                }
                catch (ProducerFencedException e) {
                    throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId), e);
                }
                catch (KafkaException e) {
                    log.debug("Aborting transaction after catching exception", e);
                    TransactionalMessageCopier.abortTransactionAndResetPosition(producer, consumer);
                }
            }
        }
        catch (WakeupException e) {
            if (!isShuttingDown.get()) {
                throw e;
            }
        }
        finally {
            Utils.closeQuietly(producer, "producer");
            Utils.closeQuietly(consumer, "consumer");
        }
    }
}

