/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.migration.KafkaSpoutMigration;
import org.apache.storm.kafka.migration.MapUtil;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTridentSpoutMigration {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMigration.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Args: confFile");
            System.exit(1);
        }
        Map conf = Utils.findAndReadConfigFile((String)args[0]);
        Configuration configuration = new Configuration();
        configuration.zkHosts = (String)MapUtil.getOrError(conf, "zookeeper.servers");
        configuration.zkRoot = (String)MapUtil.getOrError(conf, "zookeeper.root");
        configuration.txId = (String)MapUtil.getOrError(conf, "txid");
        configuration.topic = (String)MapUtil.getOrError(conf, "topic");
        configuration.isWildcardTopic = (Boolean)MapUtil.getOrError(conf, "is.wildcard.topic");
        configuration.newTopologyTxId = (String)MapUtil.getOrError(conf, "new.topology.txid");
        configuration.zkSessionTimeoutMs = (Integer)MapUtil.getOrError(conf, "zookeeper.session.timeout.ms");
        configuration.zkConnectionTimeoutMs = (Integer)MapUtil.getOrError(conf, "zookeeper.connection.timeout.ms");
        configuration.zkRetryTimes = (Integer)MapUtil.getOrError(conf, "zookeeper.retry.times");
        configuration.zkRetryIntervalMs = (Integer)MapUtil.getOrError(conf, "zookeeper.retry.interval.ms");
        try (CuratorFramework curator = KafkaTridentSpoutMigration.newCurator(configuration);){
            curator.start();
            Map<TopicPartition, Map<Long, PartitionMetadata>> offsetsToMigrate = KafkaTridentSpoutMigration.getOffsetsToMigrate(curator, configuration);
            LOG.info("Migrating offsets {}", (Object)offsetsToMigrate);
            KafkaTridentSpoutMigration.migrateOffsets(curator, configuration, offsetsToMigrate);
            KafkaTridentSpoutMigration.migrateCoordinator(curator, configuration, new ArrayList<TopicPartition>(offsetsToMigrate.keySet()));
        }
    }

    private static Map<TopicPartition, Map<Long, PartitionMetadata>> getOffsetsAtPath(CuratorFramework curator, ObjectMapper objectMapper, String partitionsRoot) throws Exception {
        HashMap<TopicPartition, Map<Long, PartitionMetadata>> offsets = new HashMap<TopicPartition, Map<Long, PartitionMetadata>>();
        if (curator.checkExists().forPath(partitionsRoot) == null) {
            throw new RuntimeException("No such path " + partitionsRoot);
        }
        List partitionPaths = (List)curator.getChildren().forPath(partitionsRoot);
        for (String partitionPath : partitionPaths) {
            String absPartitionPath = partitionsRoot + "/" + partitionPath;
            List transactions = (List)curator.getChildren().forPath(absPartitionPath);
            HashMap<Long, PartitionMetadata> partitionMeta = new HashMap<Long, PartitionMetadata>();
            TopicPartition tp = null;
            for (String transaction : transactions) {
                String absTransactionPath = absPartitionPath + "/" + transaction;
                LOG.info("Reading offset data from path {}", (Object)absTransactionPath);
                byte[] partitionBytes = (byte[])curator.getData().forPath(absTransactionPath);
                Map<String, Object> partitionMetadata = objectMapper.readValue(partitionBytes, new TypeReference<Map<String, Object>>(){});
                tp = new TopicPartition((String)partitionMetadata.get("topic"), ((Number)partitionMetadata.get("partition")).intValue());
                PartitionMetadata meta = new PartitionMetadata(((Number)partitionMetadata.get("offset")).longValue(), ((Number)partitionMetadata.get("nextOffset")).longValue() - 1L);
                partitionMeta.put(Long.parseLong(transaction), meta);
            }
            if (tp == null) continue;
            offsets.put(tp, partitionMeta);
        }
        return offsets;
    }

    private static Map<TopicPartition, Map<Long, PartitionMetadata>> getOffsetsToMigrate(CuratorFramework curator, Configuration configuration) throws Exception {
        HashMap<TopicPartition, Map<Long, PartitionMetadata>> offsetsToMigrate = new HashMap<TopicPartition, Map<Long, PartitionMetadata>>();
        String streamRoot = configuration.zkRoot + "/" + configuration.txId + "/user";
        if (curator.checkExists().forPath(streamRoot) == null) {
            throw new RuntimeException("No such path " + streamRoot);
        }
        if (configuration.isWildcardTopic) {
            LOG.info("Expecting wildcard topics, looking for topics in {}", (Object)streamRoot);
            List topics = (List)curator.getChildren().forPath(streamRoot);
            for (String topic : topics) {
                if (!topic.matches(configuration.topic)) {
                    LOG.info("Skipping directory {} because it does not match topic pattern {}", (Object)topic, (Object)configuration.topic);
                    continue;
                }
                String partitionsRoot = streamRoot + "/" + topic;
                LOG.info("Looking for partitions in {}", (Object)partitionsRoot);
                offsetsToMigrate.putAll(KafkaTridentSpoutMigration.getOffsetsAtPath(curator, objectMapper, partitionsRoot));
            }
        } else {
            LOG.info("Expecting exact topic match, looking for offsets in {}", (Object)streamRoot);
            offsetsToMigrate.putAll(KafkaTridentSpoutMigration.getOffsetsAtPath(curator, objectMapper, streamRoot));
        }
        return offsetsToMigrate;
    }

    private static String coordinatorPath(Configuration configuration, String txid) {
        return configuration.zkRoot + "/" + txid + "/coordinator";
    }

    private static void migrateCoordinator(CuratorFramework curator, Configuration configuration, List<TopicPartition> topics) throws Exception {
        String oldCoordinatorRoot = KafkaTridentSpoutMigration.coordinatorPath(configuration, configuration.txId);
        String newCoordinatorRoot = KafkaTridentSpoutMigration.coordinatorPath(configuration, configuration.newTopologyTxId);
        String oldTxPath = oldCoordinatorRoot + "/currtx";
        String newTxPath = newCoordinatorRoot + "/currtx";
        KafkaTridentSpoutMigration.createOrUpdate(curator, newTxPath).forPath(newTxPath, (byte[])curator.getData().forPath(oldTxPath));
        String oldAttemptsPath = oldCoordinatorRoot + "/currattempts";
        String newAttemptsPath = newCoordinatorRoot + "/currattempts";
        KafkaTridentSpoutMigration.createOrUpdate(curator, newAttemptsPath).forPath(newAttemptsPath, (byte[])curator.getData().forPath(oldAttemptsPath));
        List transactions = (List)curator.getChildren().forPath(oldCoordinatorRoot + "/meta");
        ArrayList<Map<String, Object>> coordinatorMeta = new ArrayList<Map<String, Object>>();
        for (TopicPartition tp : topics) {
            coordinatorMeta.add(KafkaTridentSpoutMigration.tpMeta(tp));
        }
        for (String transaction : transactions) {
            String newMetaPath = newCoordinatorRoot + "/meta/" + transaction;
            KafkaTridentSpoutMigration.createOrUpdate(curator, newMetaPath).forPath(newMetaPath, objectMapper.writeValueAsBytes(coordinatorMeta));
        }
        LOG.info("Migrated coordinator data to new path {}", (Object)newCoordinatorRoot);
    }

    private static PathAndBytesable<?> createOrUpdate(CuratorFramework curator, String path) throws Exception {
        if (curator.checkExists().forPath(path) == null) {
            return curator.create().creatingParentsIfNeeded();
        }
        return curator.setData();
    }

    private static Map<String, Object> tpMeta(TopicPartition tp) {
        HashMap<String, Object> tpMeta = new HashMap<String, Object>();
        tpMeta.put("topic", tp.topic());
        tpMeta.put("partition", tp.partition());
        return tpMeta;
    }

    private static void migrateOffsets(CuratorFramework curator, Configuration configuration, Map<TopicPartition, Map<Long, PartitionMetadata>> offsets) throws Exception {
        String streamRoot = configuration.zkRoot + "/" + configuration.newTopologyTxId + "/user";
        for (Map.Entry<TopicPartition, Map<Long, PartitionMetadata>> offset : offsets.entrySet()) {
            TopicPartition tp = offset.getKey();
            for (Map.Entry<Long, PartitionMetadata> transaction : offset.getValue().entrySet()) {
                PartitionMetadata meta = transaction.getValue();
                HashMap<String, Object> metadataToWrite = new HashMap<String, Object>();
                metadataToWrite.put("firstOffset", meta.firstOffset);
                metadataToWrite.put("lastOffset", meta.lastOffset);
                metadataToWrite.put("tp", KafkaTridentSpoutMigration.tpMeta(tp));
                String partitionPath = streamRoot + "/" + tp.topic() + "@" + tp.partition() + "/" + String.valueOf(transaction.getKey());
                LOG.info("Writing {} to path {}", (Object)metadataToWrite, (Object)partitionPath);
                KafkaTridentSpoutMigration.createOrUpdate(curator, partitionPath).forPath(partitionPath, objectMapper.writeValueAsBytes(metadataToWrite));
            }
        }
        LOG.info("Migrated offsets {} to new root {}", (Object)offsets, (Object)streamRoot);
    }

    private static CuratorFramework newCurator(Configuration config) throws Exception {
        return CuratorFrameworkFactory.newClient(config.zkHosts, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, new RetryNTimes(config.zkRetryTimes, config.zkRetryIntervalMs));
    }

    private static class Configuration {
        private String zkHosts;
        private String zkRoot;
        private String txId;
        private String topic;
        private boolean isWildcardTopic;
        private String newTopologyTxId;
        private int zkSessionTimeoutMs;
        private int zkConnectionTimeoutMs;
        private int zkRetryTimes;
        private int zkRetryIntervalMs;

        private Configuration() {
        }
    }

    private static class PartitionMetadata {
        private final long firstOffset;
        private final long lastOffset;

        PartitionMetadata(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }

        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SIMPLE_STYLE);
        }
    }
}

