/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.spring.data.connection;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.spring.data.connection.RedissonBaseReactive;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive implementation of the Spring Data Redis stream commands on top of Redisson.
 *
 * <p>Every method validates the incoming command object, flattens it into the raw argument
 * list of the corresponding Redis {@code X*} command and dispatches it through the
 * {@code read(...)} / {@code write(...)} helpers inherited from {@link RedissonBaseReactive}.
 * The reply is then wrapped into the response type expected by {@link ReactiveStreamCommands}.
 */
public class RedissonReactiveStreamCommands extends RedissonBaseReactive implements ReactiveStreamCommands {

    /** {@code XGROUP} command whose reply is consumed as a string; used for {@code XGROUP CREATE}. */
    private static final RedisStrictCommand<String> XGROUP_STRING = new RedisStrictCommand<>("XGROUP");

    /**
     * @param executorService reactive command executor handed to the base class
     */
    RedissonReactiveStreamCommands(CommandReactiveExecutor executorService) {
        super(executorService);
    }

    /** Converts record ids into their raw string form, preserving order. */
    private static List<String> toStringList(List<RecordId> recordIds) {
        return recordIds.stream().map(RecordId::getValue).collect(Collectors.toList());
    }

    /** Wraps every field name and value of a decoded stream entry into a {@link ByteBuffer}. */
    private static Map<ByteBuffer, ByteBuffer> wrapFields(Map<byte[], byte[]> fields) {
        return fields.entrySet().stream()
                .collect(Collectors.toMap(
                        field -> ByteBuffer.wrap(field.getKey()),
                        field -> ByteBuffer.wrap(field.getValue())));
    }

    /**
     * Sends {@code XACK key group id [id ...]} for each command.
     *
     * @param publisher acknowledge commands; key, group and record ids must be non-null
     * @return one numeric response per command carrying the reply of {@code XACK}
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroup(), "Group must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            byte[] k = toByteArray(command.getKey());
            List<Object> params = new ArrayList<>();
            params.add(k);
            params.add(command.getGroup());
            params.addAll(toStringList(command.getRecordIds()));

            Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XACK, params.toArray());
            return m.map(v -> new ReactiveRedisConnection.NumericResponse<>(command, v));
        });
    }

    /**
     * Sends {@code XADD key id field value [field value ...]} for each command.
     * The id argument is {@code *} when the record id should be auto-generated.
     *
     * @param publisher add commands; key and body must be non-null
     * @return one response per command carrying the id returned by {@code XADD}
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getBody(), "Body must not be null!");

            byte[] k = toByteArray(command.getKey());
            List<Object> params = new ArrayList<>();
            params.add(k);

            RecordId id = command.getRecord().getId();
            if (id.shouldBeAutoGenerated()) {
                params.add("*");
            } else {
                params.add(id.getValue());
            }

            for (Map.Entry<ByteBuffer, ByteBuffer> entry : command.getBody().entrySet()) {
                params.add(toByteArray(entry.getKey()));
                params.add(toByteArray(entry.getValue()));
            }

            Mono<StreamMessageId> m = write(k, StringCodec.INSTANCE, RedisCommands.XADD, params.toArray());
            return m.map(v -> new ReactiveRedisConnection.CommandResponse<>(command, RecordId.of(v.toString())));
        });
    }

    /**
     * Sends {@code XDEL key id [id ...]} for each command.
     *
     * @param publisher delete commands; key and record ids must be non-null
     * @return one response per command carrying the reply of {@code XDEL}
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            byte[] k = toByteArray(command.getKey());
            List<Object> params = new ArrayList<>();
            params.add(k);
            params.addAll(toStringList(command.getRecordIds()));

            Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray());
            return m.map(v -> new ReactiveRedisConnection.CommandResponse<>(command, v));
        });
    }

    /**
     * Sends {@code XLEN key} for each command.
     *
     * @param publisher key commands; the key must be non-null
     * @return one numeric response per command carrying the reply of {@code XLEN}
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            byte[] k = toByteArray(command.getKey());
            Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XLEN, new Object[]{k});
            return m.map(v -> new ReactiveRedisConnection.NumericResponse<>(command, v));
        });
    }

    /**
     * Runs {@code XRANGE} for each command; see {@link #range(RedisCommand, Publisher)}.
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return range(RedisCommands.XRANGE, publisher);
    }

    /**
     * Shared implementation of {@code XRANGE} and {@code XREVRANGE}.
     *
     * <p>For {@code XRANGE} the bounds are sent as {@code lower upper}; for any other command
     * they are sent as {@code upper lower}. An absent lower bound becomes {@code -}, an absent
     * upper bound becomes {@code +}. {@code COUNT n} is appended only when the limit's count
     * is positive.
     *
     * @param rangeCommand the Redis command to execute
     * @param publisher    range commands; key, range and limit must be non-null
     * @return one response per command whose payload emits the returned entries as records
     *         bound to the command's key
     */
    private Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> range(RedisCommand<?> rangeCommand, Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRange(), "Range must not be null!");
            Assert.notNull(command.getLimit(), "Limit must not be null!");

