/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class StreamsUpgradeTest {
    /**
     * Starts the upgrade-test Kafka Streams client.
     *
     * <p>The topology reads the {@code data} topic, hands every record to the
     * processor obtained from {@code SmokeTestUtil.printProcessorSupplier("data")},
     * and writes the stream to the {@code echo} topic. Settings loaded from the
     * properties file are applied on top of the built-in defaults. If the loaded
     * properties contain the key {@code test.future.metadata}, that key is removed
     * and the client is created with a {@code FutureKafkaClientSupplier} instead of
     * the default supplier.
     *
     * @param args {@code args[0]} is the properties file name; when it is missing a
     *             message is printed to stderr and {@code null} is passed to
     *             {@code Utils.loadProps}
     * @throws Exception whatever loading the properties or starting the client throws
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: ");
        }
        final String propFileName = args.length > 0 ? args[0] : null;
        final Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
        System.out.println("props=" + streamsProperties);

        // Topology: data -> printing processor, data -> echo.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream dataStream = builder.stream("data");
        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
        dataStream.to("echo");

        // The marker key only selects the client supplier; it is stripped so that it
        // is not copied into the Streams configuration below.
        final boolean useFutureMetadata = streamsProperties.containsKey("test.future.metadata");
        if (useFutureMetadata) {
            streamsProperties.remove("test.future.metadata");
        }
        final KafkaClientSupplier kafkaClientSupplier = useFutureMetadata
            ? new FutureKafkaClientSupplier()
            : new DefaultKafkaClientSupplier();

        // Defaults first, so that entries from the properties file win on conflict.
        final Properties config = new Properties();
        config.setProperty("application.id", "StreamsUpgradeTest");
        config.put("commit.interval.ms", 1000);
        config.putAll(streamsProperties);

        final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier);
        streams.start();

        final Runnable closeOnShutdown = () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        };
        Runtime.getRuntime().addShutdownHook(new Thread(closeOnShutdown));
    }

    /**
     * Assignment metadata wrapper that re-encodes an already encoded assignment,
     * optionally replacing its leading version fields with a "future" version
     * number (5).
     *
     * <p>The wrapped buffer is treated as opaque apart from its first one or two
     * {@code int} fields; everything after the rewritten fields is copied verbatim.
     */
    private static class FutureAssignmentInfo extends AssignmentInfo {
        /** Version number written in place of a bumped version field. */
        private static final int FUTURE_VERSION = 5;

        private final boolean bumpUsedVersion;
        private final boolean bumpSupportedVersion;
        final ByteBuffer originalUserMetadata;

        /**
         * @param bumpUsedVersion      replace the first {@code int} of {@code bytes} with 5
         * @param bumpSupportedVersion replace the second {@code int} of {@code bytes} with 5
         * @param bytes                the already encoded assignment metadata to rewrite;
         *                             its position is reset by {@link #encode()}
         */
        private FutureAssignmentInfo(boolean bumpUsedVersion, boolean bumpSupportedVersion, ByteBuffer bytes) {
            super(4, 4);
            this.bumpUsedVersion = bumpUsedVersion;
            this.bumpSupportedVersion = bumpSupportedVersion;
            this.originalUserMetadata = bytes;
        }

        /**
         * Produces a copy of the wrapped metadata with the requested version fields
         * replaced.
         *
         * <p>The wrapped buffer is rewound first and is fully consumed afterwards.
         *
         * @return a new buffer, positioned at 0, backed by a fresh byte array
         * @throws TaskAssignmentException if writing to the in-memory stream fails
         * @throws java.nio.BufferUnderflowException if the wrapped buffer is too short
         *         to contain the version field(s) that have to be read
         */
        public ByteBuffer encode() {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            originalUserMetadata.rewind();

            try (DataOutputStream out = new DataOutputStream(baos)) {
                // First int: either replaced or passed through unchanged.
                final int originalFirstField = originalUserMetadata.getInt();
                out.writeInt(bumpUsedVersion ? FUTURE_VERSION : originalFirstField);

                // Second int: only consumed when it is replaced; otherwise it is
                // still in the buffer and gets copied with the remainder below.
                if (bumpSupportedVersion) {
                    originalUserMetadata.getInt();
                    out.writeInt(FUTURE_VERSION);
                }

                // Copy whatever is left in one bounded step rather than reading
                // byte-by-byte until the buffer underflows.
                final byte[] remainder = new byte[originalUserMetadata.remaining()];
                originalUserMetadata.get(remainder);
                out.write(remainder);
                out.flush();
            } catch (IOException ex) {
                throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
            }

            return ByteBuffer.wrap(baos.toByteArray());
        }
    }

    /**
     * Subscription metadata that can advertise, or be encoded as, metadata
     * version 5, one above the version 4 that the rest of this file treats as the
     * latest supported one.
     */
    private static class FutureSubscriptionInfo extends SubscriptionInfo {
        FutureSubscriptionInfo(int version, int latestSupportedVersion, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
            super(version, latestSupportedVersion, processId, prevTasks, standbyTasks, userEndPoint);
        }

        /**
         * Encodes this subscription.
         *
         * <p>For versions up to 4 the parent encoding is used and the {@code int} at
         * byte offset 4 is overwritten with this instance's latest supported version
         * (presumably because the parent writes its own value there; confirm against
         * {@code SubscriptionInfo}). For higher versions the future layout from
         * {@link #encodeFutureVersion()} is used and the buffer is rewound.
         *
         * @return the encoded subscription metadata
         */
        public ByteBuffer encode() {
            final ByteBuffer encoded;
            if (version() > 4) {
                encoded = encodeFutureVersion();
                encoded.rewind();
            } else {
                encoded = super.encode();
                encoded.putInt(4, latestSupportedVersion());
            }
            return encoded;
        }

        /**
         * Writes two leading {@code int}s of value 5, followed by the client UUID,
         * the previous tasks, the standby tasks and the user end point, using the
         * parent's encoding helpers. The buffer is sized with the parent's
         * version-three-and-four length computation and is returned without being
         * rewound.
         */
        private ByteBuffer encodeFutureVersion() {
            final byte[] endPoint = prepareUserEndPoint();
            final int capacity = getVersionThreeAndFourByteLength(endPoint);
            final ByteBuffer out = ByteBuffer.allocate(capacity);

            out.putInt(5);
            out.putInt(5);
            encodeClientUUID(out);
            encodeTasks(out, prevTasks());
            encodeTasks(out, standbyTasks());
            encodeUserEndPoint(out, endPoint);

            return out;
        }
    }

    public static class FutureStreamsPartitionAssignor
    extends StreamsPartitionAssignor {
        public FutureStreamsPartitionAssignor() {
            this.usedSubscriptionMetadataVersion = 5;
        }

        public PartitionAssignor.Subscription subscription(Set<String> topics) {
            TaskManager taskManager = this.taskManger();
            Set previousActiveTasks = taskManager.prevActiveTaskIds();
            Set standbyTasks = taskManager.cachedTasksIds();
            standbyTasks.removeAll(previousActiveTasks);
            FutureSubscriptionInfo data = new FutureSubscriptionInfo(this.usedSubscriptionMetadataVersion, 5, taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint());
            taskManager.updateSubscriptionsFromMetadata(topics);
            return new PartitionAssignor.Subscription(new ArrayList<String>(topics), data.encode());
        }

        public void onAssignment(PartitionAssignor.Assignment assignment) {
            try {
                super.onAssignment(assignment);
                return;
            }
            catch (TaskAssignmentException taskAssignmentException) {
                int usedVersion;
                ByteBuffer data = assignment.userData();
                data.rewind();
                try (DataInputStream in = new DataInputStream((InputStream)new ByteBufferInputStream(data));){
                    usedVersion = in.readInt();
                }
                catch (IOException ex) {
                    throw new TaskAssignmentException("Failed to decode AssignmentInfo", (Throwable)ex);
                }
                if (usedVersion > 5) {
                    throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + 4 + 1);
                }
                AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData().putInt(0, 4));
                if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
                    this.setAssignmentErrorCode(StreamsPartitionAssignor.Error.VERSION_PROBING.code());
                    return;
                }
                ArrayList partitions = new ArrayList(assignment.partitions());
                partitions.sort(PARTITION_COMPARATOR);
                HashMap activeTasks = new HashMap();
                HashMap topicToPartitionInfo = new HashMap();
                this.processLatestVersionAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                Map partitionsByHost = info.partitionsByHost();
                TaskManager taskManager = this.taskManger();
                taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
                taskManager.setPartitionsByHostState(partitionsByHost);
                taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
                taskManager.updateSubscriptionsFromAssignment(partitions);
                return;
            }
        }

        public Map<String, PartitionAssignor.Assignment> assign(Cluster metadata, Map<String, PartitionAssignor.Subscription> subscriptions) {
            boolean bumpSupportedVersion;
            Map assignment = null;
            HashMap downgradedSubscriptions = new HashMap();
            for (PartitionAssignor.Subscription subscription : subscriptions.values()) {
                SubscriptionInfo info = SubscriptionInfo.decode((ByteBuffer)subscription.userData());
                if (info.version() >= 5) continue;
                assignment = super.assign(metadata, subscriptions);
                break;
            }
            boolean bumpUsedVersion = false;
            if (assignment != null) {
                bumpSupportedVersion = this.supportedVersions.size() == 1 && (Integer)this.supportedVersions.iterator().next() == 5;
            } else {
                for (Map.Entry entry : subscriptions.entrySet()) {
                    PartitionAssignor.Subscription subscription = (PartitionAssignor.Subscription)entry.getValue();
                    SubscriptionInfo info = SubscriptionInfo.decode((ByteBuffer)subscription.userData().putInt(0, 4).putInt(4, 4));
                    downgradedSubscriptions.put(entry.getKey(), new PartitionAssignor.Subscription(subscription.topics(), new SubscriptionInfo(info.processId(), info.prevTasks(), info.standbyTasks(), info.userEndPoint()).encode()));
                }
                assignment = super.assign(metadata, downgradedSubscriptions);
                bumpUsedVersion = true;
                bumpSupportedVersion = true;
            }
            HashMap<String, PartitionAssignor.Assignment> newAssignment = new HashMap<String, PartitionAssignor.Assignment>();
            for (Map.Entry entry : assignment.entrySet()) {
                PartitionAssignor.Assignment singleAssignment = (PartitionAssignor.Assignment)entry.getValue();
                newAssignment.put((String)entry.getKey(), new PartitionAssignor.Assignment(singleAssignment.partitions(), new FutureAssignmentInfo(bumpUsedVersion, bumpSupportedVersion, singleAssignment.userData()).encode()));
            }
            return newAssignment;
        }
    }

    /**
     * Client supplier whose consumers always use
     * {@code FutureStreamsPartitionAssignor} as their partition assignment strategy.
     */
    private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
        private FutureKafkaClientSupplier() {
        }

        /**
         * Creates a byte-array consumer after forcing the assignment strategy.
         *
         * @param config consumer configuration; modified in place, any existing
         *               {@code partition.assignment.strategy} entry is replaced
         * @return a new consumer using byte-array key and value deserializers
         */
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            final String assignorClassName = FutureStreamsPartitionAssignor.class.getName();
            config.put("partition.assignment.strategy", assignorClassName);

            final Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer();
            final Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer();
            return new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
        }
    }
}

