/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gossip subscriber that reacts to schema-version announcements from other endpoints and
 * provides static helpers for pushing, announcing, applying and (de)serializing migrations.
 *
 * <p>Only {@link #onChange} and {@link #onAlive} do any work; the remaining gossip callbacks
 * are intentionally empty.
 */
public class MigrationManager implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);

    /**
     * Last migration version pushed to each endpoint. Entries expire after one minute, so a
     * lookup may return {@code null} even for an endpoint that was pushed to recently.
     */
    private static final Map<InetAddress, UUID> lastPushed = new MapMaker().expiration(1L, TimeUnit.MINUTES).makeMap();

    /** No-op: joins are not acted upon here. */
    @Override
    public void onJoin(InetAddress endpoint, EndpointState epState) {
    }

    /**
     * Reacts to a change of an endpoint's gossiped application state. Anything other than
     * {@link ApplicationState#SCHEMA} is ignored; for schema changes the advertised version is
     * parsed and handed to {@link #rectify(UUID, InetAddress)}.
     *
     * @param endpoint the endpoint whose state changed
     * @param state    which application state changed
     * @param value    the new value; its string form is parsed as a UUID
     */
    @Override
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {
        if (state != ApplicationState.SCHEMA) {
            return;
        }
        UUID theirVersion = UUID.fromString(value.value);
        MigrationManager.rectify(theirVersion, endpoint);
    }

    /**
     * When an endpoint is reported alive and carries a schema application state, compares its
     * schema version against the local one via {@link #rectify(UUID, InetAddress)}.
     *
     * @param endpoint the endpoint that is alive
     * @param state    the endpoint's gossip state
     */
    @Override
    public void onAlive(InetAddress endpoint, EndpointState state) {
        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
        if (value != null) {
            UUID theirVersion = UUID.fromString(value.value);
            MigrationManager.rectify(theirVersion, endpoint);
        }
    }

    /** No-op: dead endpoints are not acted upon here. */
    @Override
    public void onDead(InetAddress endpoint, EndpointState state) {
    }

    /** No-op: restarts are not acted upon here. */
    @Override
    public void onRestart(InetAddress endpoint, EndpointState state) {
    }

    /** No-op: removals are not acted upon here. */
    @Override
    public void onRemove(InetAddress endpoint) {
    }

    /**
     * Pushes local migrations to {@code endpoint} if its schema version is older than ours.
     *
     * <p>Nothing happens when the remote version is not older (by UUID timestamp) or when this
     * node runs in client mode. If migrations were pushed to the endpoint recently and the
     * endpoint has not yet reached that version, no further push is made.
     *
     * @param theirVersion schema version advertised by the remote endpoint
     * @param endpoint     the remote endpoint
     */
    public static void rectify(UUID theirVersion, InetAddress endpoint) {
        UUID myVersion = DatabaseDescriptor.getDefsVersion();
        if (theirVersion.timestamp() >= myVersion.timestamp() || StorageService.instance.isClientMode()) {
            return;
        }

        // Read once: entries expire, so repeated lookups could flip to null between the
        // null check and the dereference.
        UUID lastSent = lastPushed.get(endpoint);
        if (lastSent != null && theirVersion.timestamp() < lastSent.timestamp()) {
            logger.debug("Waiting for {} to process migrations up to {} before sending more", endpoint, lastSent);
            return;
        }

        logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion);
        Collection<IColumn> migrations = Migration.getLocalMigrations(theirVersion, myVersion);
        if (migrations.isEmpty()) {
            // Nothing to send; Iterables.getLast would throw on an empty collection.
            logger.debug("No local migrations found to send to {}", endpoint);
            return;
        }
        MigrationManager.pushMigrations(endpoint, migrations);
        lastPushed.put(endpoint, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
    }

    /**
     * Serializes the given migrations into a message and sends it one-way to {@code endpoint}.
     *
     * @throws IOError if building the message fails with an {@link IOException}
     */
    private static void pushMigrations(InetAddress endpoint, Collection<IColumn> migrations) {
        try {
            Message msg = MigrationManager.makeMigrationMessage(migrations, Gossiper.instance.getVersion(endpoint));
            MessagingService.instance().sendOneWay(msg, endpoint);
        }
        catch (IOException ex) {
            throw new IOError(ex);
        }
    }

    /**
     * Pushes a single migration column to every endpoint the gossiper reports as live.
     *
     * @param column the migration column to send
     */
    public static void announce(IColumn column) {
        Set<IColumn> migrations = Collections.singleton(column);
        for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) {
            MigrationManager.pushMigrations(endpoint, migrations);
        }
    }

    /**
     * Publishes {@code version} as this node's schema application state via gossip.
     *
     * @param version the schema version to advertise
     */
    public static void passiveAnnounce(UUID version) {
        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
        logger.debug("Gossiping my schema version {}", version);
    }

    /**
     * Applies the locally stored migrations between {@code from} and {@code to} on the
     * migration stage, waits for all of them to finish, then gossips {@code to} as the local
     * schema version.
     *
     * <p>A migration that fails with {@link ConfigurationException} is logged at debug level
     * and does not abort the run.
     *
     * @param from lower bound passed to {@link Migration#getLocalMigrations}
     * @param to   upper bound passed to {@link Migration#getLocalMigrations}; also the version
     *             announced afterwards
     * @throws IOException if deserializing a migration fails, if waiting is interrupted (the
     *                     thread's interrupt status is restored first), or if a submitted
     *                     migration task fails
     */
    public static void applyMigrations(UUID from, UUID to) throws IOException {
        Collection<Future<?>> updates = new ArrayList<Future<?>>();
        Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
        for (IColumn iColumn : migrations) {
            // NOTE(review): the literal 2 is presumably a serialization/messaging version - confirm.
            final Migration migration = Migration.deserialize(iColumn.value(), 2);
            Future<?> update = StageManager.getStage(Stage.MIGRATION).submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        migration.apply();
                    }
                    catch (ConfigurationException ex) {
                        logger.debug("Migration not applied {}", ex.getMessage());
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            });
            updates.add(update);
        }
        for (Future<?> future : updates) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            catch (ExecutionException e) {
                throw new IOException(e);
            }
        }
        MigrationManager.passiveAnnounce(to);
    }

    /**
     * Builds a {@code DEFINITIONS_UPDATE} message whose body is the column count followed by
     * each column's name and value, each written with a length prefix.
     *
     * @param migrations columns to serialize
     * @param version    version passed through to the {@link Message} constructor
     * @return the message, addressed from the local address
     * @throws IOException if writing to the in-memory buffer fails
     */
    private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bout);
        dout.writeInt(migrations.size());
        for (IColumn col : migrations) {
            assert (col instanceof Column);
            ByteBufferUtil.writeWithLength(col.name(), dout);
            ByteBufferUtil.writeWithLength(col.value(), dout);
        }
        dout.close();
        byte[] body = bout.toByteArray();
        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
    }

    /**
     * Decodes the columns carried in a migration message body: an int count followed by
     * {@code count} pairs of length-prefixed name and value byte arrays.
     *
     * @param msg the message whose body is decoded
     * @return the decoded columns, in the order they appear in the body
     * @throws IOException if the body is truncated, or declares a negative count or a length
     *                     that is negative or larger than the bytes remaining
     */
    public static Collection<Column> makeColumns(Message msg) throws IOException {
        ArrayList<Column> cols = new ArrayList<Column>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(msg.getMessageBody()));
        try {
            int count = in.readInt();
            if (count < 0) {
                throw new IOException("Invalid migration column count: " + count);
            }
            for (int i = 0; i < count; ++i) {
                byte[] name = MigrationManager.readLengthPrefixed(in);
                byte[] value = MigrationManager.readLengthPrefixed(in);
                cols.add(new Column(ByteBuffer.wrap(name), ByteBuffer.wrap(value)));
            }
        }
        finally {
            in.close();
        }
        return cols;
    }

    /**
     * Reads an int length followed by that many bytes. The length is checked against the
     * bytes still available so a corrupt length cannot trigger a negative-size or oversized
     * allocation.
     */
    private static byte[] readLengthPrefixed(DataInputStream in) throws IOException {
        int length = in.readInt();
        // The stream wraps an in-memory byte array, so available() is the exact remainder.
        if (length < 0 || length > in.available()) {
            throw new IOException("Invalid length in migration message: " + length);
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return bytes;
    }
}

