package cn.harveychan.canal.client.client;

import cn.harveychan.canal.client.handler.MessageHandler;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
import com.alibaba.otter.canal.protocol.FlatMessage;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/harveychan/canal/client/client/KafkaCanalClient.class */
public class KafkaCanalClient extends AbstractCanalClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaCanalClient.class);

    /* loaded from: input_file:cn/harveychan/canal/client/client/KafkaCanalClient$Builder.class */
    public static final class Builder {
        private String filter = "";
        private Integer batchSize = 1;
        private Long timeout = 1L;
        private TimeUnit unit = TimeUnit.SECONDS;
        private String servers;
        private String topic;
        private Integer partition;
        private String groupId;
        private MessageHandler messageHandler;

        private Builder() {
        }

        public static Builder builder() {
            return new Builder();
        }

        public Builder servers(String str) {
            this.servers = str;
            return this;
        }

        public Builder topic(String str) {
            this.topic = str;
            return this;
        }

        public Builder partition(Integer num) {
            this.partition = num;
            return this;
        }

        public Builder groupId(String str) {
            this.groupId = str;
            return this;
        }

        public Builder filter(String str) {
            this.filter = str;
            return this;
        }

        public Builder batchSize(Integer num) {
            this.batchSize = num;
            return this;
        }

        public Builder timeout(Long l) {
            this.timeout = l;
            return this;
        }

        public Builder unit(TimeUnit timeUnit) {
            this.unit = timeUnit;
            return this;
        }

        public Builder messageHandler(MessageHandler messageHandler) {
            this.messageHandler = messageHandler;
            return this;
        }

        public KafkaCanalClient build() {
            CanalConnector kafkaCanalConnector = new KafkaCanalConnector(this.servers, this.topic, this.partition, this.groupId, this.batchSize, true);
            KafkaCanalClient kafkaCanalClient = new KafkaCanalClient();
            kafkaCanalClient.setConnector(kafkaCanalConnector);
            kafkaCanalClient.setMessageHandler(this.messageHandler);
            kafkaCanalClient.filter = this.filter;
            kafkaCanalClient.batchSize = this.batchSize;
            kafkaCanalClient.timeout = this.timeout;
            kafkaCanalClient.unit = this.unit;
            return kafkaCanalClient;
        }
    }

    public static Builder builder() {
        return Builder.builder();
    }

    @Override // cn.harveychan.canal.client.client.AbstractCanalClient, cn.harveychan.canal.client.client.CanalClient
    public void process() {
        KafkaCanalConnector connector = getConnector();
        MessageHandler messageHandler = getMessageHandler();
        while (this.flag) {
            try {
                connector.connect();
                connector.subscribe(this.filter);
                while (this.flag) {
                    try {
                        List flatListWithoutAck = connector.getFlatListWithoutAck(this.timeout, this.unit);
                        log.info("canal client get message: {}", flatListWithoutAck);
                        if (flatListWithoutAck != null && flatListWithoutAck.size() > 0) {
                            Iterator it = flatListWithoutAck.iterator();
                            while (it.hasNext()) {
                                messageHandler.handleMessage((FlatMessage) it.next());
                            }
                        }
                        connector.ack();
                    } catch (Exception e) {
                        log.error("canal client exception consumed: {}", e.getMessage(), e);
                    }
                }
            } catch (Exception e2) {
                log.error("canal client exception occurred: {}", e2.getMessage(), e2);
            }
        }
    }
}
