package com.caucho.message.nautilus;

import com.caucho.config.ConfigException;
import com.caucho.db.block.BlockStore;
import com.caucho.env.actor.ActorQueue;
import com.caucho.env.service.RootDirectorySystem;
import com.caucho.loader.Environment;
import com.caucho.message.DistributionMode;
import com.caucho.message.broker.AbstractMessageBroker;
import com.caucho.message.broker.BrokerReceiver;
import com.caucho.message.broker.BrokerSender;
import com.caucho.message.broker.EnvironmentMessageBroker;
import com.caucho.message.broker.ReceiverMessageHandler;
import com.caucho.message.journal.JournalFile;
import com.caucho.message.journal.JournalRecoverListener;
import com.caucho.message.journal.JournalWriteActor;
import com.caucho.message.nautilus.NautilusBrokerStore;
import com.caucho.util.L10N;
import com.caucho.util.RingItemFactory;
import com.caucho.vfs.Path;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.inject.Singleton;

/**
 * Journal-backed message broker.
 *
 * <p>Senders and receivers created by this broker share a single
 * {@link ActorQueue} of {@link NautilusRingItem}s. The queue is constructed
 * with two processors: a {@link JournalWriteActor} bound to the broker's
 * {@link JournalFile}, and a {@link NautilusMultiQueueActor}.
 *
 * <p>The broker must be given a journal path with {@link #setPath(Path)}
 * before {@link #init()} is called.
 */
@Singleton
@Startup
public class NautilusBroker extends AbstractMessageBroker implements Closeable {
    private static final Logger log = Logger.getLogger(NautilusBroker.class.getName());
    private static final L10N L = new L10N(NautilusBroker.class);

    /** Location of the journal file; required before {@link #init()}. */
    private Path _path;
    private JournalFile _journalFile;
    private NautilusMultiQueueActor _nautilusActor;
    private ActorQueue<NautilusRingItem> _nautilusActorQueue;

    /** Registry used to look up (or add) queues by name. */
    private final NautilusBrokerStore _brokerStore = new NautilusBrokerStore();

    /**
     * Factory handed to the {@link ActorQueue} to create its ring items.
     */
    public static class NautilusItemFactory implements RingItemFactory<NautilusRingItem> {
        NautilusItemFactory() {
        }

        /**
         * Creates a new ring item for the given index.
         *
         * @param i value passed straight through to the
         *          {@link NautilusRingItem} constructor
         * @return a freshly allocated ring item
         */
        @Override
        public NautilusRingItem createItem(int i) {
            return new NautilusRingItem(i);
        }
    }

    /**
     * Journal recovery callback that feeds each recovered entry to the
     * broker's {@link NautilusMultiQueueActor}.
     *
     * <p>A single {@link NautilusRingItem} is reused for every callback, so
     * this listener is not safe for concurrent invocation.
     */
    public class RecoverListener implements JournalRecoverListener {
        /** Scratch item reused for every recovered entry. */
        private final NautilusRingItem _entry = new NautilusRingItem(0);

        RecoverListener() {
        }

        /**
         * Re-populates the scratch ring item from the recovered journal entry
         * and hands it to the multi-queue actor.
         *
         * <p>Exceptions thrown by the actor are logged at WARNING and
         * swallowed so that one failing entry does not propagate out of this
         * callback. The flags {@code z} and {@code z2} are not used.
         *
         * <p>NOTE(review): parameter names come from decompilation; their
         * meaning is defined by {@link JournalRecoverListener} - confirm
         * against that interface.
         *
         * @throws IOException declared by the interface
         */
        @Override
        public void onEntry(long j, boolean z, boolean z2, long j2, long j3, long j4,
                            BlockStore blockStore, long j5, int i, int i2) throws IOException {
            _entry.init(j, j2, j3, j4, null, 0, 0, null);
            _entry.getResult().init1(blockStore, j5, i, i2);

            try {
                _nautilusActor.process(_entry);
            } catch (Exception e) {
                log.log(Level.WARNING, e.toString(), e);
            } finally {
                // Always reset the shared scratch item before the next entry.
                _entry.clear();
            }
        }
    }

    /**
     * Sets the journal file location. Must be called before {@link #init()}.
     *
     * @param path location of the journal file
     */
    public void setPath(Path path) {
        _path = path;
    }

    /**
     * Validates the configuration, builds the journal and actor queue,
     * registers this broker as an environment close listener and then calls
     * {@code registerSelf()}.
     *
     * @throws ConfigException if no path has been configured
     */
    @PostConstruct
    public void init() {
        if (_path == null) {
            throw new ConfigException(L.l("'path' is required for a journal broker."));
        }

        initImpl();

        Environment.addCloseListener(this);
        registerSelf();
    }

