/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.socket.source;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(SocketSourceReader.class);
    private static final int CHAR_BUFFER_SIZE = 8192;
    private final SocketConfig parameter;
    private final SingleSplitReaderContext context;
    private Socket socket;
    private final String delimiter = "\n";

    SocketSourceReader(SocketConfig parameter, SingleSplitReaderContext context) {
        this.parameter = parameter;
        this.context = context;
    }

    public void open() throws Exception {
        this.socket = new Socket();
        log.info("connect socket server, host:[{}], port:[{}] ", (Object)this.parameter.getHost(), (Object)this.parameter.getPort());
        this.socket.connect(new InetSocketAddress(this.parameter.getHost(), this.parameter.getPort()), 0);
    }

    public void close() throws IOException {
        if (this.socket != null) {
            this.socket.close();
        }
    }

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        StringBuilder buffer = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));){
            int bytesRead;
            char[] buf = new char[8192];
            while ((bytesRead = reader.read(buf)) != -1) {
                int delimPos;
                buffer.append(buf, 0, bytesRead);
                while (buffer.length() >= this.delimiter.length() && (delimPos = buffer.indexOf(this.delimiter)) != -1) {
                    String record = buffer.substring(0, delimPos);
                    if (record.endsWith("\r")) {
                        record = record.substring(0, record.length() - 1);
                    }
                    output.collect((Object)new SeaTunnelRow(new Object[]{record}));
                    buffer.delete(0, delimPos + this.delimiter.length());
                }
                if (!Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) continue;
                this.context.signalNoMoreElement();
                break;
            }
        }
        if (buffer.length() > 0) {
            output.collect((Object)new SeaTunnelRow(new Object[]{buffer.toString()}));
        }
    }
}

