/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.decentred.remote.net;

import java.io.EOFException;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.tcp.ISocketChannel;
import net.openhft.chronicle.decentred.remote.net.TCPConnection;

/**
 * Base class for connections that exchange length-prefixed messages over a
 * {@link SocketChannel}.
 *
 * <p>Each frame starts with a 4-byte length which counts the prefix itself, so the
 * smallest valid length is {@value #HEADER_LENGTH} and the largest is
 * {@value #MAX_MESSAGE_LENGTH}. {@link #write(BytesStore)} adds the prefix itself
 * (little-endian); {@link #write(ByteBuffer)} sends the buffer as-is.
 *
 * <p>Subclasses supply {@link #waitForReconnect()}, {@link #onMessage(Bytes)} and
 * {@link #close2()}.
 */
public abstract class AbstractTCPConnection implements TCPConnection {
    /** Size in bytes of the length prefix written in front of every payload. */
    private static final int HEADER_LENGTH = 4;
    /** Largest frame length accepted on read, and largest raw buffer accepted on write. */
    private static final int MAX_MESSAGE_LENGTH = 0x100000;
    /** Largest payload {@link #write(BytesStore)} accepts, leaving room for the prefix. */
    private static final long MAX_PAYLOAD_LENGTH = MAX_MESSAGE_LENGTH - HEADER_LENGTH;
    /** Read position beyond which a partially consumed read buffer is compacted. */
    private static final long COMPACT_THRESHOLD = 32768L;

    /** Per-thread {header, payload} pair handed to the gathering write. */
    private final ThreadLocal<ByteBuffer[]> headerBytesTL = ThreadLocal.withInitial(AbstractTCPConnection::createHeaderArray);
    protected volatile SocketChannel channel;
    protected ISocketChannel iSocketChannel;
    /** Cleared by {@link #close()}; checked before and during every write. */
    volatile boolean running = true;

    /**
     * Creates a connection bound to {@code channel}.
     *
     * <p>NOTE(review): this delegates to the overridable {@link #channel(SocketChannel)},
     * so an override runs before the subclass constructor body has executed.
     *
     * @param channel the socket channel to use, may be {@code null}
     */
    protected AbstractTCPConnection(SocketChannel channel) {
        this.channel(channel);
    }

    /** Creates a connection with no channel; one can be set via {@link #channel(SocketChannel)}. */
    protected AbstractTCPConnection() {
    }

