package gobblin.eventhub.writer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.Futures;
import com.microsoft.azure.servicebus.SharedAccessSignatureTokenProvider;
import gobblin.configuration.State;
import gobblin.eventhub.EventhubMetricNames;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.MetricContext;
import gobblin.password.PasswordManager;
import gobblin.writer.Batch;
import gobblin.writer.BatchAsyncDataWriter;
import gobblin.writer.SyncDataWriter;
import gobblin.writer.WriteCallback;
import gobblin.writer.WriteResponse;
import gobblin.writer.WriteResponseFuture;
import gobblin.writer.WriteResponseMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/eventhub/writer/EventhubDataWriter.class */
public class EventhubDataWriter implements SyncDataWriter<String>, BatchAsyncDataWriter<String> {
    private HttpClient httpclient;
    private final String namespaceName;
    private final String eventHubName;
    private final String sasKeyName;
    private final String sasKey;
    private final String targetURI;
    private final Meter bytesWritten;
    private final Meter recordsAttempted;
    private final Meter recordsSuccess;
    private final Meter recordsFailed;
    private final Timer writeTimer;
    private long postStartTimestamp;
    private long sigExpireInMinute;
    private String signature;
    private MetricContext metricContext;
    private static final Logger log = LoggerFactory.getLogger(EventhubDataWriter.class);
    private static final Logger LOG = LoggerFactory.getLogger(EventhubDataWriter.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final WriteResponseMapper<Integer> WRITE_RESPONSE_WRAPPER = new WriteResponseMapper<Integer>() { // from class: gobblin.eventhub.writer.EventhubDataWriter.1
        public WriteResponse wrap(final Integer num) {
            return new WriteResponse<Integer>() { // from class: gobblin.eventhub.writer.EventhubDataWriter.1.1
                /* renamed from: getRawResponse, reason: merged with bridge method [inline-methods] */
                public Integer m1getRawResponse() {
                    return num;
                }

                public String getStringResponse() {
                    return num.toString();
                }

                public long bytesWritten() {
                    return -1L;
                }
            };
        }
    };

    public EventhubDataWriter(Properties properties) {
        this.postStartTimestamp = 0L;
        this.sigExpireInMinute = 1L;
        this.signature = "";
        PasswordManager passwordManager = PasswordManager.getInstance(properties);
        this.namespaceName = properties.getProperty(BatchedEventhubDataWriter.EVH_NAMESPACE);
        this.eventHubName = properties.getProperty(BatchedEventhubDataWriter.EVH_HUBNAME);
        this.sasKeyName = properties.getProperty(BatchedEventhubDataWriter.EVH_SAS_KEYNAME);
        this.sasKey = passwordManager.readPassword(properties.getProperty(BatchedEventhubDataWriter.EVH_SAS_KEYVALUE));
        this.targetURI = "https://" + this.namespaceName + ".servicebus.windows.net/" + this.eventHubName + "/messages";
        this.httpclient = HttpClients.createDefault();
        this.metricContext = Instrumented.getMetricContext(new State(properties), EventhubDataWriter.class);
        this.recordsAttempted = this.metricContext.meter(EventhubMetricNames.EventhubDataWriterMetrics.RECORDS_ATTEMPTED_METER);
        this.recordsSuccess = this.metricContext.meter(EventhubMetricNames.EventhubDataWriterMetrics.RECORDS_SUCCESS_METER);
        this.recordsFailed = this.metricContext.meter(EventhubMetricNames.EventhubDataWriterMetrics.RECORDS_FAILED_METER);
        this.bytesWritten = this.metricContext.meter(EventhubMetricNames.EventhubDataWriterMetrics.BYTES_WRITTEN_METER);
        this.writeTimer = this.metricContext.timer(EventhubMetricNames.EventhubDataWriterMetrics.WRITE_TIMER);
    }

    public EventhubDataWriter(Properties properties, HttpClient httpClient) {
        this(properties);
        this.httpclient = httpClient;
    }

    public Future<WriteResponse> write(Batch<String> batch, WriteCallback writeCallback) {
        Timer.Context time = this.writeTimer.time();
        int i = 0;
        LOG.info("Dispatching batch " + batch.getId());
        this.recordsAttempted.mark(batch.getRecords().size());
        try {
            i = request(encodeBatch(batch));
            writeCallback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(Integer.valueOf(i)));
            this.bytesWritten.mark(r0.length());
            this.recordsSuccess.mark(batch.getRecords().size());
        } catch (Exception e) {
            LOG.error("Dispatching batch " + batch.getId() + " failed :" + e.toString());
            writeCallback.onFailure(e);
            this.recordsFailed.mark(batch.getRecords().size());
        }
        time.close();
        return new WriteResponseFuture(Futures.immediateFuture(Integer.valueOf(i)), WRITE_RESPONSE_WRAPPER);
    }

    public WriteResponse write(String str) throws IOException {
        this.recordsAttempted.mark();
        int request = request(encodeRecord(str));
        this.recordsSuccess.mark();
        this.bytesWritten.mark(r0.length());
        return WRITE_RESPONSE_WRAPPER.wrap(Integer.valueOf(request));
    }

    public void refreshSignature() {
        if (this.postStartTimestamp == 0 || System.nanoTime() - this.postStartTimestamp > Duration.ofMinutes(this.sigExpireInMinute).toNanos()) {
            try {
                this.signature = SharedAccessSignatureTokenProvider.generateSharedAccessSignature(this.sasKeyName, this.sasKey, this.namespaceName, Duration.ofMinutes(this.sigExpireInMinute));
                this.postStartTimestamp = System.nanoTime();
                LOG.info("Signature is refreshing: " + this.signature);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private int request(String str) throws IOException {
        refreshSignature();
        HttpPost httpPost = new HttpPost(this.targetURI);
        httpPost.setHeader("Content-type", "application/vnd.microsoft.servicebus.json");
        httpPost.setHeader("Authorization", this.signature);
        httpPost.setHeader("Host", this.namespaceName + ".servicebus.windows.net ");
        httpPost.setEntity(new StringEntity(str));
        HttpResponse execute = this.httpclient.execute(httpPost);
        StatusLine statusLine = execute.getStatusLine();
        EntityUtils.consume(execute.getEntity());
        int statusCode = statusLine.getStatusCode();
        if (statusCode == 201) {
            return statusCode;
        }
        LOG.error(new IOException(statusLine.getReasonPhrase()).toString());
        throw new IOException(statusLine.getReasonPhrase());
    }

    private String encodeBatch(Batch<String> batch) throws IOException {
        List records = batch.getRecords();
        ArrayList arrayList = new ArrayList();
        Iterator it = records.iterator();
        while (it.hasNext()) {
            arrayList.add(new EventhubRequest((String) it.next()));
        }
        return mapper.writeValueAsString(arrayList);
    }

    private String encodeRecord(String str) throws IOException {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new EventhubRequest(str));
        return mapper.writeValueAsString(arrayList);
    }

    public void close() throws IOException {
        if (this.httpclient instanceof CloseableHttpClient) {
            this.httpclient.close();
        }
    }

    public void cleanup() {
    }

    public void flush() {
    }
}
