package kafka.tools;

import com.facebook.presto.hive.$internal.jodd.util.StringPool;
import com.microsoft.azure.keyvault.models.MessagePropertyNames;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.admin.AdminClient;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import scala.collection.JavaConversions;

@InterfaceStability.Unstable
/* loaded from: input_file:kafka/tools/StreamsResetter.class */
public class StreamsResetter {
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_ERROR = 1;
    private static OptionSpec<String> bootstrapServerOption;
    private static OptionSpec<String> zookeeperOption;
    private static OptionSpec<String> applicationIdOption;
    private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
    private static OptionSpecBuilder dryRunOption;
    private OptionSet options = null;
    private final Properties consumerConfig = new Properties();
    private final List<String> allTopics = new LinkedList();
    private boolean dryRun = false;

    public int run(String[] strArr) {
        return run(strArr, new Properties());
    }

    public int run(String[] strArr, Properties properties) {
        AdminClient createSimplePlaintext;
        String str;
        ZkUtils apply;
        this.consumerConfig.clear();
        this.consumerConfig.putAll(properties);
        int i = 0;
        AdminClient adminClient = null;
        ZkUtils zkUtils = null;
        try {
            try {
                parseArguments(strArr);
                this.dryRun = this.options.has(dryRunOption);
                createSimplePlaintext = AdminClient.createSimplePlaintext((String) this.options.valueOf(bootstrapServerOption));
                str = (String) this.options.valueOf(applicationIdOption);
                apply = ZkUtils.apply((String) this.options.valueOf(zookeeperOption), 30000, 30000, JaasUtils.isZkSecurityEnabled());
                this.allTopics.clear();
                this.allTopics.addAll(JavaConversions.seqAsJavaList(apply.getAllTopics()));
            } catch (Throwable th) {
                i = 1;
                System.err.println("ERROR: " + th.getMessage());
                if (0 != 0) {
                    adminClient.close();
                }
                if (0 != 0) {
                    zkUtils.close();
                }
            }
            if (!createSimplePlaintext.describeConsumerGroup(str, 0L).consumers().get().isEmpty()) {
                throw new IllegalStateException("Consumer group '" + str + "' is still active. Make sure to stop all running application instances before running the reset tool.");
            }
            if (this.dryRun) {
                System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
            }
            maybeResetInputAndSeekToEndIntermediateTopicOffsets();
            maybeDeleteInternalTopics(apply);
            if (createSimplePlaintext != null) {
                createSimplePlaintext.close();
            }
            if (apply != null) {
                apply.close();
            }
            return i;
        } catch (Throwable th2) {
            if (0 != 0) {
                adminClient.close();
            }
            if (0 != 0) {
                zkUtils.close();
            }
            throw th2;
        }
    }

