package io.yupiik.bundlebee.operator.launcher;

import io.yupiik.bundlebee.core.kube.HttpKubeClient;
import io.yupiik.bundlebee.core.qualifier.BundleBee;
import io.yupiik.bundlebee.operator.BundlebeeOperator;
import io.yupiik.bundlebee.operator.handler.ActionHandler;
import io.yupiik.bundlebee.operator.model.Event;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.bind.Jsonb;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
/* loaded from: input_file:io/yupiik/bundlebee/operator/launcher/OperatorLoop.class */
public class OperatorLoop {
    private final Logger logger = Logger.getLogger(getClass().getName());

    @Inject
    private HttpKubeClient client;

    @Inject
    private ActionHandler actionHandler;

    @Inject
    @BundleBee
    private Jsonb jsonb;

    @Inject
    @ConfigProperty(name = "bundlebee.operator.storage", defaultValue = "/opt/yupiik/state/bundlebee-operator")
    private String stateLocation;

    public void run(AtomicBoolean atomicBoolean) {
        this.logger.info("Started Bundlebee Operator");
        Thread thread = new Thread(() -> {
            atomicBoolean.set(false);
        }, BundlebeeOperator.class.getName() + "-shutdown");
        Runtime.getRuntime().addShutdownHook(thread);
        ExecutorService createThreadPool = createThreadPool();
        Path resolve = Path.of(this.stateLocation, new String[0]).resolve("lastResourceVersion");
        try {
            HttpClient client = this.client.getClient();
            checkPerms();
            while (atomicBoolean.get()) {
                try {
                    String readLastResourceVersion = readLastResourceVersion(resolve);
                    this.logger.info("Starting to watch alveoli (resource version=" + readLastResourceVersion + ")");
                    client.send(this.client.prepareRequest(HttpRequest.newBuilder().header("Accept", "application/json"), "/apis/bundlebee.yupiik.io/v1/namespaces/" + this.client.getNamespace() + "/alveoli?watch=true&includeUninitialized=false&allowWatchBookmarks=true&" + (readLastResourceVersion.isBlank() ? "" : "&resourceVersion=" + readLastResourceVersion)), responseInfo -> {
                        if (responseInfo.statusCode() != 200) {
                            this.logger.info(() -> {
                                return "Got watch response: " + responseInfo.statusCode() + ", skipping";
                            });
                            return HttpResponse.BodySubscribers.discarding();
                        }
                        this.logger.info(() -> {
                            return "Got watch response: " + responseInfo.statusCode() + ", starting to watch";
                        });
                        return HttpResponse.BodySubscribers.fromLineSubscriber(new Flow.Subscriber<String>() { // from class: io.yupiik.bundlebee.operator.launcher.OperatorLoop.1
                            private Flow.Subscription subscription;

                            @Override // java.util.concurrent.Flow.Subscriber
                            public void onSubscribe(Flow.Subscription subscription) {
                                if (this.subscription != null) {
                                    this.subscription.cancel();
                                }
                                this.subscription = subscription;
                                this.subscription.request(Long.MAX_VALUE);
                            }

                            @Override // java.util.concurrent.Flow.Subscriber
                            public void onNext(String str) {
                                String resourceVersion;
                                Event event = (Event) OperatorLoop.this.jsonb.fromJson(str, Event.class);
                                if ("BOOKMARK".equalsIgnoreCase(event.getType())) {
                                    if (event.getObject().getMetadata().getResourceVersion() != null) {
                                        OperatorLoop.this.syncResourceVersion(event.getObject().getMetadata().getResourceVersion(), resolve);
                                    }
                                } else {
                                    if ("ERROR".equalsIgnoreCase(event.getType())) {
                                        OperatorLoop.this.logger.log(Level.SEVERE, () -> {
                                            return "Error event: '" + str + "'";
                                        });
                                        return;
                                    }
                                    if (event.getType() != null) {
                                        try {
                                            resourceVersion = OperatorLoop.this.max(OperatorLoop.this.readLastResourceVersion(resolve), event.getObject().getMetadata().getResourceVersion());
                                        } catch (IOException e) {
                                            resourceVersion = event.getObject().getMetadata().getResourceVersion();
                                        }
                                        if (resourceVersion != null && !resourceVersion.isBlank()) {
                                            OperatorLoop.this.syncResourceVersion(resourceVersion, resolve);
                                        }
                                        createThreadPool.submit(() -> {
                                            try {
                                                OperatorLoop.this.actionHandler.onEvent(event.getType(), event.getObject());
                                            } catch (RuntimeException e2) {
                                                Logger logger = OperatorLoop.this.logger;
                                                Level level = Level.SEVERE;
                                                Objects.requireNonNull(e2);
                                                logger.log(level, e2, e2::getMessage);
                                            }
                                        });
                                    }
                                }
                            }

                            @Override // java.util.concurrent.Flow.Subscriber
                            public void onError(Throwable th) {
                                Logger logger = OperatorLoop.this.logger;
                                Level level = Level.SEVERE;
                                Objects.requireNonNull(th);
                                logger.log(level, th, th::getMessage);
                            }

                            @Override // java.util.concurrent.Flow.Subscriber
                            public void onComplete() {
                                OperatorLoop.this.logger.finest(() -> {
                                    return "Ending watching current request";
                                });
                            }
                        });
                    });
                } catch (IOException | RuntimeException e) {
                    Logger logger = this.logger;
                    Level level = Level.WARNING;
                    Objects.requireNonNull(e);
                    logger.log(level, e, e::getMessage);
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                } catch (InterruptedException e3) {
                    this.logger.info("Application interrupted");
                    Thread.currentThread().interrupt();
                }
            }
            try {
                Runtime.getRuntime().removeShutdownHook(thread);
            } catch (IllegalStateException e4) {
            }
        } finally {
            try {
                Runtime.getRuntime().removeShutdownHook(thread);
            } catch (IllegalStateException e5) {
            }
            stopPool(createThreadPool);
        }
    }

