/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.kafkaesqueesque.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafkaesqueesque.clients.consumer.OffsetAndMetadata;
import org.apache.kafkaesqueesque.clients.producer.Callback;
import org.apache.kafkaesqueesque.clients.producer.KafkaProducer;
import org.apache.kafkaesqueesque.clients.producer.Producer;
import org.apache.kafkaesqueesque.clients.producer.ProducerRecord;
import org.apache.kafkaesqueesque.clients.producer.RecordMetadata;
import org.apache.kafkaesqueesque.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafkaesqueesque.common.Metric;
import org.apache.kafkaesqueesque.common.MetricName;
import org.apache.kafkaesqueesque.common.PartitionInfo;
import org.apache.kafkaesqueesque.common.TopicPartition;
import org.apache.kafkaesqueesque.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HiveKafkaProducer<K, V>
implements Producer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducer.class);
    private final KafkaProducer<K, V> kafkaProducer;
    @Nullable
    private final String transactionalId;

    HiveKafkaProducer(Properties properties) {
        this.transactionalId = properties.getProperty("transactional.id");
        this.kafkaProducer = new KafkaProducer(properties);
    }

    @Override
    public void initTransactions() {
        this.kafkaProducer.initTransactions();
    }

    @Override
    public void beginTransaction() throws ProducerFencedException {
        this.kafkaProducer.beginTransaction();
    }

    @Override
    public void commitTransaction() throws ProducerFencedException {
        this.kafkaProducer.commitTransaction();
    }

    @Override
    public void abortTransaction() throws ProducerFencedException {
        this.kafkaProducer.abortTransaction();
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
        this.kafkaProducer.sendOffsetsToTransaction(offsets, groupMetadata);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.kafkaProducer.send(record);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return this.kafkaProducer.send(record, callback);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.kafkaProducer.partitionsFor(topic);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.kafkaProducer.metrics();
    }

    @Override
    public void close() {
        this.kafkaProducer.close();
    }

    @Override
    public void close(Duration duration) {
        this.kafkaProducer.close(duration);
    }

    @Override
    public void flush() {
        this.kafkaProducer.flush();
        if (this.transactionalId != null) {
            this.flushNewPartitions();
        }
    }

    synchronized void resumeTransaction(long producerId, short epoch) {
        Preconditions.checkState((producerId >= 0L && epoch >= 0 ? 1 : 0) != 0, (String)"Incorrect values for producerId {} and epoch {}", (long)producerId, (int)epoch);
        LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", new Object[]{this.transactionalId, producerId, epoch});
        Object transactionManager = HiveKafkaProducer.getValue(this.kafkaProducer, "transactionManager");
        Object topicPartitionBookkeeper = HiveKafkaProducer.getValue(transactionManager, "topicPartitionBookkeeper");
        HiveKafkaProducer.invoke(transactionManager, "transitionTo", HiveKafkaProducer.getEnum("org.apache.kafkaesqueesque.clients.producer.internals.TransactionManager$State.INITIALIZING"));
        HiveKafkaProducer.invoke(topicPartitionBookkeeper, "reset", new Object[0]);
        Object producerIdAndEpoch = HiveKafkaProducer.getValue(transactionManager, "producerIdAndEpoch");
        HiveKafkaProducer.setValue(producerIdAndEpoch, "producerId", producerId);
        HiveKafkaProducer.setValue(producerIdAndEpoch, "epoch", epoch);
        HiveKafkaProducer.invoke(transactionManager, "transitionTo", HiveKafkaProducer.getEnum("org.apache.kafkaesqueesque.clients.producer.internals.TransactionManager$State.READY"));
        HiveKafkaProducer.invoke(transactionManager, "transitionTo", HiveKafkaProducer.getEnum("org.apache.kafkaesqueesque.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
        HiveKafkaProducer.setValue(transactionManager, "transactionStarted", true);
    }

    @Nullable
    String getTransactionalId() {
        return this.transactionalId;
    }

    long getProducerId() {
        Object transactionManager = HiveKafkaProducer.getValue(this.kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = HiveKafkaProducer.getValue(transactionManager, "producerIdAndEpoch");
        return (Long)HiveKafkaProducer.getValue(producerIdAndEpoch, "producerId");
    }

    short getEpoch() {
        Object transactionManager = HiveKafkaProducer.getValue(this.kafkaProducer, "transactionManager");
        Object producerIdAndEpoch = HiveKafkaProducer.getValue(transactionManager, "producerIdAndEpoch");
        return (Short)HiveKafkaProducer.getValue(producerIdAndEpoch, "epoch");
    }

    private void flushNewPartitions() {
        LOG.info("Flushing new partitions");
        TransactionalRequestResult result = this.enqueueNewPartitions();
        Object sender = HiveKafkaProducer.getValue(this.kafkaProducer, "sender");
        HiveKafkaProducer.invoke(sender, "wakeup", new Object[0]);
        result.await();
    }

    private synchronized TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager = HiveKafkaProducer.getValue(this.kafkaProducer, "transactionManager");
        Object txnRequestHandler = HiveKafkaProducer.invoke(transactionManager, "addPartitionsToTransactionHandler", new Object[0]);
        HiveKafkaProducer.invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
        return (TransactionalRequestResult)HiveKafkaProducer.getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    }

    private static Enum<?> getEnum(String enumFullName) {
        String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
        if (x.length == 2) {
            String enumClassName = x[0];
            String enumName = x[1];
            try {
                Class<?> cl = Class.forName(enumClassName);
                return Enum.valueOf(cl, enumName);
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException("Incompatible KafkaProducer version", e);
            }
        }
        return null;
    }

    private static Object invoke(Object object, String methodName, Object ... args) {
        Class[] argTypes = new Class[args.length];
        for (int i = 0; i < args.length; ++i) {
            argTypes[i] = args[i].getClass();
        }
        return HiveKafkaProducer.invoke(object, methodName, argTypes, args);
    }

    private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        }
        catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static Object getValue(Object object, String fieldName) {
        return HiveKafkaProducer.getValue(object, object.getClass(), fieldName);
    }

    private static Object getValue(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static void setValue(Object object, String fieldName, Object value) {
        try {
            Field field = object.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(object, value);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }
}

