/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.action;

import com.google.common.base.Optional;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Locale;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.time.TimeIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CompactionCompleteAction} that publishes compaction watermarks to the properties of a
 * Hive table once a compaction job has completed.
 *
 * <p>The watermarks are read from the job {@link State} under the keys {@code compactionWatermark}
 * and {@code completionAndCompactionWatermark}. Each one is written to the table properties under
 * the same key, provided it either initializes a missing watermark or advances the existing one by
 * exactly one unit of the configured {@link #GRANULARITY}.
 */
public class CompactionWatermarkAction
implements CompactionCompleteAction<FileSystemDataset> {
    private static final Logger log = LoggerFactory.getLogger(CompactionWatermarkAction.class);
    public static final String CONF_PREFIX = "compactionWatermarkAction";
    /** Required property: name of a {@link TimeIterator.Granularity} constant (case-insensitive). */
    public static final String GRANULARITY = "compactionWatermarkAction.granularity";
    /** Property naming the Hive database used when the dataset name has no database component. */
    public static final String DEFAULT_HIVE_DB = "compactionWatermarkAction.defaultHiveDb";

    // Property keys read from the job state; the two watermark keys are also the table property keys.
    private static final String COMPACTION_WATERMARK_KEY = "compactionWatermark";
    private static final String COMPLETION_COMPACTION_WATERMARK_KEY = "completionAndCompactionWatermark";
    private static final String COMPACTION_TIMEZONE_KEY = "compaction.timezone";
    private static final String DEFAULT_COMPACTION_TIMEZONE = "America/Los_Angeles";

    // Stored for the CompactionCompleteAction contract; this class does not read it.
    private EventSubmitter submitter;
    private final State state;
    private final String defaultHiveDb;
    private final TimeIterator.Granularity granularity;
    private final ZoneId zone;

    /**
     * Creates the action from the job configuration.
     *
     * @param state job state holding {@link #GRANULARITY}, optionally {@link #DEFAULT_HIVE_DB} and
     *     {@code compaction.timezone}, and later the watermarks to publish
     * @throws IllegalArgumentException if {@link #GRANULARITY} is missing or does not name a
     *     {@link TimeIterator.Granularity} constant
     */
    public CompactionWatermarkAction(State state) {
        this.state = state;
        this.defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
        String configuredGranularity = state.getProp(GRANULARITY);
        if (StringUtils.isEmpty(configuredGranularity)) {
            throw new IllegalArgumentException(String.format("Missing required property %s", GRANULARITY));
        }
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale
        // (e.g. a Turkish locale would upper-case "minute" to a name that does not match).
        this.granularity = TimeIterator.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
        this.zone = ZoneId.of(state.getProp(COMPACTION_TIMEZONE_KEY, DEFAULT_COMPACTION_TIMEZONE));
    }

    /**
     * Publishes the watermarks found in the job state to the Hive table derived from the dataset.
     *
     * <p>Does nothing when neither watermark is set in the state, or when the Hive table cannot be
     * found. The table is altered only if at least one watermark property actually changed.
     *
     * @param dataset the dataset whose compaction just completed
     * @throws IOException if parsing the dataset, talking to the Hive register, or closing it fails
     * @throws RuntimeException if the dataset name is unsupported or a watermark would skip ahead
     *     by more than one granularity unit
     */
    @Override
    public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
        String compactionWatermark = this.state.getProp(COMPACTION_WATERMARK_KEY);
        String completeCompactionWatermark = this.state.getProp(COMPLETION_COMPACTION_WATERMARK_KEY);
        if (StringUtils.isEmpty(compactionWatermark) && StringUtils.isEmpty(completeCompactionWatermark)) {
            return;
        }
        CompactionPathParser.CompactionParserResult result = new CompactionPathParser(this.state).parse(dataset);
        HiveDatasetFinder.DbAndTable dbAndTable = this.extractDbTable(result.getDatasetName());
        String hiveDb = dbAndTable.getDb();
        String hiveTable = dbAndTable.getTable();
        // Close the register on every exit path (early return, exception, or success) so that
        // whatever it holds is released rather than leaked once per completed compaction.
        try (HiveRegister hiveRegister = HiveRegister.get(this.state)) {
            Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
            if (!tableOptional.isPresent()) {
                log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
                return;
            }
            HiveTable table = tableOptional.get();
            State tableProps = table.getProps();
            // Both calls must run (no short-circuit): each one may mutate tableProps.
            boolean shouldUpdate = this.mayUpdateWatermark(dataset, tableProps, COMPACTION_WATERMARK_KEY, compactionWatermark);
            if (this.mayUpdateWatermark(dataset, tableProps, COMPLETION_COMPACTION_WATERMARK_KEY, completeCompactionWatermark)) {
                shouldUpdate = true;
            }
            if (shouldUpdate) {
                log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
                hiveRegister.alterTable(table);
            }
        }
    }

    /**
     * Writes {@code newValue} into {@code props} under {@code key} if it is a valid next watermark.
     *
     * @param dataset dataset being compacted; used only for the error message
     * @param props table properties to update in place
     * @param key property key of the watermark
     * @param newValue candidate watermark as a decimal epoch-millisecond string; may be empty
     * @return {@code true} if {@code props} was modified, {@code false} if {@code newValue} is empty
     *     or not greater than the existing watermark
     * @throws NumberFormatException if {@code newValue} is non-empty but not a valid long
     * @throws RuntimeException if {@code newValue} is ahead of the existing watermark by anything
     *     other than exactly one granularity unit
     */
    private boolean mayUpdateWatermark(FileSystemDataset dataset, State props, String key, String newValue) {
        if (StringUtils.isEmpty(newValue)) {
            return false;
        }
        // Parse before touching props so a malformed value is rejected instead of being published
        // and breaking the numeric read of this property on later runs.
        long actualNextWatermark = Long.parseLong(newValue);
        long existing = props.getPropAsLong(key, 0L);
        if (existing == 0L) {
            // No watermark published yet (absent or zero): accept whatever we were given.
            props.setProp(key, newValue);
            return true;
        }
        if (actualNextWatermark <= existing) {
            return false;
        }
        long expectedWatermark = this.getExpectedNextWatermark(existing);
        if (actualNextWatermark != expectedWatermark) {
            String errMsg = String.format("Fail to advance %s of dataset %s: expect %s but got %s, please manually fill the gap and rerun.", key, dataset.datasetRoot(), expectedWatermark, actualNextWatermark);
            log.error(errMsg);
            throw new RuntimeException(errMsg);
        }
        props.setProp(key, newValue);
        return true;
    }

    /**
     * Computes the watermark that must follow {@code previousWatermark}: the same instant advanced
     * by one unit of the configured granularity, evaluated in the configured time zone.
     *
     * @param previousWatermark existing watermark in epoch milliseconds
     * @return the expected next watermark in epoch milliseconds
     */
    private long getExpectedNextWatermark(long previousWatermark) {
        ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark), this.zone);
        ZonedDateTime nextWatermarkTime = TimeIterator.inc(previousWatermarkTime, this.granularity, 1L);
        return nextWatermarkTime.toInstant().toEpochMilli();
    }

    /** Stores the event submitter supplied by the framework. */
    @Override
    public void addEventSubmitter(EventSubmitter submitter) {
        this.submitter = submitter;
    }

    /**
     * Splits a dataset name of the form {@code table} or {@code db/table} into database and table.
     * A name with a single component uses the configured default Hive database.
     *
     * @param datasetName dataset name as produced by the compaction path parser
     * @return the database and table names
     * @throws RuntimeException if the name has no components or more than two
     */
    private HiveDatasetFinder.DbAndTable extractDbTable(String datasetName) {
        String[] parts = datasetName.split("/");
        if (parts.length == 0 || parts.length > 2) {
            throw new RuntimeException(String.format("Unsupported dataset %s", datasetName));
        }
        String hiveDb = this.defaultHiveDb;
        String hiveTable = parts[0];
        if (parts.length == 2) {
            hiveDb = parts[0];
            hiveTable = parts[1];
        }
        return new HiveDatasetFinder.DbAndTable(hiveDb, hiveTable);
    }
}

