/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.slf4j.Logger;

/**
 * Holds the {@link StreamsMetadata} of every host that was reported through
 * {@link #onChange(Map, Map, Cluster)} and answers queries about which host carries a given
 * state store, topology or key.
 *
 * <p>Every method that reads or rebuilds the host list is {@code synchronized} on this instance,
 * except {@link #getAllMetadata()}, {@link #getLocalMetadata()} and the {@code toString} variants.
 */
public class StreamsMetadataState {
    public static final HostInfo UNKNOWN_HOST = HostInfo.unavailable();

    private final Logger log;
    private final TopologyMetadata topologyMetadata;
    private final Set<String> globalStores;
    private final HostInfo thisHost;
    private final AtomicReference<StreamsMetadata> localMetadata = new AtomicReference<>(null);
    private List<StreamsMetadata> allMetadata = Collections.emptyList();
    private Cluster clusterMetadata;

    /**
     * @param topologyMetadata source of store-to-topic mappings and of the global store names
     * @param thisHost         the host this instance runs on; compared against {@link #UNKNOWN_HOST}
     * @param logContext       used to create this class's logger
     */
    public StreamsMetadataState(final TopologyMetadata topologyMetadata,
                                final HostInfo thisHost,
                                final LogContext logContext) {
        this.topologyMetadata = topologyMetadata;
        this.globalStores = this.topologyMetadata.globalStateStores().keySet();
        this.thisHost = thisHost;
        this.log = logContext.logger(getClass());
    }

    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Renders the current state, one item per line, each line prefixed with {@code indent}.
     *
     * @param indent prefix placed in front of every line
     * @return a multi-line description of the host metadata, global stores, this host and the cluster
     */
    public String toString(final String indent) {
        return indent + "GlobalMetadata: " + allMetadata + "\n"
            + indent + "GlobalStores: " + globalStores + "\n"
            + indent + "My HostInfo: " + thisHost + "\n"
            + indent + clusterMetadata + "\n";
    }

    /**
     * @return the metadata of this host, or {@code null} if {@code onChange} has not run yet
     */
    public StreamsMetadata getLocalMetadata() {
        return localMetadata.get();
    }

    /**
     * @return an unmodifiable view of the metadata of all known hosts
     */
    public Collection<StreamsMetadata> getAllMetadata() {
        return Collections.unmodifiableList(allMetadata);
    }

