/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.hadoop.hive.kafka.KafkaOutputFormat;
import org.apache.hadoop.hive.kafka.KafkaUtils;
import org.apache.hadoop.hive.kafka.KafkaWritable;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafkaesqueesque.clients.producer.Callback;
import org.apache.kafkaesqueesque.clients.producer.KafkaProducer;
import org.apache.kafkaesqueesque.common.KafkaException;
import org.apache.kafkaesqueesque.common.errors.TimeoutException;
import org.apache.kafkaesqueesque.common.serialization.ByteArraySerializer;
import org.apache.kafkaesqueesque.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SimpleKafkaWriter
implements FileSinkOperator.RecordWriter,
RecordWriter<BytesWritable, KafkaWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaWriter.class);
    private static final String TIMEOUT_CONFIG_HINT = "Try increasing producer property [`retries`] and [`retry.backoff.ms`] to avoid this error [{}].";
    private static final String ABORT_MSG = "Writer [%s] aborting Send. Caused by [%s]. Sending to topic [%s]. Record offset [%s];";
    private static final String ACTION_ABORT = "WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] -> ACTION=ABORT, ERROR caused by [{}]";
    private final String topic;
    private final String writerId;
    private final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE;
    private final KafkaProducer<byte[], byte[]> producer;
    private final Callback callback;
    private final AtomicReference<Exception> sendExceptionRef = new AtomicReference();
    private final AtomicLong lostRecords = new AtomicLong(0L);
    private long sentRecords = 0L;

    SimpleKafkaWriter(String topic, @Nullable String writerId, Properties properties) {
        this.writerId = writerId == null ? UUID.randomUUID().toString() : writerId;
        this.topic = (String)Preconditions.checkNotNull((Object)topic, (Object)"Topic can not be null");
        Preconditions.checkState((properties.getProperty("bootstrap.servers") != null ? 1 : 0) != 0, (Object)"set [bootstrap.servers] property");
        this.producer = new KafkaProducer<byte[], byte[]>(properties, (Serializer<byte[]>)new ByteArraySerializer(), (Serializer<byte[]>)new ByteArraySerializer());
        this.callback = (metadata, exception) -> {
            if (exception != null) {
                this.lostRecords.getAndIncrement();
                LOG.error(ACTION_ABORT, new Object[]{this.getWriterId(), topic, this.writeSemantic, exception.getMessage()});
                this.sendExceptionRef.compareAndSet(null, exception);
            }
        };
        LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]", new Object[]{writerId, this.writeSemantic, topic});
    }

    public void write(Writable w) throws IOException {
        this.checkExceptions();
        try {
            ++this.sentRecords;
            this.producer.send(KafkaUtils.toProducerRecord(this.topic, (KafkaWritable)w), this.callback);
        }
        catch (KafkaException kafkaException) {
            this.handleKafkaException(kafkaException);
            this.checkExceptions();
        }
    }

    private void handleKafkaException(KafkaException kafkaException) {
        if (kafkaException instanceof TimeoutException) {
            LOG.error(TIMEOUT_CONFIG_HINT, (Object)kafkaException.getMessage());
        }
        if (KafkaUtils.exceptionIsFatal(kafkaException)) {
            LOG.error(String.format(ABORT_MSG, this.writerId, kafkaException.getMessage(), this.topic, -1L));
            this.sendExceptionRef.compareAndSet(null, kafkaException);
        } else {
            LOG.error(ACTION_ABORT, new Object[]{this.writerId, this.topic, this.writeSemantic, kafkaException.getMessage()});
            this.sendExceptionRef.compareAndSet(null, kafkaException);
        }
    }

    public void close(boolean abort) throws IOException {
        if (abort) {
            LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", (Object)this.writerId);
            this.producer.close(Duration.ofMillis(0L));
            return;
        }
        LOG.info("Flushing Kafka Producer with writerId [{}]", (Object)this.writerId);
        this.producer.flush();
        LOG.info("Closing WriterId [{}]", (Object)this.writerId);
        this.producer.close();
        LOG.info("Closed WriterId [{}] Delivery semantic [{}], Topic[{}], Total sent Records [{}], Total Lost Records [{}]", new Object[]{this.writerId, this.writeSemantic, this.topic, this.sentRecords, this.lostRecords.get()});
        this.checkExceptions();
    }

    @VisibleForTesting
    String getWriterId() {
        return this.writerId;
    }

    @VisibleForTesting
    long getLostRecords() {
        return this.lostRecords.get();
    }

    @VisibleForTesting
    long getSentRecords() {
        return this.sentRecords;
    }

    public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
        this.write(kafkaWritable);
    }

    public void close(Reporter reporter) throws IOException {
        this.close(false);
    }

    private void checkExceptions() throws IOException {
        if (this.sendExceptionRef.get() != null) {
            LOG.error("Send Exception Aborting write from writerId [{}]", (Object)this.writerId);
            this.producer.close(Duration.ofMillis(0L));
            throw new IOException(this.sendExceptionRef.get());
        }
    }
}