    private void parseArguments(String[] strArr) throws IOException {
        OptionParser optionParser = new OptionParser(false);
        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id).").withRequiredArg().ofType(String.class).describedAs(MessagePropertyNames.ID).required();
        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2").withRequiredArg().ofType(String.class).defaultsTo("localhost:9092", new String[0]).describedAs("urls");
        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper url with format: HOST:POST").withRequiredArg().ofType(String.class).defaultsTo("localhost:2181", new String[0]).describedAs("url");
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end.").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs("list");
        dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
        try {
            this.options = optionParser.parse(strArr);
        } catch (OptionException e) {
            printHelp(optionParser);
            throw e;
        }
    }

    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
        List<String> valuesOf = this.options.valuesOf(inputTopicsOption);
        List<String> valuesOf2 = this.options.valuesOf(intermediateTopicsOption);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        String str = (String) this.options.valueOf(applicationIdOption);
        if (valuesOf.size() == 0 && valuesOf2.size() == 0) {
            System.out.println("No input or intermediate topics specified. Skipping seek.");
            return;
        }
        if (!this.dryRun) {
            if (valuesOf.size() != 0) {
                System.out.println("Seek-to-beginning for input topics " + valuesOf);
            }
            if (valuesOf2.size() != 0) {
                System.out.println("Seek-to-end for intermediate topics " + valuesOf2);
            }
        }
        HashSet hashSet = new HashSet(valuesOf.size() + valuesOf2.size());
        for (String str2 : valuesOf) {
            if (this.allTopics.contains(str2)) {
                hashSet.add(str2);
            } else {
                arrayList.add(str2);
            }
        }
        for (String str3 : valuesOf2) {
            if (this.allTopics.contains(str3)) {
                hashSet.add(str3);
            } else {
                arrayList2.add(str3);
            }
        }
        Properties properties = new Properties();
        properties.putAll(this.consumerConfig);
        properties.setProperty("bootstrap.servers", (String) this.options.valueOf(bootstrapServerOption));
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, str);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        try {
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties, (Deserializer<byte[]>) new ByteArrayDeserializer(), (Deserializer<byte[]>) new ByteArrayDeserializer());
            Throwable th = null;
            try {
                try {
                    kafkaConsumer.subscribe(hashSet);
                    kafkaConsumer.poll(1L);
                    Set<TopicPartition> assignment = kafkaConsumer.assignment();
                    Set<TopicPartition> hashSet2 = new HashSet<>();
                    Set<TopicPartition> hashSet3 = new HashSet<>();
                    for (TopicPartition topicPartition : assignment) {
                        String str4 = topicPartition.topic();
                        if (isInputTopic(str4)) {
                            hashSet2.add(topicPartition);
                        } else if (isIntermediateTopic(str4)) {
                            hashSet3.add(topicPartition);
                        } else {
                            System.err.println("Skipping invalid partition: " + topicPartition);
                        }
                    }
                    maybeSeekToBeginning(kafkaConsumer, hashSet2);
                    maybeSeekToEnd(kafkaConsumer, hashSet3);
                    if (!this.dryRun) {
                        Iterator<TopicPartition> it2 = assignment.iterator();
                        while (it2.hasNext()) {
                            kafkaConsumer.position(it2.next());
                        }
                        kafkaConsumer.commitSync();
                    }
                    if (arrayList.size() > 0) {
                        System.out.println("Following input topics are not found, skipping them");
                        Iterator it3 = arrayList.iterator();
                        while (it3.hasNext()) {
                            System.out.println("Topic: " + ((String) it3.next()));
                        }
                    }
                    if (arrayList2.size() > 0) {
                        System.out.println("Following intermediate topics are not found, skipping them");
                        Iterator it4 = arrayList2.iterator();
                        while (it4.hasNext()) {
                            System.out.println("Topic:" + ((String) it4.next()));
                        }
                    }
                    if (kafkaConsumer != null) {
                        if (0 != 0) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    System.out.println("Done.");
                } finally {
                }
            } finally {
            }
        } catch (RuntimeException e) {
            System.err.println("ERROR: Resetting offsets failed.");
            throw e;
        }
    }

    private void maybeSeekToEnd(KafkaConsumer<byte[], byte[]> kafkaConsumer, Set<TopicPartition> set) {
        String str = (String) this.options.valueOf(applicationIdOption);
        List<String> valuesOf = this.options.valuesOf(intermediateTopicsOption);
        if (set.size() > 0) {
            if (!this.dryRun) {
                kafkaConsumer.seekToEnd(set);
                return;
            }
            System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + str + StringPool.RIGHT_BRACKET);
            for (String str2 : valuesOf) {
                if (this.allTopics.contains(str2)) {
                    System.out.println("Topic: " + str2);
                }
            }
        }
    }

    private void maybeSeekToBeginning(KafkaConsumer<byte[], byte[]> kafkaConsumer, Set<TopicPartition> set) {
        List<String> valuesOf = this.options.valuesOf(inputTopicsOption);
        String str = (String) this.options.valueOf(applicationIdOption);
        if (set.size() > 0) {
            if (!this.dryRun) {
                kafkaConsumer.seekToBeginning(set);
                return;
            }
            System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + str + StringPool.RIGHT_BRACKET);
            for (String str2 : valuesOf) {
                if (this.allTopics.contains(str2)) {
                    System.out.println("Topic: " + str2);
                }
            }
        }
    }

    private boolean isInputTopic(String str) {
        return this.options.valuesOf(inputTopicsOption).contains(str);
    }

    private boolean isIntermediateTopic(String str) {
        return this.options.valuesOf(intermediateTopicsOption).contains(str);
    }

    private void maybeDeleteInternalTopics(ZkUtils zkUtils) {
        System.out.println("Deleting all internal/auto-created topics for application " + ((String) this.options.valueOf(applicationIdOption)));
        for (String str : this.allTopics) {
            if (isInternalTopic(str)) {
                try {
                    if (this.dryRun) {
                        System.out.println("Topic: " + str);
                    } else {
                        TopicCommand.deleteTopic(zkUtils, new TopicCommand.TopicCommandOptions(new String[]{"--zookeeper", (String) this.options.valueOf(zookeeperOption), "--delete", "--topic", str}));
                    }
                } catch (RuntimeException e) {
                    System.err.println("ERROR: Deleting topic " + str + " failed.");
                    throw e;
                }
            }
        }
        System.out.println("Done.");
    }

    private boolean isInternalTopic(String str) {
        return str.startsWith(new StringBuilder().append((String) this.options.valueOf(applicationIdOption)).append("-").toString()) && (str.endsWith("-changelog") || str.endsWith("-repartition"));
    }

    private void printHelp(OptionParser optionParser) throws IOException {
        System.err.println("The Application Reset Tool allows you to quickly reset an application in order to reprocess its data from scratch.\n* This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics used in the through() method).\n* This tool deletes the internal topics that were created by Kafka Streams (topics starting with \"<application.id>-\").\nYou do not need to specify internal topics because the tool finds them automatically.\n* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).\n* This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).\nYou need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!\n\n");
        optionParser.printHelpOn(System.err);
    }

    public static void main(String[] strArr) {
        Exit.exit(new StreamsResetter().run(strArr));
    }
}
