/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaWriter<K, V>
extends DoFn<KV<K, V>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
    private final KafkaIO.Write<K, V> spec;
    private final Map<String, Object> producerConfig;
    private transient Producer<K, V> producer = null;
    private transient Exception sendException = null;
    private transient long numSendFailures = 0L;
    private final Counter elementsWritten = SinkMetrics.elementsWritten();

    @DoFn.Setup
    public void setup() {
        this.producer = this.spec.getProducerFactoryFn() != null ? (Producer)this.spec.getProducerFactoryFn().apply(this.producerConfig) : new KafkaProducer(this.producerConfig);
    }

    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext ctx) throws Exception {
        this.checkForFailures();
        KV kv = (KV)ctx.element();
        this.producer.send(new ProducerRecord(this.spec.getTopic(), kv.getKey(), kv.getValue()), (Callback)new SendCallback());
        this.elementsWritten.inc();
    }

    @DoFn.FinishBundle
    public void finishBundle() throws IOException {
        this.producer.flush();
        this.checkForFailures();
    }

    @DoFn.Teardown
    public void teardown() {
        this.producer.close();
    }

    KafkaWriter(KafkaIO.Write<K, V> spec) {
        this.spec = spec;
        this.producerConfig = new HashMap<String, Object>(spec.getProducerConfig());
        this.producerConfig.put("key.serializer", spec.getKeySerializer());
        this.producerConfig.put("value.serializer", spec.getValueSerializer());
    }

    private synchronized void checkForFailures() throws IOException {
        if (this.numSendFailures == 0L) {
            return;
        }
        String msg = String.format("KafkaWriter : failed to send %d records (since last report)", this.numSendFailures);
        Exception e = this.sendException;
        this.sendException = null;
        this.numSendFailures = 0L;
        LOG.warn(msg);
        throw new IOException(msg, e);
    }

    private class SendCallback
    implements Callback {
        private SendCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                return;
            }
            KafkaWriter kafkaWriter = KafkaWriter.this;
            synchronized (kafkaWriter) {
                if (KafkaWriter.this.sendException == null) {
                    KafkaWriter.this.sendException = exception;
                }
                KafkaWriter.this.numSendFailures++;
            }
            LOG.warn("send failed : '{}'", (Object)exception.getMessage());
        }
    }
}

