/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IMessageCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gossip subscriber and static entry points for propagating schema changes.
 *
 * <p>As a gossip subscriber it reacts to {@link ApplicationState#SCHEMA} values advertised by
 * other endpoints and, when the advertised version differs from the local one, queues a
 * {@link MigrationTask} on the {@link Stage#MIGRATION} stage to request the remote schema.
 *
 * <p>The static {@code announce*} methods validate a requested schema change against the local
 * {@link Schema}, merge it locally on the migration stage and push it to live endpoints.
 */
public class MigrationManager
implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);

    // Not referenced anywhere in this class; kept so the class layout stays unchanged.
    private static final int MIGRATION_REQUEST_RETRIES = 3;

    private static final ByteBuffer LAST_MIGRATION_KEY = ByteBufferUtil.bytes("Last Migration");

    /**
     * Endpoints whose gossiped messaging version is below this value are skipped for schema
     * requests and schema pushes.
     * NOTE(review): presumably the first messaging version that understands row-mutation
     * based schema exchange -- confirm against MessagingService's version constants.
     */
    private static final int MIN_SCHEMA_EXCHANGE_VERSION = 4;

    /** No-op: nothing is done when an endpoint joins. */
    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
    }

    /**
     * Reacts to a changed application state of a remote endpoint.
     *
     * <p>Only {@link ApplicationState#SCHEMA} changes coming from endpoints other than the
     * local broadcast address are considered; for those the advertised schema version is
     * compared with the local one.
     *
     * @param endpoint endpoint whose state changed
     * @param state    the application state that changed
     * @param value    new value; for SCHEMA its {@code value} field is parsed as a UUID
     */
    @Override
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
        if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress())) {
            return;
        }
        MigrationManager.rectifySchema(UUID.fromString(value.value), endpoint);
    }

    /**
     * When an endpoint is reported alive, compares its advertised schema version (if it has
     * one) with the local version.
     *
     * @param endpoint endpoint that became alive
     * @param state    gossip state of that endpoint
     */
    @Override
    public void onAlive(InetAddress endpoint, EndpointState state) {
        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
        if (value != null) {
            MigrationManager.rectifySchema(UUID.fromString(value.value), endpoint);
        }
    }

    /** No-op: nothing is done when an endpoint is marked dead. */
    @Override
    public void onDead(InetAddress endpoint, EndpointState state) {
    }

    /** No-op: nothing is done when an endpoint restarts. */
    @Override
    public void onRestart(InetAddress endpoint, EndpointState state) {
    }

    /** No-op: nothing is done when an endpoint is removed. */
    @Override
    public void onRemove(InetAddress endpoint) {
    }

    /**
     * Queues a schema request to {@code endpoint} when its schema version differs from ours.
     * Endpoints below {@link #MIN_SCHEMA_EXCHANGE_VERSION} are ignored.
     *
     * @param theirVersion schema version advertised by the remote endpoint
     * @param endpoint     the remote endpoint
     */
    private static void rectifySchema(UUID theirVersion, InetAddress endpoint) {
        if (Gossiper.instance.getVersion(endpoint) < MIN_SCHEMA_EXCHANGE_VERSION) {
            return;
        }
        if (Schema.instance.getVersion().equals(theirVersion)) {
            return;
        }
        StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
    }

    /**
     * @return {@code true} when the migration stage reports no active tasks
     */
    public static boolean isReadyForBootstrap() {
        return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
    }

    /**
     * Validates and announces the creation of a keyspace.
     *
     * @param ksm definition of the keyspace to create
     * @throws ConfigurationException if validation fails or a keyspace of that name already exists
     */
    public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException {
        ksm.validate();
        if (Schema.instance.getTableDefinition(ksm.name) != null) {
            throw new ConfigurationException(String.format("Cannot add already existing keyspace '%s'.", ksm.name));
        }
        MigrationManager.announce(ksm.toSchema(FBUtilities.timestampMicros()));
    }

    /**
     * Validates and announces the creation of a column family.
     *
     * @param cfm definition of the column family to create
     * @throws ConfigurationException if validation fails, the keyspace does not exist, or the
     *                                keyspace already contains a column family of that name
     */
    public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException {
        cfm.validate();
        KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName);
        if (ksm == null) {
            throw new ConfigurationException(String.format("Cannot add column family '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName));
        }
        if (ksm.cfMetaData().containsKey(cfm.cfName)) {
            throw new ConfigurationException(String.format("Cannot add already existing column family '%s' to keyspace '%s'.", cfm.cfName, cfm.ksName));
        }
        MigrationManager.announce(cfm.toSchema(FBUtilities.timestampMicros()));
    }

    /**
     * Validates and announces an update of an existing keyspace.
     *
     * @param ksm the new keyspace definition
     * @throws ConfigurationException if validation fails or the keyspace does not exist
     */
    public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException {
        ksm.validate();
        KSMetaData oldKsm = Schema.instance.getKSMetaData(ksm.name);
        if (oldKsm == null) {
            throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name));
        }
        MigrationManager.announce(oldKsm.toSchemaUpdate(ksm, FBUtilities.timestampMicros()));
    }

    /**
     * Validates and announces an update of an existing column family.
     *
     * @param cfm the new column family definition
     * @throws ConfigurationException if validation fails or the column family does not exist
     */
    public static void announceColumnFamilyUpdate(CFMetaData cfm) throws ConfigurationException {
        cfm.validate();
        CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName);
        if (oldCfm == null) {
            throw new ConfigurationException(String.format("Cannot update non existing column family '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName));
        }
        MigrationManager.announce(oldCfm.toSchemaUpdate(cfm, FBUtilities.timestampMicros()));
    }

    /**
     * Announces the removal of a keyspace.
     *
     * @param ksName name of the keyspace to drop
     * @throws ConfigurationException if the keyspace does not exist
     */
    public static void announceKeyspaceDrop(String ksName) throws ConfigurationException {
        KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
        if (oldKsm == null) {
            throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName));
        }
        MigrationManager.announce(oldKsm.dropFromSchema(FBUtilities.timestampMicros()));
    }

    /**
     * Announces the removal of a column family.
     *
     * @param ksName keyspace containing the column family
     * @param cfName name of the column family to drop
     * @throws ConfigurationException if the column family does not exist
     */
    public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException {
        CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName);
        if (oldCfm == null) {
            throw new ConfigurationException(String.format("Cannot drop non existing column family '%s' in keyspace '%s'.", cfName, ksName));
        }
        MigrationManager.announce(oldCfm.dropFromSchema(FBUtilities.timestampMicros()));
    }

    /**
     * Announces a single schema mutation and waits on the future of the local merge.
     *
     * @param schema the schema mutation to announce
     */
    private static void announce(RowMutation schema) {
        FBUtilities.waitOnFuture(MigrationManager.announce(Collections.singletonList(schema)));
    }

    /**
     * Sends the schema mutations to one endpoint as a one-way message, serialized with the
     * messaging version gossiped for that endpoint.
     *
     * @param endpoint destination endpoint
     * @param schema   mutations to send
     * @throws IOError wrapping any {@link IOException} raised while building the message
     */
    private static void pushSchemaMutation(InetAddress endpoint, Collection<RowMutation> schema) {
        try {
            Message msg = MigrationManager.makeMigrationMessage(schema, Gossiper.instance.getVersion(endpoint));
            MessagingService.instance().sendOneWay(msg, endpoint);
        }
        catch (IOException ex) {
            throw new IOError(ex);
        }
    }

    /**
     * Submits a local merge of the mutations to the migration stage, then pushes them to
     * every live endpoint other than this node whose version is at least
     * {@link #MIN_SCHEMA_EXCHANGE_VERSION}.
     *
     * @param schema mutations describing the schema change
     * @return future of the local merge task
     */
    private static Future<?> announce(final Collection<RowMutation> schema) {
        Future<?> localMerge = StageManager.getStage(Stage.MIGRATION).submit(new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                DefsTable.mergeSchema(schema);
                return null;
            }
        });
        for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) {
            boolean isLocal = endpoint.equals(FBUtilities.getBroadcastAddress());
            if (isLocal || Gossiper.instance.getVersion(endpoint) < MIN_SCHEMA_EXCHANGE_VERSION) {
                continue;
            }
            MigrationManager.pushSchemaMutation(endpoint, schema);
        }
        return localMerge;
    }

    /**
     * Publishes the given schema version as this node's {@link ApplicationState#SCHEMA}
     * gossip state. Asserts that the gossiper is enabled.
     *
     * @param version schema version to gossip
     */
    public static void passiveAnnounce(UUID version) {
        assert (Gossiper.instance.isEnabled());
        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
        logger.debug("Gossiping my schema version {}", version);
    }

    /**
     * Builds a {@code DEFINITIONS_UPDATE} message carrying the serialized mutations.
     *
     * @param schema  mutations to embed in the message
     * @param version messaging version used for serialization and for the message itself
     * @throws IOException if serialization fails
     */
    private static Message makeMigrationMessage(Collection<RowMutation> schema, int version) throws IOException {
        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.DEFINITIONS_UPDATE, MigrationManager.serializeSchema(schema, version), version);
    }

    /**
     * Serializes mutations as an int count followed by each mutation in iteration order.
     * This is the format read back by {@link #deserializeMigrationMessage(byte[], int)}.
     *
     * @param schema  mutations to serialize
     * @param version messaging version passed to the mutation serializer
     * @return the serialized bytes
     * @throws IOException if writing fails
     */
    public static byte[] serializeSchema(Collection<RowMutation> schema, int version) throws IOException {
        FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bout);
        try {
            dout.writeInt(schema.size());
            for (RowMutation mutation : schema) {
                RowMutation.serializer().serialize(mutation, (DataOutput)dout, version);
            }
        }
        finally {
            // Close even when serialization fails part-way.
            dout.close();
        }
        return bout.toByteArray();
    }

    /**
     * Reads mutations written by {@link #serializeSchema(Collection, int)}: an int count
     * followed by that many serialized mutations.
     *
     * @param data    serialized bytes
     * @param version messaging version passed to the mutation deserializer
     * @return the deserialized mutations, in the order they were read
     * @throws IOException if reading fails
     */
    public static Collection<RowMutation> deserializeMigrationMessage(byte[] data, int version) throws IOException {
        // Deliberately not pre-sized from the count read off the wire.
        ArrayList<RowMutation> schema = new ArrayList<RowMutation>();
        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data));
        int count = in.readInt();
        for (int i = 0; i < count; ++i) {
            schema.add(RowMutation.serializer().deserialize(in, version));
        }
        return schema;
    }

    /**
     * Discards the local schema and requests it again from a live peer.
     *
     * <p>Steps: truncate the three schema column families and wait for the truncations,
     * clear the in-memory {@link Schema}, then submit a {@link MigrationTask} for the first
     * live endpoint (other than this node) whose version is at least
     * {@link #MIN_SCHEMA_EXCHANGE_VERSION} and wait for that task's future.
     *
     * @throws IOException      declared by the original signature
     * @throws RuntimeException wrapping an {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting; on
     *                          interruption the thread's interrupt status is restored first
     */
    public static void resetLocalSchema() throws IOException {
        logger.info("Starting local schema reset...");
        try {
            logger.debug("Truncating schema tables...");
            // Collect the truncation results so waitOnFutures really waits on them; the
            // previous code passed an empty list and therefore waited on nothing.
            ArrayList<Future<?>> truncations = new ArrayList<Future<?>>(3);
            truncations.add(SystemTable.schemaCFS("schema_keyspaces").truncate());
            truncations.add(SystemTable.schemaCFS("schema_columnfamilies").truncate());
            truncations.add(SystemTable.schemaCFS("schema_columns").truncate());
            FBUtilities.waitOnFutures(truncations);

            logger.debug("Clearing local schema keyspace definitions...");
            Schema.instance.clear();

            Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
            liveEndpoints.remove(FBUtilities.getBroadcastAddress());
            for (InetAddress node : liveEndpoints) {
                if (Gossiper.instance.getVersion(node) < MIN_SCHEMA_EXCHANGE_VERSION) {
                    continue;
                }
                logger.debug("Requesting schema from {}", node);
                FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node)));
                // One eligible peer is enough.
                break;
            }
            logger.info("Local schema reset is complete.");
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the "Last Migration" column from the "Schema" column family of the "system"
     * table.
     *
     * @return the UUID stored in that column, or {@code null} when the row is absent or has
     *         no columns
     */
    @Deprecated
    public static UUID getLastMigrationId() {
        DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
        Table defs = Table.open("system");
        ColumnFamilyStore cfStore = defs.getColumnFamilyStore("Schema");
        QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath("Schema"), LAST_MIGRATION_KEY);
        ColumnFamily cf = cfStore.getColumnFamily(filter);
        if (cf == null || cf.getColumnNames().size() == 0) {
            return null;
        }
        return UUIDGen.getUUID(cf.getColumn(LAST_MIGRATION_KEY).value());
    }

    /**
     * Task that sends a {@code MIGRATION_REQUEST} to one endpoint and merges the schema
     * returned in the response.
     */
    static class MigrationTask
    extends WrappedRunnable {
        private final InetAddress endpoint;

        /**
         * @param endpoint endpoint to request the schema from
         */
        MigrationTask(InetAddress endpoint) {
            this.endpoint = endpoint;
        }

        /**
         * Sends the request if the failure detector considers the endpoint alive; otherwise
         * logs an error and returns. The response callback merges the remote schema and logs
         * (rather than propagates) {@link IOException} and {@link ConfigurationException}.
         */
        @Override
        public void runMayThrow() throws Exception {
            // Check liveness first so no message is built for a node that is down.
            if (!FailureDetector.instance.isAlive(this.endpoint)) {
                logger.error("Can't send migration request: node {} is down.", this.endpoint);
                return;
            }
            Message request = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.MIGRATION_REQUEST, ArrayUtils.EMPTY_BYTE_ARRAY, Gossiper.instance.getVersion(this.endpoint));
            IAsyncCallback cb = new IAsyncCallback(){

                @Override
                public void response(Message message) {
                    try {
                        DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion());
                    }
                    catch (IOException e) {
                        logger.error("IOException merging remote schema", e);
                    }
                    catch (ConfigurationException e) {
                        logger.error("Configuration exception merging remote schema", e);
                    }
                }

                @Override
                public boolean isLatencyForSnitch() {
                    return false;
                }
            };
            MessagingService.instance().sendRR(request, this.endpoint, (IMessageCallback)cb);
        }
    }
}

