package io.prestodb.tempto.fulfillment.table.kafka;

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import io.prestodb.tempto.fulfillment.table.MutableTableRequirement;
import io.prestodb.tempto.fulfillment.table.TableDefinition;
import io.prestodb.tempto.fulfillment.table.TableHandle;
import io.prestodb.tempto.fulfillment.table.TableInstance;
import io.prestodb.tempto.fulfillment.table.TableManager;
import io.prestodb.tempto.internal.fulfillment.table.TableName;
import io.prestodb.tempto.query.QueryExecutor;
import java.lang.annotation.Annotation;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Consumer;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.jetty.io.SelectorManager;

@Singleton
@TableManager.Descriptor(tableDefinitionClass = KafkaTableDefinition.class, type = "KAFKA")
/* loaded from: input_file:io/prestodb/tempto/fulfillment/table/kafka/KafkaTableManager.class */
public class KafkaTableManager implements TableManager<KafkaTableDefinition> {
    private final String databaseName;
    private final QueryExecutor prestoQueryExecutor;
    private final String brokerHost;
    private final Integer brokerPort;
    private final String prestoKafkaCatalog;
    private final String zookeeperHost;
    private final Integer zookeeperPort;

    @Inject
    public KafkaTableManager(@Named("databaseName") String str, @Named("broker.host") String str2, @Named("broker.port") int i, @Named("zookeeper.host") String str3, @Named("zookeeper.port") int i2, @Named("presto_database_name") String str4, @Named("presto_kafka_catalog") String str5, Injector injector) {
        this.databaseName = (String) Objects.requireNonNull(str, "databaseName is null");
        this.brokerHost = (String) Objects.requireNonNull(str2, "brokerHost is null");
        this.brokerPort = Integer.valueOf(i);
        this.zookeeperHost = (String) Objects.requireNonNull(str3, "zookeeperHost is null");
        this.zookeeperPort = Integer.valueOf(i2);
        Objects.requireNonNull(injector, "injector is null");
        Objects.requireNonNull(str4, "prestoDatabaseName is null");
        this.prestoQueryExecutor = (QueryExecutor) injector.getInstance(Key.get(QueryExecutor.class, (Annotation) Names.named(str4)));
        this.prestoKafkaCatalog = (String) Objects.requireNonNull(str5, "prestoKafkaCatalog is null");
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition kafkaTableDefinition, TableHandle tableHandle) {
        verifyTableExistsInPresto(tableHandle.getSchema().orElseThrow(() -> {
            return new IllegalArgumentException("Schema required for Kafka tables");
        }), tableHandle.getName());
        deleteTopic(kafkaTableDefinition.getTopic());
        createTopic(kafkaTableDefinition.getTopic(), kafkaTableDefinition.getPartitionsCount(), kafkaTableDefinition.getReplicationLevel());
        insertDataIntoTopic(kafkaTableDefinition.getTopic(), kafkaTableDefinition.getDataSource());
        return new KafkaTableInstance(new TableName(tableHandle.getDatabase().orElse(getDatabaseName()), tableHandle.getSchema(), tableHandle.getName(), tableHandle.getName()), kafkaTableDefinition);
    }

    private void verifyTableExistsInPresto(String str, String str2) {
        if (((Long) this.prestoQueryExecutor.executeQuery(String.format("select count(1) from %s.information_schema.tables where table_schema='%s' and table_name='%s'", this.prestoKafkaCatalog, str, str2), new QueryExecutor.QueryParam[0]).row(0).get(0)).longValue() != 1) {
            throw new RuntimeException(String.format("Table %s.%s not defined if kafka catalog (%s)", str, str2, this.prestoKafkaCatalog));
        }
    }

    private void deleteTopic(String str) {
        withZookeeper(zkUtils -> {
            if (AdminUtils.topicExists(zkUtils, str)) {
                AdminUtils.deleteTopic(zkUtils, str);
                for (int i = 0; i < 5; i++) {
                    if (!AdminUtils.topicExists(zkUtils, str)) {
                        return;
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("could not delete topic " + str);
                    }
                }
                throw new RuntimeException("could not delete topic " + str);
            }
        });
    }

    private void createTopic(String str, int i, int i2) {
        withZookeeper(zkUtils -> {
            AdminUtils.createTopic(zkUtils, str, i, i2, new Properties(), RackAwareMode$Disabled$.MODULE$);
        });
    }

    private void insertDataIntoTopic(String str, KafkaDataSource kafkaDataSource) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerHost + ":" + this.brokerPort);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put("retries", 0);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Iterator<KafkaMessage> messages = kafkaDataSource.getMessages();
        while (messages.hasNext()) {
            KafkaMessage next = messages.next();
            try {
                kafkaProducer.send(new ProducerRecord(str, next.getPartition().isPresent() ? Integer.valueOf(next.getPartition().getAsInt()) : null, next.getKey().orElse(null), next.getValue())).get();
            } catch (Exception e) {
                throw new RuntimeException("could not send message to topic " + str);
            }
        }
    }

    private void withZookeeper(Consumer<ZkUtils> consumer) {
        String str = this.zookeeperHost + ":" + this.zookeeperPort;
        ZkClient zkClient = new ZkClient(str, SelectorManager.DEFAULT_CONNECT_TIMEOUT, 10000, ZKStringSerializer$.MODULE$);
        try {
            consumer.accept(new ZkUtils(zkClient, new ZkConnection(str), false));
            zkClient.close();
        } catch (Throwable th) {
            zkClient.close();
            throw th;
        }
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition kafkaTableDefinition, MutableTableRequirement.State state, TableHandle tableHandle) {
        throw new IllegalArgumentException("Mutable tables are not supported by KafkaTableManager");
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public void dropTable(TableName tableName) {
        throw new IllegalArgumentException("dropTable not supported by KafkaTableManager");
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public void dropStaleMutableTables() {
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public String getDatabaseName() {
        return this.databaseName;
    }

    @Override // io.prestodb.tempto.fulfillment.table.TableManager
    public Class<? extends TableDefinition> getTableDefinitionClass() {
        return KafkaTableDefinition.class;
    }
}
