/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.replication;

import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FinishedWorkUpdater
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(FinishedWorkUpdater.class);
    private final AccumuloClient client;

    public FinishedWorkUpdater(AccumuloClient client) {
        this.client = client;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        BatchWriter replBw;
        BatchScanner bs;
        log.debug("Looking for finished replication work");
        if (!ReplicationTable.isOnline((AccumuloClient)this.client)) {
            log.debug("Replication table is not yet online, will retry");
            return;
        }
        try {
            bs = ReplicationTable.getBatchScanner((AccumuloClient)this.client, (int)4);
            replBw = ReplicationTable.getBatchWriter((AccumuloClient)this.client);
        }
        catch (ReplicationTableOfflineException e) {
            log.debug("Table is no longer online, will retry");
            return;
        }
        IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
        bs.addScanIterator(cfg);
        ReplicationSchema.WorkSection.limit((ScannerBase)bs);
        bs.setRanges(Collections.singleton(new Range()));
        try {
            for (Map.Entry serializedRow : bs) {
                SortedMap wholeRow;
                try {
                    wholeRow = WholeRowIterator.decodeRow((Key)((Key)serializedRow.getKey()), (Value)((Value)serializedRow.getValue()));
                }
                catch (IOException e) {
                    log.warn("Could not deserialize whole row with key {}", (Object)((Key)serializedRow.getKey()).toStringNoTruncate(), (Object)e);
                    continue;
                }
                log.debug("Processing work progress for {} with {} columns", (Object)((Key)serializedRow.getKey()).getRow(), (Object)wholeRow.size());
                HashMap<TableId, Long> tableIdToProgress = new HashMap<TableId, Long>();
                boolean error = false;
                Text buffer = new Text();
                for (Map.Entry entry : wholeRow.entrySet()) {
                    Replication.Status status;
                    try {
                        status = Replication.Status.parseFrom((byte[])((Value)entry.getValue()).get());
                    }
                    catch (InvalidProtocolBufferException e) {
                        log.warn("Could not deserialize protobuf for {}", entry.getKey(), (Object)e);
                        error = true;
                        break;
                    }
                    ((Key)entry.getKey()).getColumnQualifier(buffer);
                    ReplicationTarget target = ReplicationTarget.from((Text)buffer);
                    if (!tableIdToProgress.containsKey(target.getSourceTableId())) {
                        tableIdToProgress.put(target.getSourceTableId(), Long.MAX_VALUE);
                    }
                    tableIdToProgress.put(target.getSourceTableId(), Math.min((Long)tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
                }
                if (error) continue;
                for (Map.Entry entry : tableIdToProgress.entrySet()) {
                    if ((Long)entry.getValue() == 0L) continue;
                    ((Key)serializedRow.getKey()).getRow(buffer);
                    log.debug("For {}, source table ID {} has replicated through {}", new Object[]{((Key)serializedRow.getKey()).getRow(), entry.getKey(), entry.getValue()});
                    Mutation replMutation = new Mutation(buffer);
                    Replication.Status updatedStatus = StatusUtil.replicated((long)((Long)entry.getValue()));
                    Value serializedUpdatedStatus = ProtobufUtil.toValue((GeneratedMessage)updatedStatus);
                    TableId srcTableId = (TableId)entry.getKey();
                    ReplicationSchema.StatusSection.add((Mutation)replMutation, (TableId)srcTableId, (Value)serializedUpdatedStatus);
                    log.debug("Updating replication status entry for {} with {}", (Object)((Key)serializedRow.getKey()).getRow(), (Object)ProtobufUtil.toString((GeneratedMessage)updatedStatus));
                    try {
                        replBw.addMutation(replMutation);
                    }
                    catch (MutationsRejectedException e) {
                        log.error("Error writing mutations to update replication Status messages in StatusSection, will retry", (Throwable)e);
                        log.debug("Finished updating files with completed replication work");
                        bs.close();
                        try {
                            replBw.close();
                        }
                        catch (MutationsRejectedException e2) {
                            log.error("Error writing mutations to update replication Status messages in StatusSection, will retry", (Throwable)e2);
                        }
                        return;
                    }
                }
            }
        }
        finally {
            log.debug("Finished updating files with completed replication work");
            bs.close();
            try {
                replBw.close();
            }
            catch (MutationsRejectedException e) {
                log.error("Error writing mutations to update replication Status messages in StatusSection, will retry", (Throwable)e);
            }
        }
    }
}

