/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.publisher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.jdbc.DataSourceBuilder;
import org.apache.gobblin.writer.commands.JdbcWriterCommands;
import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataPublisher} that moves data from JDBC staging tables into the final destination
 * table(s).
 *
 * <p>For every fork branch the publisher optionally empties the destination table (when
 * {@value #JDBC_PUBLISHER_REPLACE_FINAL_TABLE} is set for that branch) and then copies each
 * staging table referenced by the work unit states into the destination table. All of this is
 * done on a single connection with auto-commit turned off, and is committed once at the end;
 * any failure triggers a rollback.
 *
 * <p>The publisher refuses to be constructed unless the job commit policy is
 * {@code COMMIT_ON_FULL_SUCCESS} and {@code publish.data.at.job.level} is not set to false.
 */
public class JdbcPublisher extends DataPublisher {
    public static final String JDBC_PUBLISHER_PREFIX = "jdbc.publisher.";
    public static final String JDBC_PUBLISHER_DATABASE_NAME = "jdbc.publisher.database_name";
    public static final String JDBC_PUBLISHER_FINAL_TABLE_NAME = "jdbc.publisher.table_name";
    public static final String JDBC_PUBLISHER_REPLACE_FINAL_TABLE = "jdbc.publisher.replace_table";
    public static final String JDBC_PUBLISHER_USERNAME = "jdbc.publisher.username";
    public static final String JDBC_PUBLISHER_PASSWORD = "jdbc.publisher.password";
    public static final String JDBC_PUBLISHER_ENCRYPTION_KEY_LOC = "jdbc.publisher.encrypt_key_loc";
    public static final String JDBC_PUBLISHER_URL = "jdbc.publisher.url";
    public static final String JDBC_PUBLISHER_TIMEOUT = "jdbc.publisher.timeout";
    public static final String JDBC_PUBLISHER_DRIVER = "jdbc.publisher.driver";

    // Property keys read by this class that are not JDBC-publisher specific.
    private static final String PUBLISH_DATA_AT_JOB_LEVEL_KEY = "publish.data.at.job.level";
    private static final String FORK_BRANCHES_KEY = "fork.branches";
    private static final String WRITER_STAGING_TABLE_KEY = "writer.staging.table";

    private static final Logger LOG = LoggerFactory.getLogger(JdbcPublisher.class);
    private final JdbcWriterCommandsFactory jdbcWriterCommandsFactory;

    /**
     * Creates a publisher that obtains its {@link JdbcWriterCommands} from the given factory.
     *
     * @param state job state holding the publisher configuration
     * @param jdbcWriterCommandsFactory factory used to create the commands for a connection
     * @throws IllegalArgumentException if the configuration does not allow job-level publishing
     */
    @VisibleForTesting
    public JdbcPublisher(State state, JdbcWriterCommandsFactory jdbcWriterCommandsFactory) {
        super(state);
        this.jdbcWriterCommandsFactory = jdbcWriterCommandsFactory;
        this.validate(this.getState());
    }

    /**
     * Creates a publisher backed by a default {@link JdbcWriterCommandsFactory}.
     *
     * @param state job state holding the publisher configuration
     * @throws IllegalArgumentException if the configuration does not allow job-level publishing
     */
    public JdbcPublisher(State state) {
        // The delegated constructor already validates the state; no need to do it twice.
        this(state, new JdbcWriterCommandsFactory());
    }

    /**
     * Verifies that the given state is configured for publishing at job level.
     *
     * @param state state to check
     * @throws IllegalArgumentException if the commit policy is not {@code COMMIT_ON_FULL_SUCCESS}
     *         or if {@code publish.data.at.job.level} is set to false
     */
    private void validate(State state) {
        JobCommitPolicy jobCommitPolicy = JobCommitPolicy.getCommitPolicy(state.getProperties());
        if (JobCommitPolicy.COMMIT_ON_FULL_SUCCESS != jobCommitPolicy) {
            throw new IllegalArgumentException(this.getClass().getSimpleName()
                    + " won't publish as already commited by task. Job commit policy " + jobCommitPolicy);
        }
        if (!state.getPropAsBoolean(PUBLISH_DATA_AT_JOB_LEVEL_KEY, true)) {
            throw new IllegalArgumentException(this.getClass().getSimpleName()
                    + " won't publish as " + PUBLISH_DATA_AT_JOB_LEVEL_KEY + " is set as false");
        }
    }

    /**
     * Builds a data source from the {@code jdbc.publisher.*} properties (limited to one active
     * connection) and opens a connection from it.
     *
     * @return a newly obtained connection
     * @throws RuntimeException wrapping the {@link SQLException} if no connection can be obtained
     */
    @VisibleForTesting
    public Connection createConnection() {
        DataSource dataSource = DataSourceBuilder.builder()
                .url(this.state.getProp(JDBC_PUBLISHER_URL))
                .driver(this.state.getProp(JDBC_PUBLISHER_DRIVER))
                .userName(this.state.getProp(JDBC_PUBLISHER_USERNAME))
                .passWord(this.state.getProp(JDBC_PUBLISHER_PASSWORD))
                .cryptoKeyLocation(this.state.getProp(JDBC_PUBLISHER_ENCRYPTION_KEY_LOC))
                .maxActiveConnections(1)
                .state(this.state)
                .build();
        try {
            return dataSource.getConnection();
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /** No resources are held between calls, so there is nothing to close. */
    public void close() throws IOException {
    }

    /** No initialization is required. */
    public void initialize() throws IOException {
    }

    /**
     * Copies all staging tables referenced by {@code states} into their destination tables
     * within a single transaction and marks the work unit states as committed once the
     * transaction has been committed.
     *
     * @param states work unit states whose staging tables should be published
     * @throws RuntimeException if publishing fails (after attempting a rollback) or if the
     *         connection cannot be closed
     */
    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        LOG.info("Start publishing data");
        // try-with-resources guarantees the connection is closed even when creating the commands
        // fails, and keeps a publish failure as the primary exception if close() fails as well.
        try (Connection conn = this.createConnection()) {
            JdbcWriterCommands commands = this.jdbcWriterCommandsFactory.newInstance(this.state, conn);
            this.publishInTransaction(conn, commands, states);
        }
        catch (SQLException e) {
            // Only Connection.close() can surface a SQLException here.
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the delete/copy commands for every branch on {@code conn} with auto-commit disabled,
     * commits, and then flags the published work unit states as committed. Rolls back on failure.
     */
    private void publishInTransaction(Connection conn, JdbcWriterCommands commands,
            Collection<? extends WorkUnitState> states) {
        int branches = this.state.getPropAsInt(FORK_BRANCHES_KEY, 1);
        // Several branches may share a destination table; it must be emptied at most once.
        HashSet<String> emptiedDestTables = Sets.newHashSet();
        List<WorkUnitState> publishedStates = Lists.newArrayList();
        try {
            conn.setAutoCommit(false);
            for (int i = 0; i < branches; ++i) {
                String tableNameKey = ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_FINAL_TABLE_NAME, branches, i);
                String destinationTable = Preconditions.checkNotNull(this.state.getProp(tableNameKey),
                        "Missing destination table property %s", tableNameKey);
                String databaseName = this.state.getProp(
                        ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_DATABASE_NAME, branches, i));
                boolean replaceTable = this.state.getPropAsBoolean(
                        ForkOperatorUtils.getPropertyNameForBranch(JDBC_PUBLISHER_REPLACE_FINAL_TABLE, branches, i), false);
                if (replaceTable && !emptiedDestTables.contains(destinationTable)) {
                    LOG.info("Deleting table " + destinationTable);
                    commands.deleteAll(databaseName, destinationTable);
                    emptiedDestTables.add(destinationTable);
                }
                Map<String, List<WorkUnitState>> stagingTables = JdbcPublisher.getStagingTables(states, branches, i);
                for (Map.Entry<String, List<WorkUnitState>> entry : stagingTables.entrySet()) {
                    String stagingTable = entry.getKey();
                    LOG.info("Copying data from staging table " + stagingTable + " into destination table " + destinationTable);
                    commands.copyTable(databaseName, stagingTable, destinationTable);
                    publishedStates.addAll(entry.getValue());
                }
            }
            LOG.info("Commit publish data");
            conn.commit();
        }
        catch (Exception e) {
            try {
                LOG.error("Failed publishing. Rolling back.");
                conn.rollback();
            }
            catch (SQLException se) {
                LOG.error("Failed rolling back.", se);
                // Keep the rollback failure attached to the root cause instead of only logging it.
                e.addSuppressed(se);
            }
            throw new RuntimeException("Failed publishing", e);
        }
        // Flag work units only after the commit succeeded, so a rolled-back publish never
        // leaves work units marked as COMMITTED.
        for (WorkUnitState workUnitState : publishedStates) {
            workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
        }
    }

    /**
     * Groups the work unit states by the staging table configured for the given branch.
     *
     * @param states work unit states to group
     * @param branches total number of fork branches
     * @param i index of the branch being published
     * @return map from staging table name to the work unit states that reference it
     * @throws NullPointerException if a work unit state has no staging table for the branch
     */
    private static Map<String, List<WorkUnitState>> getStagingTables(Collection<? extends WorkUnitState> states, int branches, int i) {
        // The property name depends only on the branch, so compute it once.
        String stagingTableKey = ForkOperatorUtils.getPropertyNameForBranch(WRITER_STAGING_TABLE_KEY, branches, i);
        Map<String, List<WorkUnitState>> stagingTables = Maps.newHashMap();
        for (WorkUnitState workUnitState : states) {
            String stagingTable = Preconditions.checkNotNull(workUnitState.getProp(stagingTableKey),
                    "Missing staging table property %s", stagingTableKey);
            List<WorkUnitState> existing = stagingTables.get(stagingTable);
            if (existing == null) {
                existing = Lists.newArrayList();
                stagingTables.put(stagingTable, existing);
            }
            existing.add(workUnitState);
        }
        return stagingTables;
    }

    /** This publisher does not publish any metadata. */
    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    }
}

