/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.AssignedStandbyTasks;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class StreamThread
extends Thread {
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    private final Time time;
    private final Logger log;
    private final String logPrefix;
    private final Object stateLock;
    private final Duration pollTime;
    private final long commitTimeMs;
    private final int maxPollTimeMs;
    private final String originalReset;
    private final TaskManager taskManager;
    private final StreamsMetricsThreadImpl streamsMetrics;
    private final AtomicInteger assignmentErrorCode;
    private long now;
    private long lastPollMs;
    private long lastCommitMs;
    private int numIterations;
    private Throwable rebalanceException = null;
    private boolean processStandbyRecords = false;
    private volatile State state = State.CREATED;
    private volatile ThreadMetadata threadMetadata;
    private StateListener stateListener;
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    final ConsumerRebalanceListener rebalanceListener;
    final Producer<byte[], byte[]> producer;
    final Consumer<byte[], byte[]> restoreConsumer;
    final Consumer<byte[], byte[]> consumer;
    final InternalTopologyBuilder builder;

    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    public State state() {
        return this.state;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    State setState(State newState) {
        State oldState;
        Object object = this.stateLock;
        synchronized (object) {
            oldState = this.state;
            if (this.state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                return null;
            }
            if (this.state == State.DEAD) {
                return null;
            }
            if (this.state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                return null;
            }
            if (!this.state.isValidTransition(newState)) {
                this.log.error("Unexpected state transition from {} to {}", (Object)oldState, (Object)newState);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
            }
            this.log.info("State transition from {} to {}", (Object)oldState, (Object)newState);
            this.state = newState;
            if (newState == State.RUNNING) {
                this.updateThreadMetadata(this.taskManager.activeTasks(), this.taskManager.standbyTasks());
            } else {
                this.updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
            }
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, oldState);
        }
        return oldState;
    }

    public boolean isRunningAndNotRebalancing() {
        return this.state == State.RUNNING;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isRunning() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.isRunning();
        }
    }

    public static StreamThread create(InternalTopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, AdminClient adminClient, UUID processId, String clientId, Metrics metrics, Time time, StreamsMetadataState streamsMetadataState, long cacheSizeBytes, StateDirectory stateDirectory, StateRestoreListener userStateRestoreListener) {
        String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
        String logPrefix = String.format("stream-thread [%s] ", threadClientId);
        LogContext logContext = new LogContext(logPrefix);
        Logger log = logContext.logger(StreamThread.class);
        log.info("Creating restore consumer client");
        Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(StreamThread.getRestoreConsumerClientId(threadClientId));
        Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
        Duration pollTime = Duration.ofMillis(config.getLong("poll.ms"));
        StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);
        Producer<byte[], byte[]> threadProducer = null;
        boolean eosEnabled = "exactly_once".equals(config.getString("processing.guarantee"));
        if (!eosEnabled) {
            Map<String, Object> producerConfigs = config.getProducerConfigs(StreamThread.getThreadProducerClientId(threadClientId));
            log.info("Creating shared producer client");
            threadProducer = clientSupplier.getProducer(producerConfigs);
        }
        StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics, threadClientId);
        ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
        TaskCreator activeTaskCreator = new TaskCreator(builder, config, streamsMetrics, stateDirectory, changelogReader, cache, time, clientSupplier, threadProducer, threadClientId, log);
        StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder, config, streamsMetrics, stateDirectory, changelogReader, time, log);
        TaskManager taskManager = new TaskManager(changelogReader, processId, logPrefix, restoreConsumer, streamsMetadataState, activeTaskCreator, standbyTaskCreator, adminClient, new AssignedStreamsTasks(logContext), new AssignedStandbyTasks(logContext));
        log.info("Creating consumer client");
        String applicationId = config.getString("application.id");
        Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, StreamThread.getConsumerClientId(threadClientId));
        consumerConfigs.put("__task.manager.instance__", taskManager);
        AtomicInteger assignmentErrorCode = new AtomicInteger();
        consumerConfigs.put("__assignment.error.code__", assignmentErrorCode);
        String originalReset = null;
        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
            originalReset = (String)consumerConfigs.get("auto.offset.reset");
            consumerConfigs.put("auto.offset.reset", "none");
        }
        Consumer<byte[], byte[]> consumer = clientSupplier.getConsumer(consumerConfigs);
        taskManager.setConsumer(consumer);
        return new StreamThread(time, config, threadProducer, restoreConsumer, consumer, originalReset, taskManager, streamsMetrics, builder, threadClientId, logContext, assignmentErrorCode).updateThreadMetadata(StreamThread.getSharedAdminClientId(clientId));
    }

    public StreamThread(Time time, StreamsConfig config, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, Consumer<byte[], byte[]> consumer, String originalReset, TaskManager taskManager, StreamsMetricsThreadImpl streamsMetrics, InternalTopologyBuilder builder, String threadClientId, LogContext logContext, AtomicInteger assignmentErrorCode) {
        super(threadClientId);
        this.stateLock = new Object();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.time = time;
        this.builder = builder;
        this.streamsMetrics = streamsMetrics;
        this.logPrefix = logContext.logPrefix();
        this.log = logContext.logger(StreamThread.class);
        this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
        this.taskManager = taskManager;
        this.producer = producer;
        this.restoreConsumer = restoreConsumer;
        this.consumer = consumer;
        this.originalReset = originalReset;
        this.assignmentErrorCode = assignmentErrorCode;
        this.pollTime = Duration.ofMillis(config.getLong("poll.ms"));
        this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId")).getInt("max.poll.interval.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.numIterations = 1;
    }

    private static String getTaskProducerClientId(String threadClientId, TaskId taskId) {
        return threadClientId + "-" + taskId + "-producer";
    }

    private static String getThreadProducerClientId(String threadClientId) {
        return threadClientId + "-producer";
    }

    private static String getConsumerClientId(String threadClientId) {
        return threadClientId + "-consumer";
    }

    private static String getRestoreConsumerClientId(String threadClientId) {
        return threadClientId + "-restore-consumer";
    }

    public static String getSharedAdminClientId(String clientId) {
        return clientId + "-admin";
    }

    @Override
    public void run() {
        this.log.info("Starting");
        if (this.setState(State.STARTING) == null) {
            this.log.info("StreamThread already shutdown. Not running");
            return;
        }
        boolean cleanRun = false;
        try {
            this.runLoop();
            cleanRun = true;
        }
        catch (KafkaException e) {
            this.log.error("Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:", (Throwable)e);
            throw e;
        }
        catch (Exception e) {
            this.log.error("Encountered the following error during processing:", (Throwable)e);
            throw e;
        }
        finally {
            this.completeShutdown(cleanRun);
        }
    }

    private void setRebalanceException(Throwable rebalanceException) {
        this.rebalanceException = rebalanceException;
    }

    private void runLoop() {
        this.consumer.subscribe(this.builder.sourceTopicPattern(), this.rebalanceListener);
        while (this.isRunning()) {
            try {
                this.runOnce();
                if (this.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.VERSION_PROBING.code()) continue;
                this.log.info("Version probing detected. Triggering new rebalance.");
                this.enforceRebalance();
            }
            catch (TaskMigratedException ignoreAndRejoinGroup) {
                this.log.warn("Detected task {} that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", (Object)ignoreAndRejoinGroup.migratedTask().id(), (Object)ignoreAndRejoinGroup.migratedTask().toString(">"));
                this.enforceRebalance();
            }
        }
    }

    private void enforceRebalance() {
        this.consumer.unsubscribe();
        this.consumer.subscribe(this.builder.sourceTopicPattern(), this.rebalanceListener);
    }

    void runOnce() {
        ConsumerRecords<byte[], byte[]> records;
        this.now = this.time.milliseconds();
        if (this.state == State.PARTITIONS_ASSIGNED) {
            records = this.pollRequests(Duration.ZERO);
        } else if (this.state == State.PARTITIONS_REVOKED) {
            records = this.pollRequests(this.pollTime);
        } else if (this.state == State.RUNNING || this.state == State.STARTING) {
            records = this.pollRequests(this.pollTime);
        } else {
            this.log.error("Unexpected state {} during normal iteration", (Object)this.state);
            throw new StreamsException(this.logPrefix + "Unexpected state " + this.state + " during normal iteration");
        }
        long pollLatency = this.advanceNowAndComputeLatency();
        if (records != null && !records.isEmpty()) {
            this.streamsMetrics.pollTimeSensor.record((double)pollLatency, this.now);
            this.addRecordsToTasks(records);
        }
        if (this.state == State.PARTITIONS_ASSIGNED && this.taskManager.updateNewAndRestoringTasks()) {
            this.setState(State.RUNNING);
        }
        this.advanceNowAndComputeLatency();
        if (this.taskManager.hasActiveRunningTasks()) {
            int processed = 0;
            long timeSinceLastPoll = 0L;
            do {
                for (int i = 0; i < this.numIterations && (processed = this.taskManager.process(this.now)) > 0; ++i) {
                    long processLatency = this.advanceNowAndComputeLatency();
                    this.streamsMetrics.processTimeSensor.record((double)processLatency / (double)processed, this.now);
                    int committed = this.taskManager.maybeCommitActiveTasksPerUserRequested();
                    if (committed <= 0) continue;
                    long commitLatency = this.advanceNowAndComputeLatency();
                    this.streamsMetrics.commitTimeSensor.record((double)commitLatency / (double)committed, this.now);
                }
                timeSinceLastPoll = Math.max(this.now - this.lastPollMs, 0L);
                if (this.maybePunctuate() || this.maybeCommit()) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                    continue;
                }
                if (timeSinceLastPoll > (long)(this.maxPollTimeMs / 2)) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                    break;
                }
                if (processed <= 0) continue;
                ++this.numIterations;
            } while (processed > 0);
        }
        this.maybeUpdateStandbyTasks();
        this.maybeCommit();
    }

    private ConsumerRecords<byte[], byte[]> pollRequests(Duration pollTime) {
        ConsumerRecords records = null;
        this.lastPollMs = this.now;
        try {
            records = this.consumer.poll(pollTime);
        }
        catch (InvalidOffsetException e) {
            this.resetInvalidOffsets(e);
        }
        if (this.rebalanceException != null) {
            if (this.rebalanceException instanceof TaskMigratedException) {
                throw (TaskMigratedException)((Object)this.rebalanceException);
            }
            throw new StreamsException(this.logPrefix + "Failed to rebalance.", this.rebalanceException);
        }
        return records;
    }

    private void resetInvalidOffsets(InvalidOffsetException e) {
        Set partitions = e.partitions();
        HashSet<String> loggedTopics = new HashSet<String>();
        HashSet<TopicPartition> seekToBeginning = new HashSet<TopicPartition>();
        HashSet<TopicPartition> seekToEnd = new HashSet<TopicPartition>();
        for (TopicPartition partition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
                continue;
            }
            if (this.builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
                continue;
            }
            if (this.originalReset == null || !this.originalReset.equals("earliest") && !this.originalReset.equals("latest")) {
                String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))";
                throw new StreamsException(String.format("No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", partition.topic(), partition.partition()), e);
            }
            if (this.originalReset.equals("earliest")) {
                this.addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                continue;
            }
            if (!this.originalReset.equals("latest")) continue;
            this.addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
        }
        if (!seekToBeginning.isEmpty()) {
            this.consumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            this.consumer.seekToEnd(seekToEnd);
        }
    }

    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        String topic = partition.topic();
        if (loggedTopics.add(topic)) {
            this.log.info(logMessage, (Object)topic, (Object)resetPolicy);
        }
        partitions.add(partition);
    }

    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
        for (TopicPartition partition : records.partitions()) {
            StreamTask task = this.taskManager.activeTask(partition);
            if (task == null) {
                this.log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", (Object)partition, (Object)this.taskManager.toString(">"));
                throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
            }
            if (task.isClosed()) {
                this.log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", (Object)task.id());
                throw new TaskMigratedException(task);
            }
            task.addRecords(partition, records.records(partition));
        }
    }

    private boolean maybePunctuate() {
        int punctuated = this.taskManager.punctuate();
        if (punctuated > 0) {
            long punctuateLatency = this.advanceNowAndComputeLatency();
            this.streamsMetrics.punctuateTimeSensor.record((double)punctuateLatency / (double)punctuated, this.now);
        }
        return punctuated > 0;
    }

    boolean maybeCommit() {
        int committed = 0;
        if (this.now - this.lastCommitMs > this.commitTimeMs) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.now - this.lastCommitMs, this.commitTimeMs});
            }
            if ((committed += this.taskManager.commitAll()) > 0) {
                long intervalCommitLatency = this.advanceNowAndComputeLatency();
                this.streamsMetrics.commitTimeSensor.record((double)intervalCommitLatency / (double)committed, this.now);
                this.taskManager.maybePurgeCommitedRecords();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Committed all active tasks {} and standby tasks {} in {}ms", new Object[]{this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), intervalCommitLatency});
                }
            }
            this.lastCommitMs = this.now;
            this.processStandbyRecords = true;
        } else {
            int commitPerRequested = this.taskManager.maybeCommitActiveTasksPerUserRequested();
            if (commitPerRequested > 0) {
                long requestCommitLatency = this.advanceNowAndComputeLatency();
                this.streamsMetrics.commitTimeSensor.record((double)requestCommitLatency / (double)committed, this.now);
                committed += commitPerRequested;
            }
        }
        return committed > 0;
    }

    private void maybeUpdateStandbyTasks() {
        if (this.state == State.RUNNING && this.taskManager.hasStandbyRunningTasks()) {
            List<Object> remaining;
            if (this.processStandbyRecords) {
                if (!this.standbyRecords.isEmpty()) {
                    HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                    for (Map.Entry<Object, List<Object>> entry : this.standbyRecords.entrySet()) {
                        TopicPartition partition = (TopicPartition)entry.getKey();
                        remaining = entry.getValue();
                        if (remaining == null) continue;
                        StandbyTask task = this.taskManager.standbyTask(partition);
                        if (task.isClosed()) {
                            this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", (Object)task.id());
                            throw new TaskMigratedException(task);
                        }
                        if (!(remaining = task.update(partition, remaining)).isEmpty()) {
                            remainingStandbyRecords.put(partition, remaining);
                            continue;
                        }
                        this.restoreConsumer.resume(Collections.singleton(partition));
                    }
                    this.standbyRecords = remainingStandbyRecords;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Updated standby tasks {} in {}ms", this.taskManager.standbyTaskIds(), (Object)(this.time.milliseconds() - this.now));
                    }
                }
                this.processStandbyRecords = false;
            }
            try {
                ConsumerRecords records = this.restoreConsumer.poll(Duration.ZERO);
                if (!records.isEmpty()) {
                    for (TopicPartition topicPartition : records.partitions()) {
                        StandbyTask task = this.taskManager.standbyTask(topicPartition);
                        if (task == null) {
                            throw new StreamsException(this.logPrefix + "Missing standby task for partition " + topicPartition);
                        }
                        if (task.isClosed()) {
                            this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", (Object)task.id());
                            throw new TaskMigratedException(task);
                        }
                        remaining = task.update(topicPartition, records.records(topicPartition));
                        if (remaining.isEmpty()) continue;
                        this.restoreConsumer.pause(Collections.singleton(topicPartition));
                        this.standbyRecords.put(topicPartition, remaining);
                    }
                }
            }
            catch (InvalidOffsetException recoverableException) {
                this.log.warn("Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch.", (Throwable)recoverableException);
                Set partitions = recoverableException.partitions();
                for (TopicPartition partition : partitions) {
                    StandbyTask task = this.taskManager.standbyTask(partition);
                    if (task.isClosed()) {
                        this.log.info("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. Notifying the thread to trigger a new rebalance immediately.", (Object)task.id());
                        throw new TaskMigratedException(task);
                    }
                    this.log.info("Reinitializing StandbyTask {} from changelogs {}", (Object)task, (Object)recoverableException.partitions());
                    task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                }
                this.restoreConsumer.seekToBeginning((Collection)partitions);
            }
            this.advanceNowAndComputeLatency();
        }
    }

    private long advanceNowAndComputeLatency() {
        long previous = this.now;
        this.now = this.time.milliseconds();
        return Math.max(this.now - previous, 0L);
    }

    public void shutdown() {
        this.log.info("Informed to shut down");
        State oldState = this.setState(State.PENDING_SHUTDOWN);
        if (oldState == State.CREATED) {
            this.completeShutdown(true);
        }
    }

    private void completeShutdown(boolean cleanRun) {
        this.setState(State.PENDING_SHUTDOWN);
        this.log.info("Shutting down");
        try {
            this.taskManager.shutdown(cleanRun);
        }
        catch (Throwable e) {
            this.log.error("Failed to close task manager due to the following error:", e);
        }
        try {
            this.consumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close consumer due to the following error:", e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close restore consumer due to the following error:", e);
        }
        this.streamsMetrics.removeAllThreadLevelSensors();
        this.setState(State.DEAD);
        this.log.info("Shutdown complete");
    }

    private void clearStandbyRecords() {
        this.standbyRecords.clear();
    }

    public final ThreadMetadata threadMetadata() {
        return this.threadMetadata;
    }

    StreamThread updateThreadMetadata(String adminClientId) {
        this.threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), StreamThread.getConsumerClientId(this.getName()), StreamThread.getRestoreConsumerClientId(this.getName()), this.producer == null ? Collections.emptySet() : Collections.singleton(StreamThread.getThreadProducerClientId(this.getName())), adminClientId, Collections.emptySet(), Collections.emptySet());
        return this;
    }

    private void updateThreadMetadata(Map<TaskId, StreamTask> activeTasks, Map<TaskId, StandbyTask> standbyTasks) {
        HashSet<String> producerClientIds = new HashSet<String>();
        HashSet<TaskMetadata> activeTasksMetadata = new HashSet<TaskMetadata>();
        for (Map.Entry<TaskId, StreamTask> entry : activeTasks.entrySet()) {
            activeTasksMetadata.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().partitions()));
            producerClientIds.add(StreamThread.getTaskProducerClientId(this.getName(), entry.getKey()));
        }
        HashSet<TaskMetadata> standbyTasksMetadata = new HashSet<TaskMetadata>();
        for (Map.Entry<TaskId, StandbyTask> entry : standbyTasks.entrySet()) {
            standbyTasksMetadata.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().partitions()));
        }
        String string = this.threadMetadata.adminClientId();
        this.threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), StreamThread.getConsumerClientId(this.getName()), StreamThread.getRestoreConsumerClientId(this.getName()), this.producer == null ? producerClientIds : Collections.singleton(StreamThread.getThreadProducerClientId(this.getName())), string, activeTasksMetadata, standbyTasksMetadata);
    }

    public Map<TaskId, StreamTask> tasks() {
        return this.taskManager.activeTasks();
    }

    @Override
    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        return indent + "\tStreamsThread threadId: " + this.getName() + "\n" + this.taskManager.toString(indent);
    }

    public Map<MetricName, Metric> producerMetrics() {
        LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<MetricName, Metric>();
        if (this.producer != null) {
            Map producerMetrics = this.producer.metrics();
            if (producerMetrics != null) {
                result.putAll(producerMetrics);
            }
        } else {
            for (StreamTask task : this.taskManager.activeTasks().values()) {
                Map taskProducerMetrics = task.getProducer().metrics();
                result.putAll(taskProducerMetrics);
            }
        }
        return result;
    }

    public Map<MetricName, Metric> consumerMetrics() {
        Map consumerMetrics = this.consumer.metrics();
        Map restoreConsumerMetrics = this.restoreConsumer.metrics();
        LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<MetricName, Metric>();
        result.putAll(consumerMetrics);
        result.putAll(restoreConsumerMetrics);
        return result;
    }

    public Map<MetricName, Metric> adminClientMetrics() {
        Map adminClientMetrics = this.taskManager.getAdminClient().metrics();
        LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<MetricName, Metric>();
        result.putAll(adminClientMetrics);
        return result;
    }

    void setNow(long now) {
        this.now = now;
    }

    TaskManager taskManager() {
        return this.taskManager;
    }

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
        return this.standbyRecords;
    }

    int currentNumIterations() {
        return this.numIterations;
    }

    public StateListener stateListener() {
        return this.stateListener;
    }

    private static final class InternalConsumerConfig
    extends ConsumerConfig {
        private InternalConsumerConfig(Map<String, Object> props) {
            super(ConsumerConfig.addDeserializerToConfig(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()), false);
        }
    }

    static class StreamsMetricsThreadImpl
    extends StreamsMetricsImpl {
        private final Sensor commitTimeSensor;
        private final Sensor pollTimeSensor;
        private final Sensor processTimeSensor;
        private final Sensor punctuateTimeSensor;
        private final Sensor taskCreatedSensor;
        private final Sensor taskClosedSensor;

        StreamsMetricsThreadImpl(Metrics metrics, String threadName) {
            super(metrics, threadName);
            String group = "stream-metrics";
            this.commitTimeSensor = this.threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO, new Sensor[0]);
            StreamsMetricsThreadImpl.addAvgMaxLatency(this.commitTimeSensor, "stream-metrics", this.tagMap(new String[0]), "commit");
            StreamsMetricsThreadImpl.addInvocationRateAndCount(this.commitTimeSensor, "stream-metrics", this.tagMap(new String[0]), "commit");
            this.pollTimeSensor = this.threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO, new Sensor[0]);
            StreamsMetricsThreadImpl.addAvgMaxLatency(this.pollTimeSensor, "stream-metrics", this.tagMap(new String[0]), "poll");
            this.pollTimeSensor.add(metrics.metricName("poll-rate", "stream-metrics", "The average per-second number of record-poll calls", this.tagMap(new String[0])), (MeasurableStat)new Rate(TimeUnit.SECONDS, (SampledStat)new Count()));
            this.pollTimeSensor.add(metrics.metricName("poll-total", "stream-metrics", "The total number of record-poll calls", this.tagMap(new String[0])), (MeasurableStat)new CumulativeCount());
            this.processTimeSensor = this.threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO, new Sensor[0]);
            StreamsMetricsThreadImpl.addAvgMaxLatency(this.processTimeSensor, "stream-metrics", this.tagMap(new String[0]), "process");
            StreamsMetricsThreadImpl.addInvocationRateAndCount(this.processTimeSensor, "stream-metrics", this.tagMap(new String[0]), "process");
            this.punctuateTimeSensor = this.threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO, new Sensor[0]);
            StreamsMetricsThreadImpl.addAvgMaxLatency(this.punctuateTimeSensor, "stream-metrics", this.tagMap(new String[0]), "punctuate");
            StreamsMetricsThreadImpl.addInvocationRateAndCount(this.punctuateTimeSensor, "stream-metrics", this.tagMap(new String[0]), "punctuate");
            this.taskCreatedSensor = this.threadLevelSensor("task-created", Sensor.RecordingLevel.INFO, new Sensor[0]);
            this.taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", this.tagMap(new String[0])), (MeasurableStat)new Rate(TimeUnit.SECONDS, (SampledStat)new Count()));
            this.taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", this.tagMap(new String[0])), (MeasurableStat)new Total());
            this.taskClosedSensor = this.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO, new Sensor[0]);
            this.taskClosedSensor.add(metrics.metricName("task-closed-rate", "stream-metrics", "The average per-second number of closed tasks", this.tagMap(new String[0])), (MeasurableStat)new Rate(TimeUnit.SECONDS, (SampledStat)new Count()));
            this.taskClosedSensor.add(metrics.metricName("task-closed-total", "stream-metrics", "The total number of closed tasks", this.tagMap(new String[0])), (MeasurableStat)new Total());
        }
    }

    static class StandbyTaskCreator
    extends AbstractTaskCreator<StandbyTask> {
        StandbyTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetricsThreadImpl streamsMetrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader, Time time, Logger log) {
            super(builder, config, streamsMetrics, stateDirectory, storeChangelogReader, time, log);
        }

        @Override
        StandbyTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> partitions) {
            this.streamsMetrics.taskCreatedSensor.record();
            ProcessorTopology topology = this.builder.build(taskId.topicGroupId);
            if (!topology.stateStores().isEmpty()) {
                return new StandbyTask(taskId, partitions, topology, consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory);
            }
            this.log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", (Object)taskId, partitions);
            return null;
        }
    }

    static class TaskCreator
    extends AbstractTaskCreator<StreamTask> {
        private final ThreadCache cache;
        private final KafkaClientSupplier clientSupplier;
        private final String threadClientId;
        private final Producer<byte[], byte[]> threadProducer;

        TaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetricsThreadImpl streamsMetrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader, ThreadCache cache, Time time, KafkaClientSupplier clientSupplier, Producer<byte[], byte[]> threadProducer, String threadClientId, Logger log) {
            super(builder, config, streamsMetrics, stateDirectory, storeChangelogReader, time, log);
            this.cache = cache;
            this.clientSupplier = clientSupplier;
            this.threadProducer = threadProducer;
            this.threadClientId = threadClientId;
        }

        @Override
        StreamTask createTask(Consumer<byte[], byte[]> consumer, TaskId taskId, Set<TopicPartition> partitions) {
            this.streamsMetrics.taskCreatedSensor.record();
            return new StreamTask(taskId, partitions, this.builder.build(taskId.topicGroupId), consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, () -> this.createProducer(taskId), this.streamsMetrics.taskClosedSensor);
        }

        private Producer<byte[], byte[]> createProducer(TaskId id) {
            if (this.threadProducer == null) {
                Map<String, Object> producerConfigs = this.config.getProducerConfigs(StreamThread.getTaskProducerClientId(this.threadClientId, id));
                this.log.info("Creating producer client for task {}", (Object)id);
                producerConfigs.put("transactional.id", this.applicationId + "-" + id);
                return this.clientSupplier.getProducer(producerConfigs);
            }
            return this.threadProducer;
        }

        @Override
        public void close() {
            if (this.threadProducer != null) {
                try {
                    this.threadProducer.close();
                }
                catch (Throwable e) {
                    this.log.error("Failed to close producer due to the following error:", e);
                }
            }
        }
    }

    static abstract class AbstractTaskCreator<T extends Task> {
        final String applicationId;
        final InternalTopologyBuilder builder;
        final StreamsConfig config;
        final StreamsMetricsThreadImpl streamsMetrics;
        final StateDirectory stateDirectory;
        final ChangelogReader storeChangelogReader;
        final Time time;
        final Logger log;

        AbstractTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetricsThreadImpl streamsMetrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader, Time time, Logger log) {
            this.applicationId = config.getString("application.id");
            this.builder = builder;
            this.config = config;
            this.streamsMetrics = streamsMetrics;
            this.stateDirectory = stateDirectory;
            this.storeChangelogReader = storeChangelogReader;
            this.time = time;
            this.log = log;
        }

        public InternalTopologyBuilder builder() {
            return this.builder;
        }

        public StateDirectory stateDirectory() {
            return this.stateDirectory;
        }

        Collection<T> createTasks(Consumer<byte[], byte[]> consumer, Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
            ArrayList<T> createdTasks = new ArrayList<T>();
            for (Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
                Set<TopicPartition> partitions;
                TaskId taskId = newTaskAndPartitions.getKey();
                T task = this.createTask(consumer, taskId, partitions = newTaskAndPartitions.getValue());
                if (task == null) continue;
                this.log.trace("Created task {} with assigned partitions {}", (Object)taskId, partitions);
                createdTasks.add(task);
            }
            return createdTasks;
        }

        abstract T createTask(Consumer<byte[], byte[]> var1, TaskId var2, Set<TopicPartition> var3);

        public void close() {
        }
    }

    static class RebalanceListener
    implements ConsumerRebalanceListener {
        private final Time time;
        private final TaskManager taskManager;
        private final StreamThread streamThread;
        private final Logger log;

        RebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger log) {
            this.time = time;
            this.taskManager = taskManager;
            this.streamThread = streamThread;
            this.log = log;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            long start;
            block7: {
                block6: {
                    this.log.debug("at state {}: partitions {} assigned at the end of consumer rebalance.\n\tcurrent suspended active tasks: {}\n\tcurrent suspended standby tasks: {}\n", new Object[]{this.streamThread.state, assignment, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                    if (this.streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
                        this.log.error("Received error code {} - shutdown", (Object)this.streamThread.assignmentErrorCode.get());
                        this.streamThread.shutdown();
                        this.streamThread.setStateListener(null);
                        return;
                    }
                    start = this.time.milliseconds();
                    if (this.streamThread.setState(State.PARTITIONS_ASSIGNED) != null) break block6;
                    this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
                    return;
                }
                try {
                    if (this.streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) break block7;
                    this.taskManager.createTasks(assignment);
                }
                catch (Throwable t) {
                    try {
                        this.log.error("Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}", t);
                        this.streamThread.setRebalanceException(t);
                    }
                    catch (Throwable throwable) {
                        this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
                        throw throwable;
                    }
                    this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
                }
            }
            this.log.info("partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{this.time.milliseconds() - start, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), this.taskManager.prevActiveTaskIds()});
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
            this.log.debug("at state {}: partitions {} revoked at the beginning of consumer rebalance.\n\tcurrent assigned active tasks: {}\n\tcurrent assigned standby tasks: {}\n", new Object[]{this.streamThread.state, assignment, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
            if (this.streamThread.setState(State.PARTITIONS_REVOKED) != null) {
                long start;
                block6: {
                    start = this.time.milliseconds();
                    try {
                        if (this.streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
                            this.streamThread.assignmentErrorCode.set(StreamsPartitionAssignor.Error.NONE.code());
                            break block6;
                        }
                        this.taskManager.suspendTasksAndState();
                    }
                    catch (Throwable t) {
                        try {
                            this.log.error("Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance: {}", t);
                            this.streamThread.setRebalanceException(t);
                        }
                        catch (Throwable throwable) {
                            this.streamThread.clearStandbyRecords();
                            this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                            throw throwable;
                        }
                        this.streamThread.clearStandbyRecords();
                        this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
                    }
                }
                this.streamThread.clearStandbyRecords();
                this.log.info("partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{this.time.milliseconds() - start, this.taskManager.suspendedActiveTaskIds(), this.taskManager.suspendedStandbyTaskIds()});
            }
        }
    }

    public static interface StateListener {
        public void onChange(Thread var1, ThreadStateTransitionValidator var2, ThreadStateTransitionValidator var3);
    }

    public static enum State implements ThreadStateTransitionValidator
    {
        CREATED(1, 5),
        STARTING(2, 5),
        PARTITIONS_REVOKED(3, 5),
        PARTITIONS_ASSIGNED(2, 4, 5),
        RUNNING(2, 5),
        PENDING_SHUTDOWN(6),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            return this.equals(RUNNING) || this.equals(STARTING) || this.equals(PARTITIONS_REVOKED) || this.equals(PARTITIONS_ASSIGNED);
        }

        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator newState) {
            State tmpState = (State)newState;
            return this.validTransitions.contains(tmpState.ordinal());
        }
    }
}