    /**
     * Finds every host that carries the given store, either as an active or as a standby store.
     * For a global store all known hosts are returned.
     *
     * @param storeName name of the store to look up
     * @return the matching host metadata; empty if this instance is not initialized or the store
     *         has no source topics
     * @throws NullPointerException     if {@code storeName} is null
     * @throws IllegalArgumentException if named topologies are in use
     */
    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getAllMetadataForStore(storeName) method when "
                + "using named topologies, please use the overload that accepts "
                + "a topologyName parameter to identify the correct store");
        }
        if (!isInitialized()) {
            return Collections.emptyList();
        }
        if (globalStores.contains(storeName)) {
            // Hand out a read-only view so callers cannot corrupt the internal list.
            return Collections.unmodifiableList(allMetadata);
        }
        if (topologyMetadata.sourceTopicsForStore(storeName, null).isEmpty()) {
            return Collections.emptyList();
        }
        final List<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            if (hostsStore(metadata, storeName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    /**
     * Finds every host that carries the given store of the given named topology, either as an
     * active or as a standby store.
     *
     * @param storeName    name of the store to look up
     * @param topologyName name of the topology the store belongs to
     * @return the matching host metadata; empty if this instance is not initialized or the store
     *         has no source topics
     * @throws NullPointerException if {@code storeName} or {@code topologyName} is null
     */
    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName,
                                                                           final String topologyName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        Objects.requireNonNull(topologyName, "topologyName cannot be null");
        if (!isInitialized()) {
            return Collections.emptyList();
        }
        if (topologyMetadata.sourceTopicsForStore(storeName, topologyName).isEmpty()) {
            return Collections.emptyList();
        }
        final List<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            // The topology filter must apply to standby stores as well; otherwise standby hosts of
            // an unrelated topology that reuses the store name would be returned.
            if (belongsToTopology(metadata, topologyName) && hostsStore(metadata, storeName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    /**
     * Finds the metadata entries that were built for the given named topology.
     *
     * @param topologyName name of the topology
     * @return the matching host metadata; empty if this instance is not initialized
     * @throws NullPointerException if {@code topologyName} is null
     */
    public synchronized Collection<StreamsMetadata> getAllMetadataForTopology(final String topologyName) {
        Objects.requireNonNull(topologyName, "topologyName cannot be null");
        if (!isInitialized()) {
            return Collections.emptyList();
        }
        final List<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            if (belongsToTopology(metadata, topologyName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    /**
     * Locates the hosts for a key using a {@link DefaultStreamPartitioner} built from the given
     * serializer and the current cluster metadata.
     *
     * @param storeName     name of the store the key belongs to
     * @param key           the key to locate
     * @param keySerializer serializer handed to the default partitioner
     * @return see {@link #getKeyQueryMetadataForKey(String, Object, StreamPartitioner)}
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if named topologies are in use
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final Serializer<K> keySerializer) {
        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer) "
                + "method when using named topologies, please use the overload that "
                + "accepts a topologyName parameter to identify the correct store");
        }
        return getKeyQueryMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer, clusterMetadata));
    }

    /**
     * Named-topology variant of {@link #getKeyQueryMetadataForKey(String, Object, Serializer)}.
     *
     * @param storeName     name of the store the key belongs to
     * @param key           the key to locate
     * @param keySerializer serializer handed to the default partitioner
     * @param topologyName  name of the topology the store belongs to
     * @return see {@link #getKeyQueryMetadataForKey(String, Object, StreamPartitioner, String)}
     * @throws NullPointerException if any argument is null
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final Serializer<K> keySerializer,
                                                                       final String topologyName) {
        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
        return getKeyQueryMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer, clusterMetadata), topologyName);
    }

    /**
     * Locates the active host and the standby hosts for a key.
     *
     * @param storeName   name of the store the key belongs to
     * @param key         the key to locate
     * @param partitioner decides which partition the key maps to
     * @return {@link KeyQueryMetadata#NOT_AVAILABLE} if this instance is not initialized (or, for a
     *         global store, if this host is unknown and no host metadata exists); {@code null} if
     *         the store has no source topics; otherwise the located hosts and partition. For a
     *         global store the result names a single host, no standbys and partition {@code -1}.
     * @throws NullPointerException     if any argument is null, or the partitioner yields null
     * @throws IllegalArgumentException if named topologies are in use
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final StreamPartitioner<? super K, ?> partitioner) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        Objects.requireNonNull(key, "key can't be null");
        Objects.requireNonNull(partitioner, "partitioner can't be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, partitioner) "
                + "method when using named topologies, please use the overload that "
                + "accepts a topologyName parameter to identify the correct store");
        }
        if (!isInitialized()) {
            return KeyQueryMetadata.NOT_AVAILABLE;
        }
        if (globalStores.contains(storeName)) {
            // Global stores are registered on every host, so prefer this host when it is known.
            if (!thisHost.equals(UNKNOWN_HOST)) {
                return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);
            }
            // The host list is empty after a rebalance that reported no partitions at all;
            // indexing into it would throw, so report "not available" instead.
            if (allMetadata.isEmpty()) {
                return KeyQueryMetadata.NOT_AVAILABLE;
            }
            return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
        }
        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
        if (sourceTopicsInfo == null) {
            return null;
        }
        return locateKey(storeName, key, partitioner, sourceTopicsInfo, null);
    }

    /**
     * Named-topology variant of {@link #getKeyQueryMetadataForKey(String, Object, StreamPartitioner)};
     * only metadata entries built for {@code topologyName} are considered.
     *
     * @param storeName    name of the store the key belongs to
     * @param key          the key to locate
     * @param partitioner  decides which partition the key maps to
     * @param topologyName name of the topology the store belongs to
     * @return {@link KeyQueryMetadata#NOT_AVAILABLE} if this instance is not initialized;
     *         {@code null} if the store has no source topics; otherwise the located hosts and partition
     * @throws NullPointerException if any argument is null, or the partitioner yields null
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final StreamPartitioner<? super K, ?> partitioner,
                                                                       final String topologyName) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        Objects.requireNonNull(key, "key can't be null");
        Objects.requireNonNull(partitioner, "partitioner can't be null");
        Objects.requireNonNull(topologyName, "topologyName can't be null");
        if (!isInitialized()) {
            return KeyQueryMetadata.NOT_AVAILABLE;
        }
        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName, topologyName);
        if (sourceTopicsInfo == null) {
            return null;
        }
        return locateKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
    }

    /**
     * Replaces the cluster metadata and rebuilds the per-host metadata from the given assignment.
     *
     * @param activePartitionHostMap  active partitions keyed by host
     * @param standbyPartitionHostMap standby partitions keyed by host
     * @param clusterMetadata         the cluster metadata to use from now on
     */
    synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                               final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
                               final Cluster clusterMetadata) {
        this.clusterMetadata = clusterMetadata;
        rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap);
    }

    /** Returns true if the metadata lists the store as an active or as a standby store. */
    private static boolean hostsStore(final StreamsMetadata metadata, final String storeName) {
        return metadata.stateStoreNames().contains(storeName)
            || metadata.standbyStateStoreNames().contains(storeName);
    }

    /** Returns true if the metadata entry was built for the given (non-null) topology name. */
    private static boolean belongsToTopology(final StreamsMetadata metadata, final String topologyName) {
        return topologyName.equals(((StreamsMetadataImpl) metadata).topologyName());
    }

    /** Returns true if at least one of the partitions belongs to one of the given topics. */
    private static boolean hasPartitionsForAnyTopics(final List<String> topicNames,
                                                     final Set<TopicPartition> partitionForHost) {
        for (final TopicPartition topicPartition : partitionForHost) {
            if (topicNames.contains(topicPartition.topic())) {
                return true;
            }
        }
        return false;
    }

    /** Returns the names of all stores that have a source topic among the given partitions. */
    private static Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics,
                                               final Set<TopicPartition> sourceTopicPartitions) {
        final Set<String> storesOnHost = new HashSet<>();
        for (final Map.Entry<String, List<String>> storeTopicEntry : storeToSourceTopics.entrySet()) {
            if (hasPartitionsForAnyTopics(storeTopicEntry.getValue(), sourceTopicPartitions)) {
                storesOnHost.add(storeTopicEntry.getKey());
            }
        }
        return storesOnHost;
    }

    /** Returns the distinct hosts of both maps, ordered by host name and then by port. */
    private static List<HostInfo> sortedHosts(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                              final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        return Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
            .distinct()
            .sorted(Comparator.comparing(HostInfo::host).thenComparingInt(HostInfo::port))
            .collect(Collectors.toList());
    }

    /** Returns the partitions mapped to the host, or an empty set if the host has no entry. */
    private static Set<TopicPartition> partitionsOrEmpty(final Map<HostInfo, Set<TopicPartition>> partitionHostMap,
                                                         final HostInfo host) {
        final Set<TopicPartition> partitions = partitionHostMap.get(host);
        return partitions == null ? Collections.<TopicPartition>emptySet() : partitions;
    }

    /** Returns a fresh set with those partitions whose topic is a source topic of the topology. */
    private Set<TopicPartition> partitionsOfTopology(final Set<TopicPartition> partitionsOnHost,
                                                     final String topologyName) {
        return partitionsOnHost.stream()
            .filter(tp -> topologyMetadata.fullSourceTopicNamesForTopology(topologyName).contains(tp.topic()))
            .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Rebuilds {@code allMetadata} and {@code localMetadata}. If both maps are empty, the host list
     * becomes empty and the local metadata becomes an entry for this host without stores or partitions.
     */
    private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                 final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
            allMetadata = Collections.emptyList();
            localMetadata.set(new StreamsMetadataImpl(
                thisHost,
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet()));
            return;
        }
        allMetadata = topologyMetadata.hasNamedTopologies()
            ? rebuildMetadataForNamedTopologies(activePartitionHostMap, standbyPartitionHostMap)
            : rebuildMetadataForSingleTopology(activePartitionHostMap, standbyPartitionHostMap);
    }

    /**
     * Builds one metadata entry per (host, named topology) pair that has at least one store or
     * partition, and sets the local metadata to an aggregate over all topologies on this host.
     */
    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                                                    final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
        for (final HostInfo hostInfo : sortedHosts(activePartitionHostMap, standbyPartitionHostMap)) {
            for (final String topologyName : topologyMetadata.namedTopologiesView()) {
                final Map<String, List<String>> storeToSourceTopics =
                    topologyMetadata.stateStoreNameToSourceTopicsForTopology(topologyName);

                final Set<TopicPartition> activePartitionsOnHost =
                    partitionsOfTopology(partitionsOrEmpty(activePartitionHostMap, hostInfo), topologyName);
                final Set<String> activeStoresOnHost = getStoresOnHost(storeToSourceTopics, activePartitionsOnHost);

                final Set<TopicPartition> standbyPartitionsOnHost =
                    partitionsOfTopology(partitionsOrEmpty(standbyPartitionHostMap, hostInfo), topologyName);
                final Set<String> standbyStoresOnHost = getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost);

                final boolean nothingOnHost = activeStoresOnHost.isEmpty()
                    && activePartitionsOnHost.isEmpty()
                    && standbyStoresOnHost.isEmpty()
                    && standbyPartitionsOnHost.isEmpty();
                if (nothingOnHost) {
                    log.debug("Host {} has no tasks for topology {} at the moment, this metadata will not be built", hostInfo, topologyName);
                } else {
                    rebuiltMetadata.add(new StreamsMetadataImpl(
                        hostInfo,
                        activeStoresOnHost,
                        activePartitionsOnHost,
                        standbyStoresOnHost,
                        standbyPartitionsOnHost,
                        topologyName));
                }
            }
        }

        // The local metadata spans all topologies on this host. It does not depend on the host
        // being iterated, so it is built once here. This host may be missing from either map
        // (for example when no standby tasks are assigned), hence the empty-set fallback.
        final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics();
        final Set<TopicPartition> localActivePartitions = partitionsOrEmpty(activePartitionHostMap, thisHost);
        final Set<TopicPartition> localStandbyPartitions = partitionsOrEmpty(standbyPartitionHostMap, thisHost);
        localMetadata.set(new StreamsMetadataImpl(
            thisHost,
            getStoresOnHost(storeToSourceTopics, localActivePartitions),
            localActivePartitions,
            getStoresOnHost(storeToSourceTopics, localStandbyPartitions),
            localStandbyPartitions));
        return rebuiltMetadata;
    }

    /**
     * Builds one metadata entry per host. Every host additionally lists all global stores among
     * its active stores. The entry for this host, if present, becomes the local metadata.
     */
    private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                                                   final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
        final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics();
        for (final HostInfo hostInfo : sortedHosts(activePartitionHostMap, standbyPartitionHostMap)) {
            final Set<TopicPartition> activePartitionsOnHost =
                new HashSet<>(partitionsOrEmpty(activePartitionHostMap, hostInfo));
            final Set<String> activeStoresOnHost = getStoresOnHost(storeToSourceTopics, activePartitionsOnHost);
            activeStoresOnHost.addAll(globalStores);

            final Set<TopicPartition> standbyPartitionsOnHost =
                new HashSet<>(partitionsOrEmpty(standbyPartitionHostMap, hostInfo));
            final Set<String> standbyStoresOnHost = getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost);

            final StreamsMetadata metadata = new StreamsMetadataImpl(
                hostInfo,
                activeStoresOnHost,
                activePartitionsOnHost,
                standbyStoresOnHost,
                standbyPartitionsOnHost);
            rebuiltMetadata.add(metadata);
            if (hostInfo.equals(thisHost)) {
                localMetadata.set(metadata);
            }
        }
        return rebuiltMetadata;
    }

    /**
     * Asks the partitioner for the key's partition and then scans the host metadata for hosts
     * that carry the store together with that partition of one of the store's source topics.
     *
     * @param topologyName if non-null, only metadata entries built for this topology are scanned
     * @return the active host ({@link #UNKNOWN_HOST} if none matched), the standby hosts and the partition
     */
    private <K> KeyQueryMetadata locateKey(final String storeName,
                                           final K key,
                                           final StreamPartitioner<? super K, ?> partitioner,
                                           final SourceTopicsInfo sourceTopicsInfo,
                                           final String topologyName) {
        // Fail with a descriptive message instead of an anonymous unboxing NPE further down.
        final Integer partition = Objects.requireNonNull(
            partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions),
            "partitioner returned a null partition, cannot locate the key");

        final Set<TopicPartition> matchingPartitions = new HashSet<>();
        for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
        }

        HostInfo activeHost = UNKNOWN_HOST;
        final Set<HostInfo> standbyHosts = new HashSet<>();
        for (final StreamsMetadata streamsMetadata : allMetadata) {
            if (topologyName != null && !belongsToTopology(streamsMetadata, topologyName)) {
                continue;
            }
            // If several entries match, the last one wins, as there is no early exit.
            if (streamsMetadata.stateStoreNames().contains(storeName)
                && !Collections.disjoint(streamsMetadata.topicPartitions(), matchingPartitions)) {
                activeHost = streamsMetadata.hostInfo();
            }
            if (streamsMetadata.standbyStateStoreNames().contains(storeName)
                && !Collections.disjoint(streamsMetadata.standbyTopicPartitions(), matchingPartitions)) {
                standbyHosts.add(streamsMetadata.hostInfo());
            }
        }
        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
    }

    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
        return getSourceTopicsInfo(storeName, null);
    }

    /** Returns the source topic summary for the store, or {@code null} if it has no source topics. */
    private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final String topologyName) {
        final List<String> sourceTopics = new ArrayList<>(topologyMetadata.sourceTopicsForStore(storeName, topologyName));
        if (sourceTopics.isEmpty()) {
            return null;
        }
        return new SourceTopicsInfo(sourceTopics, clusterMetadata);
    }

    /** True once cluster metadata with at least one topic and the local metadata are both set. */
    private boolean isInitialized() {
        return clusterMetadata != null
            && !clusterMetadata.topics().isEmpty()
            && localMetadata.get() != null;
    }

    /**
     * @param topicName name of a changelog topic
     * @return whatever the topology metadata reports as the store for that changelog topic
     */
    public String getStoreForChangelogTopic(final String topicName) {
        return topologyMetadata.getStoreForChangelogTopic(topicName);
    }

    /**
     * The source topics of one store together with the topic that has the most partitions
     * according to the cluster metadata. If no topic has any partition, {@code maxPartitions}
     * is 0 and {@code topicWithMostPartitions} is {@code null}.
     */
    private static final class SourceTopicsInfo {
        private final List<String> sourceTopics;
        private final int maxPartitions;
        private final String topicWithMostPartitions;

        private SourceTopicsInfo(final List<String> sourceTopics, final Cluster clusterMetadata) {
            this.sourceTopics = sourceTopics;
            int largestPartitionCount = 0;
            String largestTopic = null;
            for (final String topic : sourceTopics) {
                final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
                // Strictly greater: on a tie the topic seen first is kept.
                if (partitions.size() > largestPartitionCount) {
                    largestPartitionCount = partitions.size();
                    largestTopic = partitions.get(0).topic();
                }
            }
            this.maxPartitions = largestPartitionCount;
            this.topicWithMostPartitions = largestTopic;
        }
    }
}

