/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

/**
 * An {@link OutputFormat} that sends reduce output to a SQL table through
 * JDBC.
 *
 * <p>Each key is bound to a prepared {@code INSERT} statement via
 * {@link DBWritable#write(PreparedStatement)} and added to a JDBC batch; the
 * value is not used by {@link DBRecordWriter}. The batch is executed and
 * committed when the record writer is closed.
 *
 * @param <K> key type, must implement {@link DBWritable}
 * @param <V> value type (ignored by the record writer)
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBOutputFormat<K extends DBWritable, V>
extends OutputFormat<K, V> {
    private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);

    /**
     * Upper-cased database product name; assigned in
     * {@link #getRecordWriter(TaskAttemptContext)} from the connection
     * metadata and consulted by {@link #constructQuery(String, String[])}.
     */
    public String dbProductName = "DEFAULT";

    /** No output specification checks are performed for database output. */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }

    /**
     * Returns a {@link FileOutputCommitter} built from the job's configured
     * file output path and the given task attempt context.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Builds the parameterized {@code INSERT} statement used by the record
     * writer.
     *
     * <p>The column list is emitted only when the first field name is
     * non-null; otherwise only the {@code VALUES} clause with one {@code ?}
     * placeholder per array element is produced. A trailing {@code ;} is
     * appended unless {@link #dbProductName} starts with {@code DB2} or
     * {@code ORACLE}.
     *
     * @param table      the table to insert into
     * @param fieldNames the column names; the array length determines the
     *                   number of placeholders
     * @return the INSERT statement text
     * @throws IllegalArgumentException if {@code fieldNames} is null
     */
    public String constructQuery(String table, String[] fieldNames) {
        if (fieldNames == null) {
            throw new IllegalArgumentException("Field names may not be null");
        }
        StringBuilder query = new StringBuilder();
        query.append("INSERT INTO ").append(table);
        if (fieldNames.length > 0 && fieldNames[0] != null) {
            query.append(" (");
            for (int i = 0; i < fieldNames.length; ++i) {
                if (i > 0) {
                    query.append(",");
                }
                query.append(fieldNames[i]);
            }
            query.append(")");
        }
        query.append(" VALUES (");
        for (int i = 0; i < fieldNames.length; ++i) {
            if (i > 0) {
                query.append(",");
            }
            query.append("?");
        }
        if (this.dbProductName.startsWith("DB2") || this.dbProductName.startsWith("ORACLE")) {
            query.append(")");
        } else {
            query.append(");");
        }
        return query.toString();
    }

    /**
     * Opens a JDBC connection from the job configuration, prepares the INSERT
     * statement and wraps both in a {@link DBRecordWriter}.
     *
     * <p>If any step after the connection is obtained fails, the statement
     * and connection are closed before the failure is reported so that they
     * are not leaked.
     *
     * @throws IOException wrapping any exception raised while connecting or
     *                     preparing the statement (original exception kept as
     *                     the cause)
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException {
        DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
        String tableName = dbConf.getOutputTableName();
        String[] fieldNames = dbConf.getOutputFieldNames();
        if (fieldNames == null) {
            // Only a field count was configured: use an all-null array so the
            // query carries the right number of placeholders and no column list.
            fieldNames = new String[dbConf.getOutputFieldCount()];
        }
        Connection connection = null;
        PreparedStatement statement = null;
        try {
            connection = dbConf.getConnection();
            DatabaseMetaData dbMeta = connection.getMetaData();
            this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
            statement = connection.prepareStatement(this.constructQuery(tableName, fieldNames));
            return new DBRecordWriter(connection, statement);
        }
        catch (Exception ex) {
            closeQuietly(statement, connection);
            throw new IOException(ex.getMessage(), ex);
        }
    }

    /**
     * Configures the job to write to the given table using named columns.
     *
     * <p>If the first field name is null, only the number of fields is
     * recorded (see {@link #setOutput(Job, String, int)}).
     *
     * @param job        the job to configure
     * @param tableName  the output table
     * @param fieldNames the column names; must contain at least one element
     * @throws IllegalArgumentException if no field names are supplied
     */
    public static void setOutput(Job job, String tableName, String ... fieldNames) throws IOException {
        if (fieldNames.length > 0 && fieldNames[0] != null) {
            DBConfiguration dbConf = DBOutputFormat.setOutput(job, tableName);
            dbConf.setOutputFieldNames(fieldNames);
        } else if (fieldNames.length > 0) {
            DBOutputFormat.setOutput(job, tableName, fieldNames.length);
        } else {
            throw new IllegalArgumentException("Field names must be greater than 0");
        }
    }

    /**
     * Configures the job to write to the given table using a field count
     * rather than explicit column names.
     *
     * @param job        the job to configure
     * @param tableName  the output table
     * @param fieldCount the number of fields per row
     */
    public static void setOutput(Job job, String tableName, int fieldCount) throws IOException {
        DBConfiguration dbConf = DBOutputFormat.setOutput(job, tableName);
        dbConf.setOutputFieldCount(fieldCount);
    }

    /**
     * Shared setup: selects this output format, disables reduce speculative
     * execution and records the output table name.
     */
    private static DBConfiguration setOutput(Job job, String tableName) throws IOException {
        job.setOutputFormatClass(DBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
        dbConf.setOutputTableName(tableName);
        return dbConf;
    }

    /**
     * Closes the given statement and connection (either may be null),
     * logging rather than propagating any failure. Used on error paths where
     * a more relevant exception is already being reported.
     */
    private static void closeQuietly(PreparedStatement statement, Connection connection) {
        if (statement != null) {
            try {
                statement.close();
            }
            catch (SQLException ex) {
                LOG.warn(StringUtils.stringifyException(ex));
            }
        }
        if (connection != null) {
            try {
                connection.close();
            }
            catch (SQLException ex) {
                LOG.warn(StringUtils.stringifyException(ex));
            }
        }
    }

    /**
     * Combines a failure raised while closing a resource with any earlier
     * failure: the earliest failure is the one reported; later ones are
     * logged so they are not lost.
     */
    private static IOException mergeCloseFailure(IOException primary, SQLException closeError) {
        if (primary == null) {
            return new IOException(closeError.getMessage(), closeError);
        }
        LOG.warn(StringUtils.stringifyException(closeError));
        return primary;
    }

    /**
     * A {@link RecordWriter} that batches keys into a prepared statement and
     * commits the batch on {@link #close(TaskAttemptContext)}.
     */
    @InterfaceStability.Evolving
    public class DBRecordWriter
    extends RecordWriter<K, V> {
        private Connection connection;
        private PreparedStatement statement;

        /** Creates a writer with no connection or statement set. */
        public DBRecordWriter() throws SQLException {
        }

        /**
         * Creates a writer over the given connection and statement and turns
         * auto-commit off so the batch is committed as a unit on close.
         */
        public DBRecordWriter(Connection connection, PreparedStatement statement) throws SQLException {
            this.connection = connection;
            this.statement = statement;
            this.connection.setAutoCommit(false);
        }

        public Connection getConnection() {
            return this.connection;
        }

        public PreparedStatement getStatement() {
            return this.statement;
        }

        /**
         * Executes the pending batch and commits it; on failure the
         * transaction is rolled back. The statement and the connection are
         * then each closed independently, so a failure closing one does not
         * prevent closing the other.
         *
         * @throws IOException if the batch/commit fails, or if that succeeded
         *                     but closing a resource fails; the SQL exception
         *                     is kept as the cause
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException {
            IOException failure = null;
            try {
                this.statement.executeBatch();
                this.connection.commit();
            }
            catch (SQLException e) {
                try {
                    this.connection.rollback();
                }
                catch (SQLException ex) {
                    LOG.warn(StringUtils.stringifyException(ex));
                }
                failure = new IOException(e.getMessage(), e);
            }
            finally {
                // Close each resource in its own try so the connection is
                // released even when closing the statement fails.
                try {
                    this.statement.close();
                }
                catch (SQLException ex) {
                    failure = mergeCloseFailure(failure, ex);
                }
                try {
                    this.connection.close();
                }
                catch (SQLException ex) {
                    failure = mergeCloseFailure(failure, ex);
                }
            }
            if (failure != null) {
                throw failure;
            }
        }

        /**
         * Binds the key's fields to the prepared statement and adds the row
         * to the current batch. The value is not used.
         *
         * @throws IOException if binding or batching fails; previously such
         *                     failures were only printed and the record was
         *                     silently dropped
         */
        @Override
        public void write(K key, V value) throws IOException {
            try {
                key.write(this.statement);
                this.statement.addBatch();
            }
            catch (SQLException e) {
                throw new IOException(e.getMessage(), e);
            }
        }
    }
}

