/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that buffers serialized rows and sends them to Elasticsearch in bulk requests.
 *
 * <p>Rows are serialized into bulk-request lines and collected in an in-memory buffer. The
 * buffer is flushed when it reaches {@code maxBatchSize}, on {@link #prepareCommit()}, and on
 * {@link #close()}. Add-column schema change events are applied to the target index and the
 * row serializer is rebuilt against the updated schema.
 */
public class ElasticsearchSinkWriter
        implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>,
                SupportMultiTableSinkWriter<Void>,
                SupportSchemaEvolutionSinkWriter {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);

    /** Sleep time, in milliseconds, handed to the retry configuration. */
    private static final long DEFAULT_SLEEP_TIME_MS = 200L;

    private final SinkWriter.Context context;
    /** Number of buffered rows at which {@link #write(SeaTunnelRow)} triggers a flush. */
    private final int maxBatchSize;
    /** Replaced whenever a schema change is applied, hence not final. */
    private SeaTunnelRowSerializer seaTunnelRowSerializer;
    /** Serialized rows waiting to be sent in the next bulk request. */
    private final List<String> requestEsList;
    private final EsRestClient esRestClient;
    private final RetryUtils.RetryMaterial retryMaterial;
    private final IndexInfo indexInfo;
    /** Current table schema; updated by {@link #applySchemaChange(SchemaChangeEvent)}. */
    private TableSchema tableSchema;
    private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;

    /**
     * Creates a writer for the given table.
     *
     * @param context sink writer context supplied by the engine
     * @param catalogTable table whose (lower-cased) name is used to build the index info and
     *     whose row type and schema drive serialization
     * @param config connector configuration used to create the REST client and index info
     * @param maxBatchSize number of buffered rows that triggers a bulk flush
     * @param maxRetryCount retry count passed to the retry configuration for bulk requests
     */
    public ElasticsearchSinkWriter(SinkWriter.Context context, CatalogTable catalogTable, ReadonlyConfig config, int maxBatchSize, int maxRetryCount) {
        this.context = context;
        this.maxBatchSize = maxBatchSize;
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. a Turkish locale would otherwise map 'I' to a dotless 'ı').
        String indexTableName = catalogTable.getTableId().getTableName().toLowerCase(Locale.ROOT);
        this.indexInfo = new IndexInfo(indexTableName, config);
        this.esRestClient = EsRestClient.createInstance(config);
        this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(this.esRestClient.getClusterInfo(), this.indexInfo, catalogTable.getSeaTunnelRowType());
        this.requestEsList = new ArrayList<>(maxBatchSize);
        // Every exception is treated as retryable (the predicate always returns true).
        // NOTE(review): the boolean flag presumably means "rethrow after retries are exhausted" - confirm against RetryUtils.
        this.retryMaterial = new RetryUtils.RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
        this.tableSchema = catalogTable.getTableSchema();
        this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher();
    }

    /**
     * Serializes a row into the buffer and flushes once the buffer holds at least
     * {@code maxBatchSize} entries. Rows of kind {@code UPDATE_BEFORE} are skipped.
     *
     * @param element the row to write
     */
    public void write(SeaTunnelRow element) {
        if (RowKind.UPDATE_BEFORE.equals(element.getRowKind())) {
            return;
        }
        String indexRequestRow = this.seaTunnelRowSerializer.serializeRow(element);
        this.requestEsList.add(indexRequestRow);
        if (this.requestEsList.size() >= this.maxBatchSize) {
            this.bulkEsWithRetry(this.esRestClient, this.requestEsList);
        }
    }

    /**
     * Applies a schema change to the target index, then updates the tracked table schema and
     * rebuilds the row serializer from it.
     *
     * @param event a single column event or a batch of column events
     * @throws UnsupportedOperationException if the event is neither an
     *     {@code AlterTableColumnsEvent} nor an {@code AlterTableColumnEvent}
     * @throws SeaTunnelException if a contained column event is not an add-column event
     * @throws IOException declared by the interface signature
     */
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        if (event instanceof AlterTableColumnsEvent) {
            for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent)event).getEvents()) {
                this.applySingleSchemaChangeEvent((SchemaChangeEvent)columnEvent);
            }
        } else if (event instanceof AlterTableColumnEvent) {
            this.applySingleSchemaChangeEvent(event);
        } else {
            throw new UnsupportedOperationException("Unsupported alter table event: " + event);
        }
        this.tableSchema = this.tableSchemaChangeEventHandler.reset(this.tableSchema).apply(event);
        this.seaTunnelRowSerializer = new ElasticsearchRowSerializer(this.esRestClient.getClusterInfo(), this.indexInfo, this.tableSchema.toPhysicalRowDataType());
    }

    /**
     * Adds the column carried by an add-column event as a field of the target index.
     *
     * @param event the event to apply; only {@code AlterTableAddColumnEvent} is supported
     * @throws SeaTunnelException if the event is of any other type
     */
    private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
        if (!(event instanceof AlterTableAddColumnEvent)) {
            throw new SeaTunnelException("Unsupported schemaChangeEvent : " + event.getEventType());
        }
        AlterTableAddColumnEvent addColumnEvent = (AlterTableAddColumnEvent)event;
        Column column = addColumnEvent.getColumn();
        BasicTypeDefine<EsType> reconvert = ElasticSearchTypeConverter.INSTANCE.reconvert(column);
        this.esRestClient.addField(this.indexInfo.getIndex(), reconvert);
        log.info("Add column {} to index {}", column.getName(), this.indexInfo.getIndex());
    }

    /**
     * Flushes all buffered rows.
     *
     * @return always {@link Optional#empty()}; this writer produces no commit info
     */
    public Optional<ElasticsearchCommitInfo> prepareCommit() {
        this.bulkEsWithRetry(this.esRestClient, this.requestEsList);
        return Optional.empty();
    }

    /** Intentionally a no-op. */
    public void abortPrepare() {
    }

    /**
     * Sends the given request lines as one newline-delimited bulk request, retrying according
     * to this writer's retry configuration, and clears the list once the call succeeds.
     * An empty list results in no request being sent.
     *
     * @param esRestClient client used to issue the bulk request
     * @param requestEsList serialized request lines; cleared on success, left untouched on failure
     * @throws ElasticsearchConnectorException if the bulk request ultimately fails or the
     *     response reports errors
     */
    public synchronized void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList) {
        try {
            RetryUtils.retryWithException(() -> {
                if (!requestEsList.isEmpty()) {
                    // Each line is newline-terminated, including the last one.
                    String requestBody = String.join("\n", requestEsList) + "\n";
                    BulkResponse bulkResponse = esRestClient.bulk(requestBody);
                    if (bulkResponse.isErrors()) {
                        throw new ElasticsearchConnectorException((SeaTunnelErrorCode)ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk es error: " + bulkResponse.getResponse());
                    }
                    return bulkResponse;
                }
                return null;
            }, this.retryMaterial);
            requestEsList.clear();
        }
        catch (Exception e) {
            throw new ElasticsearchConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "ElasticSearch execute batch statement error", e);
        }
    }

    /**
     * Flushes any remaining buffered rows and closes the REST client.
     *
     * <p>The client is closed even when the final flush fails, so the connection is not leaked.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        try {
            this.bulkEsWithRetry(this.esRestClient, this.requestEsList);
        } finally {
            this.esRestClient.close();
        }
    }
}