    /**
     * Builds the two-slot array used for gathering writes: slot 0 is a direct,
     * little-endian 4-byte header buffer; slot 1 is filled in per write.
     */
    private static ByteBuffer[] createHeaderArray() {
        ByteBuffer header = ByteBuffer.allocateDirect(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        return new ByteBuffer[]{header, null};
    }

    /**
     * Replaces the underlying channel and its {@link ISocketChannel} wrapper.
     *
     * @param channel the new channel, or {@code null} to clear both references
     * @return this connection
     */
    public AbstractTCPConnection channel(SocketChannel channel) {
        // The wrapper is assigned before the volatile channel field, as in the original ordering.
        if (channel == null) {
            this.iSocketChannel = null;
        } else {
            this.iSocketChannel = ISocketChannel.wrap(channel);
        }
        this.channel = channel;
        return this;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + " " + this.channel;
    }

    /**
     * Writes the readable part of {@code bytes} as one frame, preceded by a 4-byte
     * length prefix equal to the payload size plus the prefix size.
     *
     * <p>The loop is driven by the payload's remaining bytes, so a payload with
     * nothing to read results in no socket write at all. If {@code running} is
     * cleared while writing, the loop exits without an exception, possibly
     * leaving the frame partially written.
     *
     * @param bytes payload; its underlying {@link ByteBuffer} has its limit and
     *              position set to the store's read limit and read position
     * @throws IOException  if the connection is closed or the payload exceeds
     *                      {@value #MAX_PAYLOAD_LENGTH} bytes
     * @throws EOFException if the channel reports a negative write count; the
     *                      channel is closed first
     */
    @Override
    public void write(BytesStore<?, ByteBuffer> bytes) throws IOException {
        if (!this.running) {
            throw new IOException("closed");
        }
        this.waitForReconnect();
        long payloadLength = bytes.readRemaining();
        if (payloadLength > MAX_PAYLOAD_LENGTH) {
            throw new IOException("Message too long " + payloadLength);
        }
        ByteBuffer buffer = (ByteBuffer) bytes.underlyingObject();
        assert (buffer != null);
        // Limit first, then position, so the position is always within bounds.
        buffer.limit(Math.toIntExact(bytes.readLimit()));
        buffer.position(Math.toIntExact(bytes.readPosition()));

        ByteBuffer[] headerBytes = this.headerBytesTL.get();
        ByteBuffer header = headerBytes[0];
        header.clear();
        header.putInt(0, HEADER_LENGTH + buffer.remaining());
        headerBytes[1] = buffer;
        try {
            while (buffer.remaining() > 0 && this.running) {
                if (this.iSocketChannel.write(headerBytes) < 0L) {
                    this.channel.close();
                    throw new EOFException("Failed to write");
                }
            }
        } finally {
            // Do not let the per-thread array keep the caller's buffer reachable after the write.
            headerBytes[1] = null;
        }
    }

    /**
     * Writes the remaining content of {@code buffer} to the channel as-is; no
     * length prefix is added by this method.
     *
     * <p>If {@code running} is cleared while writing, the loop exits without an
     * exception, possibly leaving {@code buffer} partially written.
     *
     * @param buffer data to send
     * @throws IOException  if the connection is closed or the buffer holds more
     *                      than {@value #MAX_MESSAGE_LENGTH} bytes
     * @throws EOFException if the channel reports a negative write count; the
     *                      channel is closed first
     */
    @Override
    public void write(ByteBuffer buffer) throws IOException {
        if (!this.running) {
            throw new IOException("closed");
        }
        this.waitForReconnect();
        if (buffer.remaining() > MAX_MESSAGE_LENGTH) {
            throw new IOException("Message too long " + buffer.remaining());
        }
        while (buffer.remaining() > 0 && this.running) {
            if (this.iSocketChannel.write(buffer) < 0) {
                this.channel.close();
                throw new EOFException("Failed to write");
            }
        }
    }

    /**
     * Invoked by both {@code write} methods after the closed check and before any
     * data is sent.
     *
     * @throws IOException if the implementation cannot proceed with the write
     */
    protected abstract void waitForReconnect() throws IOException;

    /**
     * Performs one step of the read loop.
     *
     * <p>If {@code bytes} already holds a complete frame, exactly one message is
     * dispatched to {@link #onMessage(Bytes)} and the method returns without
     * touching the socket. Otherwise the buffer is cleared (when empty) or
     * compacted (when the read position has passed {@value #COMPACT_THRESHOLD}),
     * a single read from the channel is appended at the write position, and the
     * read limit is advanced to cover the new data.
     *
     * @param bytes accumulation buffer backed by a {@link ByteBuffer}
     * @throws StreamCorruptedException if the next length prefix is below
     *                                  {@value #HEADER_LENGTH} or above
     *                                  {@value #MAX_MESSAGE_LENGTH}
     * @throws EOFException             if the channel read returns a negative count
     * @throws IOException              on any other I/O failure
     */
    protected void readChannel(Bytes<ByteBuffer> bytes) throws IOException {
        if (bytes.readRemaining() >= HEADER_LENGTH) {
            // Peek at the length without consuming it.
            int length = bytes.readInt(bytes.readPosition());
            if (length < HEADER_LENGTH || length > MAX_MESSAGE_LENGTH) {
                throw new StreamCorruptedException("length: " + length);
            }
            if (bytes.readRemaining() >= (long) length) {
                this.processOneMessage(length, bytes);
                return;
            }
        }

        // No complete frame buffered: make room, then read more from the socket.
        if (bytes.readRemaining() == 0L) {
            bytes.clear();
        } else if (bytes.readPosition() > COMPACT_THRESHOLD) {
            bytes.compact();
        }
        ByteBuffer buffer = (ByteBuffer) bytes.underlyingObject();
        assert (buffer != null);
        buffer.position(Math.toIntExact(bytes.writePosition()));
        buffer.limit(Math.toIntExact(bytes.realCapacity()));
        if (this.iSocketChannel.read(buffer) < 0) {
            throw new EOFException();
        }
        bytes.readLimit((long) buffer.position());
    }

    /**
     * Hands a single frame's payload to {@link #onMessage(Bytes)}.
     *
     * <p>The length prefix is skipped and the read limit is narrowed to the end of
     * the frame for the duration of the callback. Afterwards, whether or not the
     * callback threw, the original read limit is restored and the read position
     * is moved to the end of the frame.
     *
     * @param length frame length including the prefix
     * @param bytes  buffer positioned at the start of the frame
     * @throws IOException if {@code onMessage} throws it
     */
    private void processOneMessage(int length, Bytes<ByteBuffer> bytes) throws IOException {
        long frameEnd = bytes.readPosition() + (long) length;
        long savedLimit = bytes.readLimit();
        try {
            bytes.readSkip(HEADER_LENGTH);
            bytes.readLimit(frameEnd);
            this.onMessage(bytes);
        } finally {
            // Restore the limit before the position so the position is within bounds.
            bytes.readLimit(savedLimit);
            bytes.readPosition(frameEnd);
        }
    }

    /**
     * Called once per complete frame with the read window restricted to that
     * frame's payload (prefix already skipped).
     *
     * @param var1 buffer whose readable region is the message payload
     * @throws IOException if the message cannot be handled
     */
    protected abstract void onMessage(Bytes<ByteBuffer> var1) throws IOException;

    /**
     * Stops the connection: clears {@code running}, runs the subclass hook
     * {@link #close2()}, then quietly closes the channel. The channel is closed
     * even if {@code close2()} throws.
     */
    public final void close() {
        this.running = false;
        try {
            this.close2();
        } finally {
            Closeable.closeQuietly((Object) this.channel);
        }
    }

    /** Subclass-specific cleanup, invoked by {@link #close()} before the channel is closed. */
    protected abstract void close2();
}