    private String readLastResourceVersion(Path path) throws IOException {
        return Files.exists(path, new LinkOption[0]) ? Files.readString(path).strip() : "";
    }

    private void syncResourceVersion(String str, Path path) {
        synchronized (path) {
            try {
                Files.writeString(path, str, new OpenOption[0]);
            } catch (IOException e) {
                Logger logger = this.logger;
                Level level = Level.WARNING;
                Objects.requireNonNull(e);
                logger.log(level, e, e::getMessage);
            }
        }
    }

    private String max(String str, String str2) {
        if (str == null) {
            return str2;
        }
        if (str2 == null) {
            return str;
        }
        try {
            return Integer.parseInt(str) - Integer.parseInt(str2) < 0 ? str2 : str;
        } catch (NumberFormatException e) {
            return str;
        }
    }

    private void checkPerms() {
        try {
            if (((HttpResponse) this.client.execute(HttpRequest.newBuilder().header("Accept", "application/json"), "/apis/bundlebee.yupiik.io/v1/namespaces/" + this.client.getNamespace() + "/alveoli?limit=1").toCompletableFuture().get()).statusCode() != 200) {
                throw new IllegalStateException("Can't call Kubernetes API to get alveoli, check your role setup.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            throw new IllegalStateException(e2);
        }
    }

    private void stopPool(ExecutorService executorService) {
        executorService.shutdownNow();
        try {
            if (!executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
                this.logger.warning("Didn't stop properly in 1mn, giving up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private ExecutorService createThreadPool() {
        return new ForkJoinPool(Math.max(1, Runtime.getRuntime().availableProcessors()), new ForkJoinPool.ForkJoinWorkerThreadFactory() { // from class: io.yupiik.bundlebee.operator.launcher.OperatorLoop.2
            private final AtomicInteger counter = new AtomicInteger();

            /* JADX WARN: Type inference failed for: r0v3, types: [java.util.concurrent.ForkJoinWorkerThread, io.yupiik.bundlebee.operator.launcher.OperatorLoop$2$1] */
            @Override // java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
            public ForkJoinWorkerThread newThread(ForkJoinPool forkJoinPool) {
                String str = BundlebeeOperator.class.getName() + "-" + this.counter.incrementAndGet();
                ?? r0 = new ForkJoinWorkerThread(forkJoinPool) { // from class: io.yupiik.bundlebee.operator.launcher.OperatorLoop.2.1
                };
                r0.setName(str);
                r0.setContextClassLoader(BundlebeeOperator.class.getClassLoader());
                return r0;
            }
        }, (thread, th) -> {
            Logger logger = this.logger;
            Level level = Level.SEVERE;
            Objects.requireNonNull(th);
            logger.log(level, th, th::getMessage);
        }, true);
    }
}
