/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

/**
 * Records when tasks hit an error and uses that to decide whether a task may be processed.
 *
 * <p>Error times are kept per task and grouped by the task's topology name. After an error is
 * registered for a task, {@link #canProcessTask(Task, long)} returns {@code false} for that task
 * until more than {@link #CONSTANT_BACKOFF_MS} has elapsed (measured with the caller-supplied
 * {@code now} values). The bookkeeping is only active when named topologies are in use; otherwise
 * every task is always processable and errors are not recorded.
 */
public class TaskExecutionMetadata {
    /** Backoff applied to a task after an error; compared against differences of {@code now} values. */
    private static final long CONSTANT_BACKOFF_MS = 5000L;

    /** Sentinel topology name that marks an application as not using named topologies. */
    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    private final boolean hasNamedTopologies;

    /** Error bookkeeping per topology name; an entry exists only while that topology has a task in backoff. */
    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();

    /**
     * @param allTopologyNames names of all topologies; named topologies are considered in use
     *                         unless this set consists of exactly the single unnamed-topology sentinel
     */
    public TaskExecutionMetadata(Set<String> allTopologyNames) {
        this.hasNamedTopologies = allTopologyNames.size() != 1 || !allTopologyNames.contains(UNNAMED_TOPOLOGY);
    }

    /**
     * Decides whether the given task may be processed at time {@code now}.
     *
     * @param task the task to check
     * @param now  the current time, in the same unit as the time passed to
     *             {@link #registerTaskError(Task, Throwable, long)}
     * @return {@code true} if named topologies are not in use, if no error is recorded for the
     *         task's topology, or if the task's backoff has expired; {@code false} while the task
     *         is still backing off
     */
    public boolean canProcessTask(Task task, long now) {
        // Resolved before the named-topology check on purpose: keeps the original evaluation order.
        String topologyName = task.id().topologyName();
        if (!this.hasNamedTopologies) {
            return true;
        }
        NamedTopologyMetadata metadata = this.topologyNameToErrorMetadata.get(topologyName);
        return metadata == null || metadata.canProcess() && metadata.canProcessTask(task, now);
    }

    /**
     * Records that the given task failed at time {@code now}, starting (or restarting) its backoff.
     * Does nothing when named topologies are not in use.
     *
     * @param task the task that failed
     * @param t    the error that was thrown; not inspected by this class
     * @param now  the time at which the error occurred
     */
    public void registerTaskError(Task task, Throwable t, long now) {
        if (this.hasNamedTopologies) {
            String topologyName = task.id().topologyName();
            this.topologyNameToErrorMetadata
                .computeIfAbsent(topologyName, name -> new NamedTopologyMetadata(name))
                .registerTaskError(task, t, now);
        }
    }

    /**
     * Error bookkeeping for the tasks of a single topology. Non-static because it removes its own
     * entry from the enclosing instance's map once no task of the topology is in backoff anymore.
     */
    private class NamedTopologyMetadata {
        private final Logger log;

        /** Time of the most recently registered error for each task currently in backoff. */
        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();

        public NamedTopologyMetadata(String topologyName) {
            LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
            this.log = logContext.logger(NamedTopologyMetadata.class);
        }

        /** @return always {@code true}; there is currently no topology-wide condition that blocks processing */
        public boolean canProcess() {
            return true;
        }

        /**
         * Checks whether the task's backoff (if any) has expired at time {@code now}. When it has,
         * the task's error record is dropped, and if that was the last record for this topology the
         * topology's entry is removed from the enclosing map as well.
         *
         * @return {@code true} if the task has no recorded error or strictly more than
         *         {@code CONSTANT_BACKOFF_MS} has passed since it; {@code false} otherwise
         */
        public boolean canProcessTask(Task task, long now) {
            Long errorTime = this.tasksToErrorTime.get(task.id());
            if (errorTime == null) {
                return true;
            }
            if (now - errorTime > CONSTANT_BACKOFF_MS) {
                this.log.info("End backoff for task {} at t={}", task.id(), now);
                this.tasksToErrorTime.remove(task.id());
                // NOTE(review): this method is not synchronized while registerTaskError is. If the two
                // are ever invoked from different threads, an error registered on this instance right
                // after the removal below would no longer be reachable through the outer map — confirm
                // the threading model before relying on this.
                if (this.tasksToErrorTime.isEmpty()) {
                    TaskExecutionMetadata.this.topologyNameToErrorMetadata.remove(task.id().topologyName());
                }
                return true;
            }
            this.log.debug("Skipping processing for unhealthy task {} at t={}", task.id(), now);
            return false;
        }

        /**
         * Stores {@code now} as the error time for the task, overwriting any earlier error time.
         *
         * @param t the error that was thrown; currently unused
         */
        public synchronized void registerTaskError(Task task, Throwable t, long now) {
            this.log.info("Begin backoff for unhealthy task {} at t={}", task.id(), now);
            this.tasksToErrorTime.put(task.id(), now);
        }
    }
}

