/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.impl;

import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObservable;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.MessageIdResult;
import com.mulesoft.mq.restclient.impl.AbstractDestinationDelegate;
import com.mulesoft.mq.restclient.impl.DefaultCourierObservable;
import com.mulesoft.mq.restclient.impl.PreservedMessage;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import java.util.List;
import java.util.stream.Collectors;
import rx.Observable;

/**
 * {@link Destination} decorator that keeps a {@link MessagePreserver} in sync with the
 * messages flowing through the wrapped destination.
 *
 * <p>Every batch produced by {@code receive} is registered with the preserver and each message
 * is handed to the caller wrapped in a {@link PreservedMessage}. When the delegate reports a
 * successful {@code ack} or {@code nack}, the corresponding message id is removed from the
 * preserver again. Unsuccessful results leave the preserver untouched.
 *
 * <p>NOTE(review): the observables returned by the delegate are downcast to
 * {@link DefaultCourierObservable}; a delegate that returns any other
 * {@link CourierObservable} implementation will fail with a {@link ClassCastException}.
 */
public class PreservedMessagesDestination
extends AbstractDestinationDelegate {
    /** Batch size used by the no-argument {@link #receive()}. */
    private static final int DEFAULT_BATCH_SIZE = 1;

    /** Polling time used by the no-argument {@link #receive()}. */
    private static final long DEFAULT_POLLING_TIME = 0L;

    /**
     * Lock TTL used by the no-argument {@link #receive()}.
     * NOTE(review): presumably milliseconds (i.e. two minutes) - confirm against the API docs.
     */
    private static final long DEFAULT_LOCK_TTL = 120000L;

    private final MessagePreserver preserver;

    /**
     * @param destination the destination every operation is delegated to
     * @param preserver   the preserver that is told about received and acked/nacked messages
     */
    public PreservedMessagesDestination(Destination destination, MessagePreserver preserver) {
        super(destination);
        this.preserver = preserver;
    }

    /**
     * Receives with the default batch size, polling time and lock TTL declared on this class.
     *
     * @return the same observable {@link #receive(int, long, long)} would return for those defaults
     */
    @Override
    public CourierObservable<List<AnypointMqMessage>> receive() {
        return this.receive(DEFAULT_BATCH_SIZE, DEFAULT_POLLING_TIME, DEFAULT_LOCK_TTL);
    }

    /**
     * Receives from the delegate and, for every emitted batch, registers the batch with the
     * preserver before re-emitting its messages wrapped as {@link PreservedMessage}s.
     *
     * @param batchSize   forwarded unchanged to the delegate
     * @param poolingTime forwarded unchanged to the delegate
     * @param lockTtl     forwarded to the delegate and also passed to the preserver together
     *                    with each received batch
     * @return an observable emitting the wrapped batches
     */
    @Override
    public CourierObservable<List<AnypointMqMessage>> receive(int batchSize, long poolingTime, long lockTtl) {
        Observable<List<AnypointMqMessage>> received =
                this.asObservable(this.delegate.receive(batchSize, poolingTime, lockTtl));
        return new DefaultCourierObservable<List<AnypointMqMessage>>(
                received.map(messages -> this.preserve(messages, lockTtl)));
    }

    /**
     * Acknowledges through the delegate and drops every successfully acknowledged message id
     * from the preserver.
     */
    @Override
    public CourierObservable<List<MessageIdResult>> ack(List<AnypointMqMessage> messages) {
        return this.forgetSuccessfulResults(this.delegate.ack(messages));
    }

    /**
     * Negatively acknowledges through the delegate and drops every message id whose nack
     * succeeded from the preserver.
     */
    @Override
    public CourierObservable<List<MessageIdResult>> nack(List<AnypointMqMessage> messages) {
        return this.forgetSuccessfulResults(this.delegate.nack(messages));
    }

    /**
     * Acknowledges a single message through the delegate and, on success, drops its id from
     * the preserver.
     */
    @Override
    public CourierObservable<MessageIdResult> ack(AnypointMqMessage message) {
        return this.forgetSuccessfulResult(this.delegate.ack(message));
    }

    /**
     * Negatively acknowledges a single message through the delegate and, on success, drops its
     * id from the preserver.
     */
    @Override
    public CourierObservable<MessageIdResult> nack(AnypointMqMessage message) {
        return this.forgetSuccessfulResult(this.delegate.nack(message));
    }

    /**
     * Registers a freshly received batch with the preserver and wraps each message.
     * The batch is registered first, then wrapped, so the preserver sees the original messages.
     */
    private List<AnypointMqMessage> preserve(List<AnypointMqMessage> messages, long lockTtl) {
        this.preserver.add(messages, lockTtl);
        return messages.stream()
                // Explicit type witness keeps the element type at the interface rather than
                // the concrete wrapper, so the collected list matches the declared return type.
                .<AnypointMqMessage>map(message -> new PreservedMessage(message, this.preserver))
                .collect(Collectors.toList());
    }

    /**
     * Exposes the Rx observable backing a {@link CourierObservable}.
     *
     * @throws ClassCastException if {@code courierObservable} is not a {@link DefaultCourierObservable}
     */
    private <T> Observable<T> asObservable(CourierObservable<T> courierObservable) {
        // Parameterized cast (instead of a raw one) so the element type T is preserved.
        return ((DefaultCourierObservable<T>) courierObservable).getRxObservable();
    }

    /** Decorates a batch ack/nack result stream so that successful ids leave the preserver. */
    private CourierObservable<List<MessageIdResult>> forgetSuccessfulResults(CourierObservable<List<MessageIdResult>> resultsObs) {
        return new DefaultCourierObservable<List<MessageIdResult>>(
                this.asObservable(resultsObs).doOnNext(results -> results.forEach(this::forgetIfSuccessful)));
    }

    /** Decorates a single ack/nack result stream so that a successful id leaves the preserver. */
    private CourierObservable<MessageIdResult> forgetSuccessfulResult(CourierObservable<MessageIdResult> resultObs) {
        return new DefaultCourierObservable<MessageIdResult>(
                this.asObservable(resultObs).doOnNext(this::forgetIfSuccessful));
    }

    /** Removes the message id from the preserver only when the operation reported success. */
    private void forgetIfSuccessful(MessageIdResult messageIdResult) {
        if (messageIdResult.isSuccess()) {
            this.preserver.remove(messageIdResult.getMessageId());
        }
    }
}

