/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.AbstractGenericHandler;
import com.couchbase.client.core.endpoint.ResponseStatusConverter;
import com.couchbase.client.core.endpoint.dcp.DCPStream;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.dcp.AbstractDCPRequest;
import com.couchbase.client.core.message.dcp.AbstractDCPResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;

/**
 * Channel handler that translates between {@link DCPRequest}s and the binary
 * memcache wire messages used for DCP traffic.
 *
 * <p>Outgoing {@link OpenConnectionRequest}s and {@link StreamRequestRequest}s are
 * encoded into {@link BinaryMemcacheRequest}s. Incoming messages are either turned
 * into a response for the request currently being decoded, or - for messages that
 * belong to an already opened stream - converted into DCP messages and pushed into
 * the subject of the {@link DCPStream} registered under the message's opaque value.
 *
 * <p>The stream registry and the stream id counter are a plain {@link HashMap} and
 * an {@code int} without any synchronization.
 */
public class DCPHandler
        extends AbstractGenericHandler<FullBinaryMemcacheResponse, BinaryMemcacheRequest, DCPRequest> {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DCPHandler.class);

    /** Opcode used when encoding/decoding an open-connection exchange. */
    public static final byte OP_OPEN_CONNECTION = 0x50;
    /** Opcode used when encoding/decoding a stream-request exchange. */
    public static final byte OP_STREAM_REQUEST = 0x53;
    /** Opcode of an incoming snapshot marker message. */
    public static final byte OP_SNAPSHOT_MARKER = 0x56;
    /** Opcode of an incoming mutation message. */
    public static final byte OP_MUTATION = 0x57;
    /** Opcode of an incoming remove message. */
    public static final byte OP_REMOVE = 0x58;

    /** Number of bytes consumed per failover log entry (two 64-bit values). */
    private static final int FAILOVER_LOG_ENTRY_SIZE = 16;
    /**
     * Number of leading extras bytes skipped in a mutation message before the flags.
     * NOTE(review): presumably the by-seqno and rev-seqno fields of the DCP mutation
     * extras - confirm against the protocol documentation.
     */
    private static final int MUTATION_EXTRAS_SKIPPED_BYTES = 16;

    /** Streams opened through this handler, keyed by the opaque value sent with the stream request. */
    private final Map<Integer, DCPStream> streams = new HashMap<Integer, DCPStream>();
    /** Identifier handed out to the next stream; doubles as the opaque value on the wire. */
    private int nextStreamId = 0;

    /**
     * Creates a handler backed by a fresh {@link ArrayDeque} of sent requests.
     *
     * @param endpoint the endpoint this handler belongs to.
     * @param responseBuffer the sink that receives decoded responses.
     * @param isTransient passed through unchanged to the parent handler.
     */
    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean isTransient) {
        this(endpoint, responseBuffer, (Queue<DCPRequest>) new ArrayDeque<DCPRequest>(), isTransient);
    }

    /**
     * Creates a handler with a caller-supplied queue of sent requests.
     *
     * @param endpoint the endpoint this handler belongs to.
     * @param responseBuffer the sink that receives decoded responses.
     * @param queue the queue handed to the parent handler for tracking sent requests.
     * @param isTransient passed through unchanged to the parent handler.
     */
    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<DCPRequest> queue, boolean isTransient) {
        super(endpoint, responseBuffer, queue, isTransient);
    }

    /**
     * Encodes a DCP request into its binary memcache representation.
     *
     * @param ctx the channel context, used to allocate buffers.
     * @param msg the request to encode.
     * @return the encoded request; its reserved field carries the partition when the
     *         partition is non-negative.
     * @throws IllegalArgumentException if the request type is not supported.
     */
    @Override
    protected BinaryMemcacheRequest encodeRequest(ChannelHandlerContext ctx, DCPRequest msg) throws Exception {
        BinaryMemcacheRequest request;
        if (msg instanceof OpenConnectionRequest) {
            request = this.handleOpenConnectionRequest(ctx, (OpenConnectionRequest) msg);
        } else if (msg instanceof StreamRequestRequest) {
            request = this.handleStreamRequestRequest(ctx, (StreamRequestRequest) msg);
        } else {
            throw new IllegalArgumentException("Unknown incoming DCPRequest type " + msg.getClass());
        }
        if (msg.partition() >= 0) {
            request.setReserved(msg.partition());
        }
        return request;
    }

    /**
     * Decodes an incoming message.
     *
     * <p>If the message answers the current open-connection or stream-request
     * request, the matching response is returned. Any other message is treated as
     * belonging to an open stream and is forwarded to that stream's subject; in
     * that case {@code null} is returned.
     *
     * @param ctx the channel context.
     * @param msg the incoming message.
     * @return the decoded response, or {@code null} when the message was forwarded
     *         to a stream (or dropped) instead.
     * @throws IllegalStateException if a stream-request response refers to an opaque
     *         value for which no stream is registered.
     */
    @Override
    protected CouchbaseResponse decodeResponse(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg) throws Exception {
        DCPRequest request = (DCPRequest) this.currentRequest();
        AbstractDCPResponse response = null;
        byte opcode = msg.getOpcode();

        if (opcode == OP_OPEN_CONNECTION && request instanceof OpenConnectionRequest) {
            response = new OpenConnectionResponse(ResponseStatusConverter.fromBinary(msg.getStatus()), request);
        } else if (opcode == OP_STREAM_REQUEST && request instanceof StreamRequestRequest) {
            response = this.decodeStreamRequestResponse(msg, request);
        } else {
            this.dispatchStreamMessage(ctx, msg, request);
        }

        // Decoding of the current request is complete once a response exists; when there
        // was no current request at all, the state is reset as well.
        if (response != null || request == null) {
            this.finishedDecoding();
        }
        return response;
    }

    /**
     * Builds the response to a stream request: parses the failover log from the
     * message body and exposes the stream's subject as the response's observable.
     *
     * @param msg the stream-request response message.
     * @param request the stream request being answered.
     * @return the decoded response.
     * @throws IllegalStateException if no stream is registered for the message's opaque value.
     */
    private StreamRequestResponse decodeStreamRequestResponse(FullBinaryMemcacheResponse msg, DCPRequest request) {
        DCPStream stream = this.streams.get(msg.getOpaque());
        if (stream == null) {
            // Previously this surfaced as a bare NullPointerException further down.
            throw new IllegalStateException("No DCP stream registered for opaque " + msg.getOpaque());
        }

        ByteBuf content = msg.content();
        ArrayList<FailoverLogEntry> failoverLog =
                new ArrayList<FailoverLogEntry>(content.readableBytes() / FAILOVER_LOG_ENTRY_SIZE);
        // Trailing bytes that do not form a complete entry are ignored.
        while (content.readableBytes() >= FAILOVER_LOG_ENTRY_SIZE) {
            long first = content.readLong();
            long second = content.readLong();
            failoverLog.add(new FailoverLogEntry(first, second));
        }

        Scheduler scheduler = this.env().scheduler();
        Observable<DCPRequest> messages = stream.subject().onBackpressureBuffer().observeOn(scheduler);
        return new StreamRequestResponse(
                ResponseStatusConverter.fromBinary(msg.getStatus()), messages, failoverLog, request);
    }

    /**
     * Handles a message that is not the response to the current request by routing
     * it to the stream identified by the message's opaque value.
     *
     * <p>While the message is processed, a placeholder request is installed as the
     * current request; errors signalled on the placeholder's observable are forwarded
     * to the stream's subject. The previous current request is always restored.
     *
     * @param ctx the channel context.
     * @param msg the incoming message.
     * @param pendingRequest the request that was current before dispatching; restored afterwards.
     */
    private void dispatchStreamMessage(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg, DCPRequest pendingRequest) {
        final DCPStream stream = this.streams.get(msg.getOpaque());
        if (stream == null) {
            // Nothing to route the message to; drop it instead of failing with a NullPointerException.
            LOGGER.warn("Dropping DCP message with opcode {} for unknown stream {}",
                    Byte.valueOf(msg.getOpcode()), Integer.valueOf(msg.getOpaque()));
            return;
        }

        AbstractDCPRequest placeholder = new AbstractDCPRequest(stream.bucket(), null) {
        };
        placeholder.observable().subscribe(new Action1<CouchbaseResponse>() {
            @Override
            public void call(CouchbaseResponse couchbaseResponse) {
                // Responses delivered to the placeholder are intentionally ignored.
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                stream.subject().onError(throwable);
            }
        });

        try {
            this.currentRequest(placeholder);
            this.handleDCPRequest(ctx, msg, stream);
        } finally {
            this.currentRequest(pendingRequest);
        }
    }

    /**
     * Converts a stream message into the matching DCP message object and emits it
     * on the stream's subject. Messages with an unhandled opcode are logged and skipped.
     *
     * @param ctx the channel context, used to allocate a temporary buffer for the extras.
     * @param msg the incoming message.
     * @param stream the stream the message belongs to; must not be {@code null}.
     */
    private void handleDCPRequest(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg, DCPStream stream) {
        DCPRequest request = null;
        switch (msg.getOpcode()) {
            case OP_SNAPSHOT_MARKER: {
                long startSequenceNumber = 0L;
                long endSequenceNumber = 0L;
                int flags = 0;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf original = msg.getExtras();
                    ByteBuf extras = ctx.alloc().buffer(msg.getExtrasLength());
                    try {
                        extras.writeBytes(original, original.readerIndex(), original.readableBytes());
                        startSequenceNumber = extras.readLong();
                        endSequenceNumber = extras.readLong();
                        flags = extras.readInt();
                    } finally {
                        // Release even if the extras are shorter than expected and a read throws.
                        extras.release();
                    }
                }
                request = new SnapshotMarkerMessage(
                        msg.getStatus(), startSequenceNumber, endSequenceNumber, flags, stream.bucket());
                break;
            }
            case OP_MUTATION: {
                int flags = 0;
                int expiration = 0;
                int lockTime = 0;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf original = msg.getExtras();
                    ByteBuf extras = ctx.alloc().buffer(msg.getExtrasLength());
                    try {
                        extras.writeBytes(original, original.readerIndex(), original.readableBytes());
                        extras.skipBytes(MUTATION_EXTRAS_SKIPPED_BYTES);
                        flags = extras.readInt();
                        expiration = extras.readInt();
                        lockTime = extras.readInt();
                    } finally {
                        // Release even if the extras are shorter than expected and a read throws.
                        extras.release();
                    }
                }
                // The content is retained because it is handed to the message object, which
                // lives on after this method returns.
                request = new MutationMessage(msg.getStatus(), msg.getKey(), msg.content().retain(),
                        expiration, flags, lockTime, msg.getCAS(), stream.bucket());
                break;
            }
            case OP_REMOVE: {
                request = new RemoveMessage(msg.getStatus(), msg.getKey(), msg.getCAS(), stream.bucket());
                break;
            }
            default: {
                LOGGER.info("Unhandled DCP message: {}, {}", Byte.valueOf(msg.getOpcode()), msg);
            }
        }
        if (request != null) {
            stream.subject().onNext(request);
        }
    }

    /**
     * Encodes an open-connection request: the connection name is the key and the
     * extras hold the sequence number followed by the connection type flags.
     *
     * @param ctx the channel context, used to allocate the extras buffer.
     * @param msg the request to encode.
     * @return the encoded request.
     */
    private BinaryMemcacheRequest handleOpenConnectionRequest(ChannelHandlerContext ctx, OpenConnectionRequest msg) {
        // Read everything that could throw before allocating, so a failure cannot leak the buffer.
        String key = msg.connectionName();
        short keyLength = (short) key.getBytes(CharsetUtil.UTF_8).length;
        int sequenceNumber = msg.sequenceNumber();
        int typeFlags = msg.type().flags();

        ByteBuf extras = ctx.alloc().buffer(8);
        extras.writeInt(sequenceNumber).writeInt(typeFlags);
        byte extrasLength = (byte) extras.readableBytes();

        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(key, extras);
        request.setOpcode(OP_OPEN_CONNECTION);
        request.setKeyLength(keyLength);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(keyLength + extrasLength);
        return request;
    }

    /**
     * Encodes a stream request and registers a new {@link DCPStream} whose id is
     * sent as the opaque value, so later messages can be matched to the stream.
     *
     * <p>The 48 byte extras hold two zeroed 32-bit fields followed by the start and
     * end sequence numbers, the vbucket UUID and the snapshot start and end sequence
     * numbers.
     *
     * @param ctx the channel context, used to allocate the extras buffer.
     * @param msg the request to encode.
     * @return the encoded request.
     */
    private BinaryMemcacheRequest handleStreamRequestRequest(ChannelHandlerContext ctx, StreamRequestRequest msg) {
        ByteBuf extras = ctx.alloc().buffer(48);
        extras.writeInt(0)
                .writeInt(0)
                .writeLong(msg.startSequenceNumber())
                .writeLong(msg.endSequenceNumber())
                .writeLong(msg.vbucketUUID())
                .writeLong(msg.snapshotStartSequenceNumber())
                .writeLong(msg.snapshotEndSequenceNumber());
        byte extrasLength = (byte) extras.readableBytes();

        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(extras);
        request.setOpcode(OP_STREAM_REQUEST);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(extrasLength);
        request.setOpaque(this.initializeUniqueStream(msg.bucket()));
        return request;
    }

    /**
     * Creates a stream for the given bucket under the next free stream id and
     * registers it.
     *
     * @param bucket the name of the bucket the stream belongs to.
     * @return the id of the newly registered stream.
     */
    private int initializeUniqueStream(String bucket) {
        int streamId = this.nextStreamId++;
        DCPStream stream = new DCPStream(streamId, bucket);
        this.streams.put(streamId, stream);
        return streamId;
    }
}

