/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.configmap;

import io.debezium.DebeziumException;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.storage.configmap.ConfigMapFormatter;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ConfigMapFluent;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Incubating
public class ConfigMapOffsetStore
implements OffsetBackingStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMapOffsetStore.class);
    public static final String OFFSET_STORAGE_CONFIGMAP_NAME_CONFIG = "offset.storage.configmap.name";
    private String configMapName;
    private final ExecutorService executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory((String)(this.getClass().getSimpleName() + "-%d"), (boolean)false));
    private final KubernetesClient k8sClient;
    private final ConfigMapFormatter configMapFormatter;
    private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<ByteBuffer, ByteBuffer>();
    private ConfigMap configMap;

    public ConfigMapOffsetStore() {
        this.k8sClient = new KubernetesClientBuilder().build();
        this.configMapFormatter = new ConfigMapFormatter();
    }

    public ConfigMapOffsetStore(String clientConfig) {
        this.k8sClient = new KubernetesClientBuilder().withConfig(Config.fromKubeconfig((String)clientConfig)).build();
        this.configMapFormatter = new ConfigMapFormatter();
    }

    public void configure(WorkerConfig workerConfig) {
        Configuration configuration = Configuration.from((Map)workerConfig.originalsStrings());
        this.configMapName = configuration.getString(OFFSET_STORAGE_CONFIGMAP_NAME_CONFIG);
    }

    public void start() {
        LOGGER.info("Starting ConfigMapOffsetStore with config map {}", (Object)this.configMapName);
        String currentNamespace = this.k8sClient.getConfiguration().getNamespace();
        LOGGER.debug("Trying to get config map {} from namespace {}", (Object)this.configMapName, (Object)currentNamespace);
        this.getOrCreateConfigMap(currentNamespace);
        this.load();
    }

    private void getOrCreateConfigMap(String currentNamespace) {
        try {
            this.configMap = (ConfigMap)((Resource)((NonNamespaceOperation)this.k8sClient.configMaps().inNamespace(currentNamespace)).withName(this.configMapName)).get();
            if (this.configMap == null) {
                ((Resource)((NonNamespaceOperation)this.k8sClient.configMaps().inNamespace(currentNamespace)).resource((Object)((ConfigMapBuilder)((ConfigMapFluent.MetadataNested)new ConfigMapBuilder().withNewMetadata().withName(this.configMapName)).endMetadata()).build())).create();
                this.configMap = (ConfigMap)((Resource)((NonNamespaceOperation)this.k8sClient.configMaps().inNamespace(currentNamespace)).withName(this.configMapName)).get();
            }
        }
        catch (Exception e) {
            LOGGER.error("Error while get/create config map {}", (Object)this.configMapName, (Object)e);
            throw new DebeziumException(String.format("Error while get/create config map: %s", this.configMap), (Throwable)e);
        }
    }

    public void stop() {
        this.executor.shutdown();
        this.k8sClient.close();
        LOGGER.info("Stopped ConfigMapOffsetStore");
    }

    private void load() {
        try {
            this.data.putAll(this.configMapFormatter.convertFromStorableFormat(this.configMap.getBinaryData()));
            LOGGER.info("Config map {} correctly loaded", (Object)this.configMap);
        }
        catch (Exception e) {
            throw new DebeziumException(String.format("Unable to load data from config map: %s", this.configMapName), (Throwable)e);
        }
    }

    private void save() {
        try {
            ((Resource)this.k8sClient.configMaps().withName(this.configMapName)).edit(cm -> ((ConfigMapBuilder)new ConfigMapBuilder(cm).addToBinaryData(this.configMapFormatter.convertToStorableFormat(this.data))).build());
            LOGGER.debug("Offsets correctly stored into {} config map", (Object)this.configMap);
        }
        catch (Exception e) {
            throw new DebeziumException(String.format("Unable to edit config map: %s", this.configMapName), (Throwable)e);
        }
    }

    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
        return this.executor.submit(() -> {
            HashMap<ByteBuffer, ByteBuffer> result = new HashMap<ByteBuffer, ByteBuffer>();
            for (ByteBuffer key : keys) {
                result.put(key, this.data.get(key));
            }
            return result;
        });
    }

    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        return this.executor.submit(() -> {
            for (Map.Entry entry : values.entrySet()) {
                if (entry.getKey() == null) continue;
                LOGGER.debug("Storing offset with key {} and value {}", (Object)this.fromByteBuffer((ByteBuffer)entry.getKey()), (Object)this.fromByteBuffer((ByteBuffer)entry.getValue()));
                this.data.put((ByteBuffer)entry.getKey(), (ByteBuffer)entry.getValue());
            }
            this.save();
            if (callback != null) {
                callback.onCompletion(null, null);
            }
            return null;
        });
    }

    public String fromByteBuffer(ByteBuffer data) {
        return data != null ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
    }

    public ByteBuffer toByteBuffer(String data) {
        return data != null ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null;
    }

    public Set<Map<String, Object>> connectorPartitions(String s) {
        return null;
    }
}