    /**
     * Returns the broker registered with the current
     * {@link EnvironmentMessageBroker}, creating and registering one if none
     * is found.
     *
     * <p>A newly created broker journals to {@code msg.journal} under the
     * data directory of the current {@link RootDirectorySystem}.
     *
     * <p>NOTE(review): the find-then-add sequence is not synchronized here;
     * confirm whether callers can race on first use.
     *
     * @return the existing or newly created broker
     */
    public static NautilusBroker getCurrent() {
        EnvironmentMessageBroker envBroker = EnvironmentMessageBroker.create();

        NautilusBroker broker = (NautilusBroker) envBroker.findBroker(NautilusBroker.class);
        if (broker != null) {
            return broker;
        }

        broker = new NautilusBroker();
        Path dataDirectory = RootDirectorySystem.getCurrent().getDataDirectory();
        broker.setPath(dataDirectory.lookup("msg.journal"));
        broker.init();
        envBroker.addBroker(broker);

        return broker;
    }

    /**
     * Creates a sender that publishes to the named queue.
     *
     * @param str name of the queue; the queue is obtained through the broker
     *            store's {@code addQueue}
     * @param map sender properties; not used by this implementation
     * @return a publisher bound to the queue's id and the shared actor queue
     */
    @Override
    public BrokerSender createSender(String str, Map<String, Object> map) {
        NautilusBrokerStore.BrokerQueue queue = getQueue(str);

        return new NautilusBrokerPublisher(queue.getId(), getActorQueue());
    }

    /**
     * Creates a receiver subscribed to the named queue.
     *
     * @param str                    name of the queue
     * @param distributionMode       not used by this implementation
     * @param map                    receiver properties; not used by this
     *                               implementation
     * @param receiverMessageHandler handler passed to the subscriber
     * @return a subscriber bound to the queue and the shared actor queue
     */
    @Override
    public BrokerReceiver createReceiver(String str,
                                         DistributionMode distributionMode,
                                         Map<String, Object> map,
                                         ReceiverMessageHandler receiverMessageHandler) {
        NautilusBrokerStore.BrokerQueue queue = getQueue(str);

        // Use the accessor, matching createSender.
        return new NautilusBrokerSubscriber(queue.getName(),
                                            queue.getId(),
                                            getActorQueue(),
                                            receiverMessageHandler);
    }

    /** Looks up the named queue through the broker store's {@code addQueue}. */
    private NautilusBrokerStore.BrokerQueue getQueue(String str) {
        return _brokerStore.addQueue(str);
    }

    /**
     * Builds the actor, journal file and actor queue, then wires a checkpoint
     * publisher backed by the queue into the actor.
     */
    private void initImpl() {
        // The actor is created first because RecoverListener dereferences
        // _nautilusActor. NOTE(review): presumably JournalFile invokes the
        // listener while it is being constructed - confirm.
        _nautilusActor = new NautilusMultiQueueActor();
        _journalFile = new JournalFile(_path, new RecoverListener());

        JournalWriteActor journalActor = new JournalWriteActor(_journalFile);
        _nautilusActorQueue = new ActorQueue<>(8192,
                                               new NautilusItemFactory(),
                                               journalActor,
                                               _nautilusActor);

        _nautilusActor.setNautilusCheckpointPublisher(
            new NautilusCheckpointPublisher(_nautilusActorQueue));
    }

    /** Returns the size reported by the multi-queue actor. */
    public int getSize() {
        return _nautilusActor.getSize();
    }

    /** Returns the enqueue count reported by the multi-queue actor. */
    public long getEnqueueCount() {
        return _nautilusActor.getEnqueueCount();
    }

    /** Returns the dequeue count reported by the multi-queue actor. */
    public long getDequeueCount() {
        return _nautilusActor.getDequeueCount();
    }

    /** Returns the actor queue shared by this broker's senders and receivers. */
    ActorQueue<NautilusRingItem> getActorQueue() {
        return _nautilusActorQueue;
    }

    /**
     * Wakes the actor queue and closes the journal file.
     *
     * <p>Calling this on a broker that was never initialized is a no-op
     * rather than a {@link NullPointerException}. The journal file is closed
     * even if waking the queue throws.
     */
    @Override
    public void close() {
        if (log.isLoggable(Level.FINER)) {
            log.finer("closing " + getClass().getSimpleName() + "[" + _path + "]");
        }

        ActorQueue<NautilusRingItem> actorQueue = _nautilusActorQueue;
        JournalFile journalFile = _journalFile;

        try {
            if (actorQueue != null) {
                actorQueue.wake();
            }
        } finally {
            if (journalFile != null) {
                journalFile.close();
            }
        }
    }
}
