/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.wire.channel.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import net.openhft.affinity.AffinityStrategy;
import net.openhft.affinity.AffinityThreadFactory;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.channel.EventPoller;
import net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel;
import net.openhft.chronicle.wire.channel.impl.TCPChronicleChannel;
import net.openhft.chronicle.wire.channel.impl.WireExchanger;

public class BufferedChronicleChannel
extends DelegateChronicleChannel {
    private final Pauser pauser;
    private final WireExchanger exchanger = new WireExchanger();
    private final ExecutorService bgWriter;
    private final int lingerNs;
    private volatile EventPoller eventPoller;

    public BufferedChronicleChannel(TCPChronicleChannel channel, Pauser pauser) {
        this(channel, pauser, 8);
    }

    public BufferedChronicleChannel(TCPChronicleChannel channel, Pauser pauser, int lingerUs) {
        super(channel);
        this.pauser = pauser;
        this.lingerNs = lingerUs * 1000;
        String desc = channel.connectionCfg().initiator() ? "init" : "accp";
        String writer = desc + "~writer";
        AffinityThreadFactory factory = pauser.isBusy() ? new AffinityThreadFactory(writer, true, new AffinityStrategy[0]) : new NamedThreadFactory(writer, Boolean.valueOf(true));
        this.bgWriter = Executors.newSingleThreadExecutor((ThreadFactory)factory);
        this.bgWriter.submit(this::bgWrite);
    }

    @Override
    public EventPoller eventPoller() {
        return this.eventPoller;
    }

    @Override
    public BufferedChronicleChannel eventPoller(EventPoller eventPoller) {
        this.eventPoller = eventPoller;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void bgWrite() {
        try {
            TCPChronicleChannel channel = (TCPChronicleChannel)this.channel;
            while (!this.isClosing()) {
                long start = System.nanoTime();
                channel.checkConnected();
                Wire wire = this.exchanger.acquireConsumer();
                if (wire.bytes().isEmpty()) {
                    EventPoller eventPoller = this.eventPoller();
                    if (eventPoller == null || !eventPoller.onPoll(this)) {
                        this.pauser.pause();
                    }
                    this.exchanger.releaseConsumer();
                    continue;
                }
                assert (TCPChronicleChannel.validateHeader(wire.bytes().peekVolatileInt()));
                this.pauser.reset();
                channel.flushOut(wire);
                this.exchanger.releaseConsumer();
                while (System.nanoTime() < start + (long)this.lingerNs) {
                    this.pauser.pause();
                }
            }
        }
        catch (Throwable t) {
            if (!this.isClosing()) {
                Jvm.warn().on(this.getClass(), "bgWriter died", t);
            }
        }
        finally {
            this.bgWriter.shutdown();
            Closeable.closeQuietly((Object)this.eventPoller());
        }
    }

    @Override
    public DocumentContext writingDocument(boolean metaData) throws UnrecoverableTimeoutException {
        return this.exchanger.writingDocument(metaData);
    }

    @Override
    public DocumentContext acquireWritingDocument(boolean metaData) throws UnrecoverableTimeoutException {
        return this.exchanger.acquireWritingDocument(metaData);
    }

    @Override
    public void close() {
        super.close();
        this.exchanger.close();
    }
}

