/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.HexConverter;
import io.debezium.util.SchemaNameAdjuster;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class LogicalDecodingMessageMonitor {
    public static final String LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX = ".message";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY = "message";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY = "prefix";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY = "content";
    private final SchemaNameAdjuster schemaNameAdjuster;
    private final BlockingConsumer<SourceRecord> sender;
    private final String topicName;
    private final CommonConnectorConfig.BinaryHandlingMode binaryMode;
    private final Base64.Encoder base64Encoder;
    private final Schema keySchema;
    private final Schema blockSchema;
    private final Schema valueSchema;

    public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, BlockingConsumer<SourceRecord> sender) {
        this.schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
        this.sender = sender;
        this.topicName = connectorConfig.getLogicalName() + LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX;
        this.binaryMode = connectorConfig.binaryHandlingMode();
        this.base64Encoder = Base64.getEncoder();
        this.keySchema = SchemaBuilder.struct().name(this.schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageKey")).field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA).build();
        this.blockSchema = SchemaBuilder.struct().name(this.schemaNameAdjuster.adjust("io.debezium.connector.postgresql.Message")).field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, Schema.OPTIONAL_STRING_SCHEMA).field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, this.binaryMode.getSchema().optional().build()).build();
        this.valueSchema = SchemaBuilder.struct().name(this.schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageValue")).field("op", Schema.STRING_SCHEMA).field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA).field("source", connectorConfig.getSourceInfoStructMaker().schema()).field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, this.blockSchema).build();
    }

    public void logicalDecodingMessageEvent(Partition partition, OffsetContext offsetContext, Long timestamp, LogicalDecodingMessage message) throws InterruptedException {
        Struct logicalMsgStruct = new Struct(this.blockSchema);
        logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, (Object)message.getPrefix());
        logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, this.convertContent(message.getContent()));
        Struct key = new Struct(this.keySchema);
        key.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, (Object)message.getPrefix());
        Struct value = new Struct(this.valueSchema);
        value.put("op", (Object)Envelope.Operation.MESSAGE.code());
        value.put("ts_ms", (Object)timestamp);
        value.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, (Object)logicalMsgStruct);
        value.put("source", (Object)offsetContext.getSourceInfo());
        this.sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), this.topicName, this.keySchema, key, value.schema(), value));
        if (message.isLastEventForLsn()) {
            offsetContext.getTransactionContext().endTransaction();
        }
    }

    private Object convertContent(byte[] content) {
        switch (this.binaryMode) {
            case BASE64: {
                return new String(this.base64Encoder.encode(content), StandardCharsets.UTF_8);
            }
            case HEX: {
                return HexConverter.convertToHexString(content);
            }
            case BYTES: {
                return ByteBuffer.wrap(content);
            }
        }
        return ByteBuffer.wrap(content);
    }
}

