/*
 * Decompiled with CFR 0.152.
 */
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.http.client.utils.URIBuilder;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.ext.siri.updater.azure.AbstractAzureSiriUpdater;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdaterParameters;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdater;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.trip.UpdateResult;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.opentripplanner.util.HttpUtils;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.Siri;

/**
 * Azure Service Bus updater for SIRI Estimated Timetable (ET) messages.
 *
 * <p>On initialization it optionally downloads a batch of historical ET data over HTTP and applies
 * it to the timetable snapshot; afterwards every Service Bus message handed to
 * {@link #messageConsumer(ServiceBusReceivedMessageContext)} is parsed and applied in the same way.
 * All graph modifications are submitted through {@code saveResultOnGraph}.
 */
public class SiriAzureETUpdater extends AbstractAzureSiriUpdater {
    // Must name this class (not the SX updater) so ET log lines are attributed correctly.
    private static final Logger LOG = LoggerFactory.getLogger(SiriAzureETUpdater.class);

    /** Number of Service Bus messages seen by all instances of this updater. */
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0L);

    /** Date sent as the {@code fromDateTime} query parameter of the history request. */
    private final LocalDate fromDateTime;
    private final SiriTimetableSnapshotSource snapshotSource;

    /** Value of {@code now()} captured when the history download starts; used for log output only. */
    private long startTime;
    private final Consumer<UpdateResult> recordMetrics;

    /**
     * Creates the updater.
     *
     * @param config updater parameters; supplies the history start date and the metrics setup
     * @param transitModel transit model passed on to the superclass
     * @param snapshotSource target that the parsed ET deliveries are applied to
     */
    public SiriAzureETUpdater(SiriAzureETUpdaterParameters config, TransitModel transitModel, SiriTimetableSnapshotSource snapshotSource) {
        super(config, transitModel);
        this.fromDateTime = config.getFromDateTime();
        this.snapshotSource = snapshotSource;
        this.recordMetrics = TripUpdateMetrics.streaming(config);
    }

    /**
     * Handles one Service Bus message: counts it, logs progress on every 100th message and
     * forwards the body for parsing.
     *
     * @param messageContext context holding the received message
     */
    @Override
    protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) {
        ServiceBusReceivedMessage message = messageContext.getMessage();
        // Use the value returned by the increment; re-reading the counter could observe another
        // thread's increment and skip or duplicate the progress log.
        long received = MESSAGE_COUNTER.incrementAndGet();
        if (received % 100L == 0L) {
            LOG.info("Total SIRI-ET messages received={}", received);
        }
        processMessage(message.getBody().toString(), message.getMessageId());
    }

    /**
     * Delegates Service Bus errors to {@code defaultErrorConsumer}.
     *
     * @param errorContext error details supplied by the Service Bus client
     */
    @Override
    protected void errorConsumer(ServiceBusErrorContext errorContext) {
        defaultErrorConsumer(errorContext);
    }

    /**
     * Downloads the initial ET data from the history endpoint and applies it.
     *
     * @param url history endpoint; when {@code null} the method logs and returns without fetching
     * @param consumer not used by this implementation
     * @throws IOException if the request fails or the endpoint returns no data
     * @throws URISyntaxException if {@code url} cannot be parsed into a URI
     */
    @Override
    protected void initializeData(String url, Consumer<ServiceBusReceivedMessageContext> consumer) throws IOException, URISyntaxException {
        if (url == null) {
            LOG.info("No history url set up for Siri Azure ET Updater");
            return;
        }
        URI uri = new URIBuilder(url)
            .addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
            .build();
        startTime = now();
        LOG.info("Fetching initial Siri ET data from {}, timeout is {}ms", url, timeout);

        HashMap<String, String> headers = new HashMap<>();
        headers.put("Accept", "application/xml");

        long fetchStarted = System.currentTimeMillis();
        long fetchMillis;
        String xml;
        // try-with-resources closes the response stream even if reading fails; a null resource
        // is permitted and simply not closed.
        try (InputStream data = HttpUtils.getData(uri, Duration.ofMillis(timeout), headers)) {
            fetchMillis = System.currentTimeMillis() - fetchStarted;
            if (data == null) {
                throw new IOException("Historical endpoint returned no data from url " + url);
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            xml = CharStreams.toString(new InputStreamReader(data, StandardCharsets.UTF_8));
        }
        LOG.info("Fetching initial data - finished after {} ms, got {} bytes", fetchMillis, xml.length());
        processHistory(xml, "ET-INITIAL-1");
    }

    /**
     * Parses one streamed SIRI message and, if it contains ET deliveries, schedules them to be
     * applied to the snapshot source. Parse failures are logged and the message is dropped.
     *
     * @param message SIRI XML document
     * @param id message id, used in log output
     */
    private void processMessage(String message, String id) {
        try {
            List<EstimatedTimetableDeliveryStructure> updates = getUpdates(message, id);
            if (updates.isEmpty()) {
                return;
            }
            saveResultOnGraph.execute((graph, transitModel) -> {
                UpdateResult result = snapshotSource.applyEstimatedTimetable(transitModel, fuzzyTripMatcher(), feedId, false, updates);
                // Report streamed updates to the metrics consumer, as the history path does.
                recordMetrics.accept(result);
            });
        } catch (JAXBException | XMLStreamException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Parses the history document and schedules its ET deliveries to be applied. After they have
     * been applied the updater is marked as primed. Parse failures are logged.
     *
     * @param message SIRI XML document returned by the history endpoint
     * @param id identifier used in log output
     */
    private void processHistory(String message, String id) {
        try {
            List<EstimatedTimetableDeliveryStructure> updates = getUpdates(message, id);
            if (updates.isEmpty()) {
                LOG.info("Did not receive any ET messages from history endpoint");
                return;
            }
            saveResultOnGraph.execute((graph, transitModel) -> {
                long applyStarted = System.currentTimeMillis();
                UpdateResult result = snapshotSource.applyEstimatedTimetable(transitModel, fuzzyTripMatcher(), feedId, false, updates);
                recordMetrics.accept(result);
                setPrimed(true);
                LOG.info(
                    "Azure ET updater initialized after {} ms: [time since startup: {}]",
                    System.currentTimeMillis() - applyStarted,
                    DurationFormatUtils.formatDuration(now() - startTime, "HH:mm:ss")
                );
            });
        } catch (JAXBException | XMLStreamException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Extracts the estimated timetable deliveries from a SIRI XML document.
     *
     * @param message SIRI XML document
     * @param id identifier used in the warning that is logged when no deliveries are present
     * @return the deliveries of the document, or an empty list when it has no service delivery or
     *     no ET deliveries
     * @throws JAXBException if the XML cannot be unmarshalled
     * @throws XMLStreamException if the XML stream cannot be read
     */
    private List<EstimatedTimetableDeliveryStructure> getUpdates(String message, String id) throws JAXBException, XMLStreamException {
        Siri siri = SiriXml.parseXml(message);
        List<EstimatedTimetableDeliveryStructure> deliveries = siri.getServiceDelivery() == null
            ? null
            : siri.getServiceDelivery().getEstimatedTimetableDeliveries();
        if (deliveries == null || deliveries.isEmpty()) {
            LOG.warn("Empty Siri message {}: {}", id, message);
            return new ArrayList<>();
        }
        return deliveries;
    }
}

