/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.ProducerUtils;
import com.rabbitmq.stream.impl.StreamProducer;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToLongFunction;

class SimpleMessageAccumulator
implements MessageAccumulator {
    protected final BlockingQueue<ProducerUtils.AccumulatedEntity> messages;
    private final int capacity;
    final ObservationCollector<Object> observationCollector;
    private final StreamProducer producer;
    final ProducerUtils.MessageAccumulatorHelper helper;

    SimpleMessageAccumulator(int capacity, Codec codec, int maxFrameSize, ToLongFunction<Message> publishSequenceFunction, Function<Message, String> filterValueExtractor, Clock clock, String stream, ObservationCollector<?> observationCollector, StreamProducer producer) {
        this.helper = new ProducerUtils.MessageAccumulatorHelper(codec, maxFrameSize, publishSequenceFunction, filterValueExtractor, clock, stream, observationCollector);
        this.capacity = capacity;
        this.messages = new LinkedBlockingQueue<ProducerUtils.AccumulatedEntity>(this.capacity);
        this.observationCollector = observationCollector;
        this.producer = producer;
    }

    @Override
    public void add(Message message, ConfirmationHandler confirmationHandler) {
        ProducerUtils.AccumulatedEntity entity = this.helper.entity(message, confirmationHandler);
        try {
            boolean offered = this.messages.offer(entity, 60L, TimeUnit.SECONDS);
            if (!offered) {
                throw new StreamException("Could not accumulate outbound message");
            }
        }
        catch (InterruptedException e) {
            throw new StreamException("Error while accumulating outbound message", e);
        }
        if (this.messages.size() == this.capacity) {
            this.publishBatch(true);
        }
    }

    ProducerUtils.AccumulatedEntity get() {
        ProducerUtils.AccumulatedEntity entity = (ProducerUtils.AccumulatedEntity)this.messages.poll();
        if (entity != null) {
            this.observationCollector.published(entity.observationContext(), entity.confirmationCallback().message());
        }
        return entity;
    }

    @Override
    public int size() {
        return this.messages.size();
    }

    @Override
    public void flush(boolean force) {
        boolean stateCheck = !force;
        this.publishBatch(stateCheck);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void publishBatch(boolean stateCheck) {
        this.producer.lock();
        try {
            if (!(stateCheck && !this.producer.canSend() || this.messages.isEmpty())) {
                ProducerUtils.AccumulatedEntity entity;
                ArrayList<Object> entities = new ArrayList<Object>(this.capacity);
                for (int batchCount = 0; batchCount != this.capacity && (entity = this.get()) != null; ++batchCount) {
                    entities.add(entity);
                }
                this.producer.publishInternal(entities);
            }
        }
        finally {
            this.producer.unlock();
        }
    }

    @Override
    public void close() {
    }
}

