package cn.harveychan.canal.client.client;

import cn.harveychan.canal.client.handler.MessageHandler;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/harveychan/canal/client/client/AbstractCanalClient.class */
/**
 * Base {@link CanalClient} that polls a {@link CanalConnector} on a dedicated worker thread and
 * passes every non-empty batch to the configured {@link MessageHandler}.
 *
 * <p>Lifecycle: {@link #start()} spawns the worker thread running {@link #process()};
 * {@link #stop()} clears the run flag and interrupts the worker. A connector and a message
 * handler must be injected through the setters before the client is started.
 */
public abstract class AbstractCanalClient implements CanalClient {
    private static final Logger log = LoggerFactory.getLogger(AbstractCanalClient.class);

    /** Pause between reconnect attempts so a failing connection does not cause a busy loop. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    /** Run flag read by the worker loop; {@code volatile} because it is written by other threads. */
    protected volatile boolean flag;

    /** Worker thread created by {@link #start()}; {@code null} until the client is first started. */
    private Thread workThread;

    private CanalConnector connector;

    /** Filter expression handed to {@code CanalConnector.subscribe}. */
    protected String filter = "";

    /** Batch size handed to {@code CanalConnector.getWithoutAck}. */
    protected Integer batchSize = 1;

    /** Timeout handed to {@code CanalConnector.getWithoutAck}, expressed in {@link #unit}. */
    protected Long timeout = 1L;

    /** Time unit of {@link #timeout}. */
    protected TimeUnit unit = TimeUnit.SECONDS;

    private MessageHandler messageHandler;

    /**
     * Starts the worker thread that runs {@link #process()}.
     *
     * <p>The call is ignored (with a warning) when the client is already running, so that two
     * workers never share the same connector.
     */
    @Override
    public void start() {
        if (this.flag && this.workThread != null && this.workThread.isAlive()) {
            log.warn("canal client is already running, ignoring start request");
            return;
        }
        log.info("start canal client");
        this.workThread = new Thread(this::process);
        this.workThread.setName("canal-client-thread");
        // The flag must be set before the thread starts, otherwise the loop could exit immediately.
        this.flag = true;
        this.workThread.start();
    }

    /**
     * Requests the worker to stop: clears the run flag and interrupts the worker thread so that a
     * blocking call or the reconnect pause is cut short. Does not wait for the worker to finish.
     */
    @Override
    public void stop() {
        log.info("stop canal client");
        this.flag = false;
        if (this.workThread != null) {
            this.workThread.interrupt();
        }
    }

    /**
     * Worker loop: connects, subscribes with {@link #filter}, then repeatedly fetches a batch,
     * passes it to the message handler when it carries entries, and acknowledges it.
     *
     * <p>On any exception the connector is disconnected and, while the client is still running,
     * the loop reconnects after a short pause. The connector is always disconnected when the
     * loop is left, including when an {@link Error} propagates.
     *
     * @throws IllegalStateException if no connector or no message handler has been set
     */
    @Override
    public void process() {
        if (this.connector == null) {
            throw new IllegalStateException("connector must be set before processing");
        }
        if (this.messageHandler == null) {
            throw new IllegalStateException("messageHandler must be set before processing");
        }
        while (this.flag) {
            try {
                this.connector.connect();
                this.connector.subscribe(this.filter);
                while (this.flag) {
                    Message message = this.connector.getWithoutAck(this.batchSize.intValue(), this.timeout, this.unit);
                    log.info("canal client get message: {}", message);
                    long batchId = message.getId();
                    // An id of -1 or an empty entry list means there is nothing to handle.
                    if (batchId != -1 && !message.getEntries().isEmpty()) {
                        this.messageHandler.handleMessage(message);
                    }
                    this.connector.ack(batchId);
                }
            } catch (Exception e) {
                log.error("canal client exception occurred: {}", e.getMessage(), e);
            } finally {
                disconnectQuietly();
            }
            // Reaching this point with the flag still set means the session failed: back off
            // before reconnecting, and give up if the thread is interrupted while waiting.
            if (this.flag && !pauseBeforeReconnect()) {
                return;
            }
        }
    }

    /**
     * Disconnects the connector, logging instead of propagating any exception so that a failed
     * disconnect cannot terminate the worker loop.
     */
    private void disconnectQuietly() {
        try {
            this.connector.disconnect();
        } catch (Exception e) {
            log.warn("canal client failed to disconnect: {}", e.getMessage(), e);
        }
    }

    /**
     * Sleeps for {@link #RECONNECT_DELAY_MILLIS} before the next reconnect attempt.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     *         (the interrupt status is restored in that case)
     */
    private boolean pauseBeforeReconnect() {
        try {
            Thread.sleep(RECONNECT_DELAY_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * @return the connector used by the worker loop, or {@code null} if none has been set
     */
    public CanalConnector getConnector() {
        return this.connector;
    }

    /**
     * @param canalConnector the connector the worker loop should use
     */
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /**
     * @return the handler that receives non-empty batches, or {@code null} if none has been set
     */
    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    /**
     * @param messageHandler the handler that should receive non-empty batches
     */
    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }
}
