/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceReader;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;

public class RocketMqSource
implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>,
SupportParallelism {
    private final ReadonlyConfig pluginConfig;
    private final CatalogTable catalogTable;
    private final ConsumerMetadata metadata;
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private JobContext jobContext;

    public RocketMqSource(ReadonlyConfig pluginConfig) {
        this.pluginConfig = pluginConfig;
        this.metadata = new ConsumerMetadata();
        this.metadata.setTopics(Arrays.asList(((String)pluginConfig.get(RocketMqSourceOptions.TOPICS)).split(",")));
        String tags = (String)pluginConfig.get(RocketMqSourceOptions.TAGS);
        if (tags != null && !tags.trim().isEmpty()) {
            this.metadata.setTags(Arrays.stream(tags.split(",")).map(String::trim).filter(tag -> !tag.isEmpty()).distinct().collect(Collectors.toList()));
        } else {
            this.metadata.setTags(Collections.emptyList());
        }
        RocketMqBaseConfiguration.Builder baseConfigBuilder = RocketMqBaseConfiguration.newBuilder().consumer().namesrvAddr((String)pluginConfig.get(RocketMqSourceOptions.NAME_SRV_ADDR));
        if (pluginConfig.getOptional(RocketMqSourceOptions.ACCESS_KEY).isPresent()) {
            baseConfigBuilder.accessKey((String)pluginConfig.get(RocketMqSourceOptions.ACCESS_KEY));
        }
        if (pluginConfig.getOptional(RocketMqSourceOptions.SECRET_KEY).isPresent()) {
            baseConfigBuilder.secretKey((String)pluginConfig.get(RocketMqSourceOptions.SECRET_KEY));
        }
        baseConfigBuilder.aclEnable((Boolean)pluginConfig.get(RocketMqSourceOptions.ACL_ENABLED));
        baseConfigBuilder.groupId((String)pluginConfig.get(RocketMqSourceOptions.CONSUMER_GROUP));
        baseConfigBuilder.batchSize((Integer)pluginConfig.get(RocketMqSourceOptions.BATCH_SIZE));
        baseConfigBuilder.pollTimeoutMillis((Long)pluginConfig.get(RocketMqSourceOptions.POLL_TIMEOUT_MILLIS));
        this.metadata.setBaseConfig(baseConfigBuilder.build());
        this.metadata.setEnabledCommitCheckpoint((Boolean)pluginConfig.get(RocketMqSourceOptions.COMMIT_ON_CHECKPOINT));
        StartMode startMode = (StartMode)((Object)pluginConfig.get(RocketMqSourceOptions.START_MODE));
        switch (startMode) {
            case CONSUME_FROM_TIMESTAMP: {
                long startOffsetsTimestamp = (Long)pluginConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
                long currentTimestamp = System.currentTimeMillis();
                if (startOffsetsTimestamp < 0L || startOffsetsTimestamp > currentTimestamp) {
                    throw new IllegalArgumentException("The offsets timestamp value is smaller than 0 or smaller than the current time");
                }
                this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
                break;
            }
            case CONSUME_FROM_SPECIFIC_OFFSETS: {
                Map offsetConfigMap = (Map)pluginConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
                HashMap<MessageQueue, Long> specificStartOffsets = new HashMap<MessageQueue, Long>();
                offsetConfigMap.forEach((k, v) -> {
                    int splitIndex = k.lastIndexOf("-");
                    String topic = k.substring(0, splitIndex);
                    String partition = k.substring(splitIndex + 1);
                    MessageQueue messageQueue = new MessageQueue(topic, null, Integer.parseInt(partition));
                    specificStartOffsets.put(messageQueue, (Long)v);
                });
                this.metadata.setSpecificStartOffsets(specificStartOffsets);
                break;
            }
        }
        this.metadata.setStartMode(startMode);
        this.catalogTable = CatalogTableUtil.buildWithConfig((ReadonlyConfig)pluginConfig);
        this.setDeserialization(pluginConfig);
    }

    public String getPluginName() {
        return "Rocketmq";
    }

    public List<CatalogTable> getProducedCatalogTables() {
        return Collections.singletonList(this.catalogTable);
    }

    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals((Object)this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new RocketMqSourceReader(this.metadata, this.deserializationSchema, readerContext);
    }

    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, (Long)this.pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
    }

    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context, RocketMqSourceState sourceState) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, (Long)this.pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
    }

    private void setDeserialization(ReadonlyConfig config) {
        if (config.getOptional(RocketMqSourceOptions.SCHEMA).isPresent()) {
            SchemaFormat format = (SchemaFormat)((Object)config.get(RocketMqSourceOptions.FORMAT));
            boolean ignoreParseErrors = (Boolean)config.get(RocketMqSourceOptions.IGNORE_PARSE_ERRORS);
            switch (format) {
                case JSON: {
                    this.deserializationSchema = new JsonDeserializationSchema(this.catalogTable, false, ignoreParseErrors);
                    break;
                }
                case TEXT: {
                    this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.catalogTable.getSeaTunnelRowType()).delimiter((String)config.get(RocketMqSourceOptions.FIELD_DELIMITER)).build();
                    break;
                }
                default: {
                    throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + (Object)((Object)format));
                }
            }
        } else {
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.catalogTable.getSeaTunnelRowType()).delimiter(String.valueOf('\u0002')).build();
        }
    }
}

