/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.gcp.pubsub.core;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.gcp.pubsub.core.PubSubOperations;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.PublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.PubSubMessageConverter;
import org.springframework.cloud.gcp.pubsub.support.converter.SimplePubSubMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class PubSubTemplate
implements PubSubOperations,
InitializingBean {
    private static final Log LOGGER = LogFactory.getLog(PubSubTemplate.class);
    private PubSubMessageConverter messageConverter = new SimplePubSubMessageConverter();
    private final PublisherFactory publisherFactory;
    private final SubscriberFactory subscriberFactory;
    private final SubscriberStub subscriberStub;

    public PubSubTemplate(PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
        this.publisherFactory = publisherFactory;
        this.subscriberFactory = subscriberFactory;
        this.subscriberStub = this.subscriberFactory.createSubscriberStub();
    }

    public PubSubMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(PubSubMessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"A valid Pub/Sub message converter is required.");
        this.messageConverter = messageConverter;
    }

    @Override
    public <T> ListenableFuture<String> publish(String topic, T payload, Map<String, String> headers) {
        return this.publish(topic, this.messageConverter.toPubSubMessage(payload, headers));
    }

    @Override
    public <T> ListenableFuture<String> publish(String topic, T payload) {
        return this.publish(topic, payload, null);
    }

    @Override
    public ListenableFuture<String> publish(final String topic, PubsubMessage pubsubMessage) {
        ApiFuture publishFuture = this.publisherFactory.createPublisher(topic).publish(pubsubMessage);
        final SettableListenableFuture settableFuture = new SettableListenableFuture();
        ApiFutures.addCallback((ApiFuture)publishFuture, (ApiFutureCallback)new ApiFutureCallback<String>(){

            public void onFailure(Throwable throwable) {
                LOGGER.warn((Object)("Publishing to " + topic + " topic failed."), throwable);
                settableFuture.setException(throwable);
            }

            public void onSuccess(String result) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug((Object)("Publishing to " + topic + " was successful. Message ID: " + result));
                }
                settableFuture.set((Object)result);
            }
        });
        return settableFuture;
    }

    @Override
    public Subscriber subscribe(String subscription, MessageReceiver messageReceiver) {
        Subscriber subscriber = this.subscriberFactory.createSubscriber(subscription, messageReceiver);
        subscriber.startAsync();
        return subscriber;
    }

    private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
        Assert.notNull((Object)pullRequest, (String)"The pull request cannot be null.");
        PullResponse pullResponse = (PullResponse)this.subscriberStub.pullCallable().call((Object)pullRequest);
        List<AcknowledgeablePubsubMessage> receivedMessages = pullResponse.getReceivedMessagesList().stream().map(message -> new AcknowledgeablePubsubMessage(message.getMessage(), message.getAckId(), pullRequest.getSubscription(), this.subscriberStub)).collect(Collectors.toList());
        return receivedMessages;
    }

    @Override
    public List<AcknowledgeablePubsubMessage> pull(String subscription, Integer maxMessages, Boolean returnImmediately) {
        return this.pull(this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately));
    }

    @Override
    public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages, Boolean returnImmediately) {
        PullRequest pullRequest = this.subscriberFactory.createPullRequest(subscription, maxMessages, returnImmediately);
        List<AcknowledgeablePubsubMessage> ackableMessages = this.pull(pullRequest);
        this.ack(ackableMessages);
        return ackableMessages.stream().map(AcknowledgeablePubsubMessage::getPubsubMessage).collect(Collectors.toList());
    }

    @Override
    public PubsubMessage pullNext(String subscription) {
        List<PubsubMessage> receivedMessageList = this.pullAndAck(subscription, 1, true);
        return receivedMessageList.size() > 0 ? receivedMessageList.get(0) : null;
    }

    public void afterPropertiesSet() throws Exception {
    }

    public PublisherFactory getPublisherFactory() {
        return this.publisherFactory;
    }

    public SubscriberFactory getSubscriberFactory() {
        return this.subscriberFactory;
    }

    @Override
    public void ack(Collection<AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages) {
        Assert.notEmpty(acknowledgeablePubsubMessages, (String)"The acknowledgeablePubsubMessages cannot be null.");
        this.groupAcknowledgeableMessages(acknowledgeablePubsubMessages).forEach(this::ack);
    }

    @Override
    public void nack(Collection<AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages) {
        Assert.notEmpty(acknowledgeablePubsubMessages, (String)"The acknowledgeablePubsubMessages cannot be null.");
        this.groupAcknowledgeableMessages(acknowledgeablePubsubMessages).forEach(this::nack);
    }

    private Map<String, List<String>> groupAcknowledgeableMessages(Collection<AcknowledgeablePubsubMessage> acknowledgeablePubsubMessages) {
        return acknowledgeablePubsubMessages.stream().collect(Collectors.groupingBy(AcknowledgeablePubsubMessage::getSubscriptionName, Collectors.mapping(AcknowledgeablePubsubMessage::getAckId, Collectors.toList())));
    }

    private void ack(String subscriptionName, Collection<String> ackIds) {
        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().addAllAckIds(ackIds).setSubscription(subscriptionName).build();
        this.subscriberStub.acknowledgeCallable().call((Object)acknowledgeRequest);
    }

    private void nack(String subscriptionName, Collection<String> ackIds) {
        ModifyAckDeadlineRequest modifyAckDeadlineRequest = ModifyAckDeadlineRequest.newBuilder().setAckDeadlineSeconds(0).addAllAckIds(ackIds).setSubscription(subscriptionName).build();
        this.subscriberStub.modifyAckDeadlineCallable().call((Object)modifyAckDeadlineRequest);
    }
}

