/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.kubeclient.resources;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.Informable;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that lets many watchers share a single {@link SharedIndexInformer}.
 *
 * <p>Watchers register per resource name via {@link #watch}. Informer notifications are forwarded
 * to a single-threaded executor, where the current state of the resource is read from the
 * informer's indexer, compared with the previously seen state, and translated into
 * added/modified/deleted callbacks for every watcher of that resource.
 *
 * @param <T> type of the Kubernetes resource tracked by the informer
 * @param <R> type handed to the watch callbacks, produced from {@code T} by the event wrapper
 */
public abstract class KubernetesSharedInformer<T extends HasMetadata, R>
        implements KubernetesSharedWatcher<R> {

    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    private final NamespacedKubernetesClient client;
    private final SharedIndexInformer<T> sharedIndexInformer;
    private final Function<T, R> eventWrapper;
    private final ExecutorService informerExecutor;
    private final AggregatedEventHandler aggregatedEventHandler;

    /**
     * Creates the informer and its dedicated single-threaded event executor.
     *
     * @param client client whose namespace is used to build resource keys
     * @param informable source from which the shared informer is created
     * @param eventWrapper converts a raw resource into the type delivered to watchers
     */
    public KubernetesSharedInformer(
            NamespacedKubernetesClient client,
            Informable<T> informable,
            Function<T, R> eventWrapper) {
        this.client = client;
        // Assigned before the informer is created so that no informer callback can observe
        // an unset wrapper.
        this.eventWrapper = eventWrapper;
        this.informerExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("KubernetesClient-Informer"));
        this.aggregatedEventHandler = new AggregatedEventHandler(this.informerExecutor);
        // NOTE(review): the second argument is the resync period; 0 presumably disables
        // periodic resync -- confirm against the fabric8 Informable documentation.
        this.sharedIndexInformer = informable.inform(this.aggregatedEventHandler, 0L);
    }

    /**
     * Registers a watcher for the resource with the given name.
     *
     * @param name name of the resource inside the client's namespace
     * @param handler callbacks to invoke on resource changes
     * @param executor executor on which the callbacks run; when {@code null} they run directly
     *     on the calling (informer) thread
     * @return handle that unregisters the watcher when closed
     */
    @Override
    public KubernetesSharedWatcher.Watch watch(
            String name,
            FlinkKubeClient.WatchCallbackHandler<R> handler,
            @Nullable Executor executor) {
        return this.aggregatedEventHandler.watch(name, new WatchCallback<>(handler, executor));
    }

    /** Stops the informer and shuts down the event executor, waiting up to five seconds. */
    @Override
    public void close() {
        this.sharedIndexInformer.stop();
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, this.informerExecutor);
    }

    /**
     * Builds the {@code <namespace>/<name>} key used both for the handler map and for indexer
     * lookups.
     */
    private String getResourceKey(String name) {
        return this.client.getNamespace() + "/" + name;
    }

    /**
     * Couples a callback handler with the (optional) executor it should be invoked on.
     *
     * @param <T> type delivered to the handler
     */
    private static final class WatchCallback<T> {
        private final Object callbackLock = new Object();
        private final BlockingQueue<Consumer<FlinkKubeClient.WatchCallbackHandler<T>>>
                callbackQueue = new LinkedBlockingQueue<>();
        private final FlinkKubeClient.WatchCallbackHandler<T> handler;
        @Nullable private final Executor executor;

        private WatchCallback(
                FlinkKubeClient.WatchCallbackHandler<T> handler, @Nullable Executor executor) {
            this.handler = handler;
            this.executor = executor;
        }

        /**
         * Applies {@code handlerConsumer} to the handler, either inline (no executor) or by
         * queueing it and scheduling one executor task that drains exactly one queue entry.
         */
        private void run(Consumer<FlinkKubeClient.WatchCallbackHandler<T>> handlerConsumer) {
            if (this.executor == null) {
                handlerConsumer.accept(this.handler);
                return;
            }
            Preconditions.checkState(
                    this.callbackQueue.add(handlerConsumer),
                    "Unable to put callback into a queue.");
            this.executor.execute(
                    () -> {
                        // Poll and invoke under one lock so that callbacks are delivered one at
                        // a time, in queue order, even if the executor runs tasks concurrently.
                        synchronized (this.callbackLock) {
                            Consumer<FlinkKubeClient.WatchCallbackHandler<T>> next =
                                    Preconditions.checkNotNull(
                                            this.callbackQueue.poll(), "Callback queue is empty");
                            next.accept(this.handler);
                        }
                    });
        }
    }

    /**
     * Tracks the last seen state of a single resource and fans changes out to its watchers.
     *
     * <p>All methods are invoked from tasks running on the informer executor, which is why the
     * plain {@link HashMap} and the mutable {@code resource} field need no extra locking.
     */
    private class EventHandler {
        private final String resourceKey;
        private final Map<String, WatchCallback<R>> callbacks = new HashMap<>();
        /** Last state read from the indexer; {@code null} if the resource was absent. */
        private T resource;

        private EventHandler(String resourceKey) {
            this.resourceKey = resourceKey;
            this.resource = sharedIndexInformer.getIndexer().getByKey(resourceKey);
        }

        /**
         * Registers a watcher and, if the resource is already known, immediately replays it to
         * the new watcher as an "added" event.
         */
        private void addWatch(String id, WatchCallback<R> callback) {
            log.info("Starting to watch for {}, watching id:{}", this.resourceKey, id);
            this.callbacks.put(id, callback);
            if (this.resource != null) {
                List<R> resources = this.wrapEvent(this.resource);
                callback.run(h -> h.onAdded(resources));
            }
        }

        /**
         * Unregisters a watcher.
         *
         * @return {@code true} if no watchers remain for this resource
         */
        private boolean removeWatch(String id) {
            this.callbacks.remove(id);
            log.info("Stopped to watch for {}, watching id:{}", this.resourceKey, id);
            return this.callbacks.isEmpty();
        }

        /**
         * Re-reads the resource from the indexer and emits the event implied by the transition
         * from the previously stored state: deleted (present to absent), added (absent to
         * present) or modified (resource version changed). Nothing is emitted otherwise.
         */
        private void handleResourceEvent() {
            T newResource = sharedIndexInformer.getIndexer().getByKey(this.resourceKey);
            T oldResource = this.resource;
            if (newResource == null) {
                if (oldResource != null) {
                    this.onDeleted(oldResource);
                }
            } else if (oldResource == null) {
                this.onAdded(newResource);
            } else if (!oldResource
                    .getMetadata()
                    .getResourceVersion()
                    .equals(newResource.getMetadata().getResourceVersion())) {
                this.onModified(newResource);
            }
            this.resource = newResource;
        }

        private void onAdded(T obj) {
            this.callbacks.forEach(
                    (id, callback) -> callback.run(h -> h.onAdded(this.wrapEvent(obj))));
        }

        private void onModified(T obj) {
            this.callbacks.forEach(
                    (id, callback) -> callback.run(h -> h.onModified(this.wrapEvent(obj))));
        }

        private void onDeleted(T obj) {
            this.callbacks.forEach(
                    (id, callback) -> callback.run(h -> h.onDeleted(this.wrapEvent(obj))));
        }

        /** Converts the raw resource with the event wrapper and wraps it in a singleton list. */
        private List<R> wrapEvent(T obj) {
            return Collections.singletonList(eventWrapper.apply(obj));
        }
    }

    /**
     * The single handler registered with the informer. It forwards every notification to the
     * executor and routes it to the {@link EventHandler} of the affected resource, if any.
     *
     * <p>The {@code handlers} map is only read and written from tasks submitted to
     * {@code executorService}.
     */
    private class AggregatedEventHandler implements ResourceEventHandler<T> {
        private final Map<String, EventHandler> handlers = new HashMap<>();
        private final ExecutorService executorService;

        private AggregatedEventHandler(ExecutorService executorService) {
            this.executorService = executorService;
        }

        @Override
        public void onAdd(T obj) {
            this.dispatch(obj);
        }

        @Override
        public void onUpdate(T oldObj, T newObj) {
            this.dispatch(newObj);
        }

        @Override
        public void onDelete(T obj, boolean deletedFinalStateUnknown) {
            this.dispatch(obj);
        }

        /**
         * Schedules re-evaluation of the resource that {@code obj} refers to. The kind of
         * notification is irrelevant here: the event handler derives the actual transition
         * from the indexer state.
         */
        private void dispatch(T obj) {
            this.executorService.execute(
                    () -> this.findHandler(obj).ifPresent(EventHandler::handleResourceEvent));
        }

        /**
         * Registers {@code watch} for the named resource on the executor and returns a handle
         * whose close unregisters it (also on the executor).
         */
        private KubernetesSharedWatcher.Watch watch(String name, WatchCallback<R> watch) {
            String resourceKey = getResourceKey(name);
            String watchId = UUID.randomUUID().toString();
            CompletableFuture<Void> closeFuture = new CompletableFuture<>();
            this.executorService.execute(
                    () -> {
                        EventHandler eventHandler =
                                this.handlers.computeIfAbsent(
                                        resourceKey, key -> new EventHandler(key));
                        eventHandler.addWatch(watchId, watch);
                    });
            closeFuture.whenCompleteAsync(
                    (ignored, error) -> {
                        if (error != null) {
                            log.error("Unhandled error while closing watcher.", error);
                        }
                        EventHandler eventHandler = this.handlers.get(resourceKey);
                        if (eventHandler == null) {
                            // Registration never stored a handler (e.g. it failed); there is
                            // nothing to remove. Without this guard the NPE would be captured
                            // by an unobserved future and lost.
                            log.debug(
                                    "No event handler registered for {} while closing watch {}.",
                                    resourceKey,
                                    watchId);
                            return;
                        }
                        if (eventHandler.removeWatch(watchId)) {
                            this.handlers.remove(resourceKey);
                        }
                    },
                    this.executorService);
            // Completing an already completed future is a no-op, so closing twice is harmless.
            return () -> closeFuture.complete(null);
        }

        /** Looks up the event handler for the resource that {@code obj} refers to. */
        private Optional<EventHandler> findHandler(T obj) {
            String resourceKey = getResourceKey(obj.getMetadata().getName());
            return Optional.ofNullable(this.handlers.get(resourceKey));
        }
    }
}