            byte[] k = toByteArray(command.getKey());
            String lower = command.getRange().getLowerBound().getValue().orElse("-");
            String upper = command.getRange().getUpperBound().getValue().orElse("+");

            List<Object> params = new ArrayList<>();
            params.add(k);
            if (rangeCommand == RedisCommands.XRANGE) {
                params.add(lower);
                params.add(upper);
            } else {
                // Reverse range takes the end of the interval first.
                params.add(upper);
                params.add(lower);
            }

            int count = command.getLimit().getCount();
            if (count > 0) {
                params.add("COUNT");
                params.add(count);
            }

            Mono<Map<StreamMessageId, Map<byte[], byte[]>>> m = write(k, ByteArrayCodec.INSTANCE, rangeCommand, params.toArray());
            return m.map(v -> {
                // fromIterable (unlike fromStream) keeps the result re-subscribable.
                Flux<ByteBufferRecord> records = Flux.fromIterable(v.entrySet())
                        .map(e -> StreamRecords.newRecord()
                                .in(command.getKey())
                                .withId(RecordId.of(e.getKey().toString()))
                                .ofBuffer(wrapFields(e.getValue())));
                return new ReactiveRedisConnection.CommandResponse<>(command, records);
            });
        });
    }

    /**
     * Runs {@code XREAD} or, when the command carries a consumer, {@code XREADGROUP}.
     *
     * <p>Argument layout: {@code [GROUP group consumer] [COUNT n] [BLOCK ms] STREAMS key... offset...}.
     * {@code COUNT} and {@code BLOCK} are only sent when the respective option is non-null and
     * positive; a positive block value also selects the blocking command variant. The key of
     * the first stream offset is passed to the executor as the command's key.
     *
     * @param publisher read commands; stream offsets must be non-null and non-empty, read
     *                  options must be non-null
     * @return one response per command whose payload emits the records of all returned streams
     * @throws IllegalArgumentException (signalled through the returned Flux) if a command is invalid
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null!");
            // The first offset's key is required below; fail with a clear message instead of
            // an IndexOutOfBoundsException.
            Assert.notEmpty(command.getStreamOffsets(), "StreamOffsets must not be empty!");
            Assert.notNull(command.getReadOptions(), "ReadOptions must not be null!");

            Long count = command.getReadOptions().getCount();
            Long block = command.getReadOptions().getBlock();
            boolean blocking = block != null && block > 0L;
            boolean grouped = command.getConsumer() != null;

            List<Object> params = new ArrayList<>();
            if (grouped) {
                params.add("GROUP");
                params.add(command.getConsumer().getGroup());
                params.add(command.getConsumer().getName());
            }
            if (count != null && count > 0L) {
                params.add("COUNT");
                params.add(count);
            }
            if (blocking) {
                params.add("BLOCK");
                params.add(block);
            }

            // STREAMS expects all keys first, followed by all offsets in the same order.
            params.add("STREAMS");
            for (StreamOffset<ByteBuffer> streamOffset : command.getStreamOffsets()) {
                params.add(toByteArray(streamOffset.getKey()));
            }
            for (StreamOffset<ByteBuffer> streamOffset : command.getStreamOffsets()) {
                params.add(streamOffset.getOffset().getOffset());
            }

            byte[] firstKey = toByteArray(command.getStreamOffsets().get(0).getKey());
            Mono<Map<String, Map<StreamMessageId, Map<byte[], byte[]>>>> m;
            if (grouped) {
                // XREADGROUP is flagged as a write command by Redis (it updates consumer group
                // state), so it is dispatched through write(...).
                // NOTE(review): assumes the inherited read(...) may be routed to a replica
                // depending on configuration - confirm against RedissonBaseReactive.
                RedisCommand<?> redisCommand = blocking ? RedisCommands.XREADGROUP_BLOCKING : RedisCommands.XREADGROUP;
                m = write(firstKey, ByteArrayCodec.INSTANCE, redisCommand, params.toArray());
            } else {
                RedisCommand<?> redisCommand = blocking ? RedisCommands.XREAD_BLOCKING : RedisCommands.XREAD;
                m = read(firstKey, ByteArrayCodec.INSTANCE, redisCommand, params.toArray());
            }

            return m.map(v -> {
                // fromIterable (unlike fromStream) keeps the result re-subscribable.
                Flux<ByteBufferRecord> records = Flux.fromIterable(v.entrySet())
                        .flatMap(streamEntry -> Flux.fromIterable(streamEntry.getValue().entrySet())
                                .map(e -> StreamRecords.newRecord()
                                        .in(streamEntry.getKey())
                                        .withId(RecordId.of(e.getKey().toString()))
                                        .ofBuffer(wrapFields(e.getValue()))));
                return new ReactiveRedisConnection.CommandResponse<>(command, records);
            });
        });
    }

    /**
     * Runs the {@code XGROUP} sub-command selected by the command's action.
     *
     * <ul>
     *   <li>{@code CREATE}: sends {@code XGROUP CREATE key group offset MKSTREAM} and returns
     *       the string reply.</li>
     *   <li>{@code DELETE_CONSUMER}: sends {@code XGROUP DELCONSUMER key group consumer}.</li>
     *   <li>{@code DESTROY}: sends {@code XGROUP DESTROY key group}.</li>
     * </ul>
     * For the latter two the numeric reply is translated to {@code "OK"} when it is greater
     * than zero and to {@code "Error"} otherwise.
     *
     * @param publisher group commands; key and group name must be non-null
     * @return one response per command carrying the textual outcome
     * @throws IllegalArgumentException (signalled through the returned Flux) for any other action
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "GroupName must not be null!");

            byte[] k = toByteArray(command.getKey());
            ReactiveStreamCommands.GroupCommand.GroupCommandAction action = command.getAction();

            if (action == ReactiveStreamCommands.GroupCommand.GroupCommandAction.CREATE) {
                Assert.notNull(command.getReadOffset(), "ReadOffset must not be null!");
                Mono<String> m = write(k, StringCodec.INSTANCE, XGROUP_STRING,
                        "CREATE", k, command.getGroupName(), command.getReadOffset().getOffset(), "MKSTREAM");
                return m.map(v -> new ReactiveRedisConnection.CommandResponse<>(command, v));
            }

            if (action == ReactiveStreamCommands.GroupCommand.GroupCommandAction.DELETE_CONSUMER) {
                Assert.notNull(command.getConsumerName(), "ConsumerName must not be null!");
                Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG,
                        "DELCONSUMER", k, command.getGroupName(), command.getConsumerName());
                return m.map(v -> new ReactiveRedisConnection.CommandResponse<>(command, v > 0L ? "OK" : "Error"));
            }

            if (action == ReactiveStreamCommands.GroupCommand.GroupCommandAction.DESTROY) {
                Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG,
                        "DESTROY", k, command.getGroupName());
                return m.map(v -> new ReactiveRedisConnection.CommandResponse<>(command, v > 0L ? "OK" : "Error"));
            }

            throw new IllegalArgumentException("unknown command " + action);
        });
    }

    /**
     * Runs {@code XREVRANGE} for each command; see {@link #range(RedisCommand, Publisher)}.
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return range(RedisCommands.XREVRANGE, publisher);
    }

    /**
     * Sends {@code XTRIM key MAXLEN count} for each command.
     *
     * @param publisher trim commands; key and count must be non-null
     * @return one numeric response per command carrying the reply of {@code XTRIM}
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> publisher) {
        return execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getCount(), "Count must not be null!");

            byte[] k = toByteArray(command.getKey());
            Mono<Long> m = write(k, StringCodec.INSTANCE, RedisCommands.XTRIM, k, "MAXLEN", command.getCount());
            return m.map(v -> new ReactiveRedisConnection.NumericResponse<>(command, v));
        });
    }
}

