/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.spring.data.connection;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import org.redisson.spring.data.connection.RedissonBaseReactive;
import org.springframework.data.redis.connection.ReactiveSubscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class RedissonReactiveSubscription
implements ReactiveSubscription {
    private final Map<ByteBuffer, PubSubConnectionEntry> channels = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
    private final Map<ByteBuffer, PubSubConnectionEntry> patterns = new ConcurrentHashMap<ByteBuffer, PubSubConnectionEntry>();
    private final PublishSubscribeService subscribeService;
    private final AtomicReference<Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>>> flux = new AtomicReference();
    private volatile Disposable disposable;

    public RedissonReactiveSubscription(ConnectionManager connectionManager) {
        this.subscribeService = connectionManager.getSubscribeService();
    }

    public Mono<Void> subscribe(ByteBuffer ... channels) {
        RedissonPromise result = new RedissonPromise();
        CountableListener listener = new CountableListener((RPromise)result, null, channels.length);
        for (ByteBuffer channel : channels) {
            RFuture f = this.subscribeService.subscribe((Codec)ByteArrayCodec.INSTANCE, this.toChannelName(channel), new RedisPubSubListener[0]);
            f.onComplete((res, e) -> this.channels.put(channel, (PubSubConnectionEntry)res));
            f.onComplete((BiConsumer)listener);
        }
        return Mono.fromFuture((CompletableFuture)result);
    }

    protected ChannelName toChannelName(ByteBuffer channel) {
        return new ChannelName(RedissonBaseReactive.toByteArray(channel));
    }

    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        RedissonPromise result = new RedissonPromise();
        CountableListener listener = new CountableListener((RPromise)result, null, patterns.length);
        for (ByteBuffer channel : patterns) {
            RFuture f = this.subscribeService.psubscribe(this.toChannelName(channel), (Codec)ByteArrayCodec.INSTANCE, new RedisPubSubListener[0]);
            f.onComplete((res, e) -> this.patterns.put(channel, (PubSubConnectionEntry)res));
            f.onComplete((BiConsumer)listener);
        }
        return Mono.fromFuture((CompletableFuture)result);
    }

    public Mono<Void> unsubscribe() {
        return this.unsubscribe(this.channels.keySet().toArray(new ByteBuffer[this.channels.size()]));
    }

    public Mono<Void> unsubscribe(ByteBuffer ... channels) {
        RedissonPromise result = new RedissonPromise();
        CountableListener listener = new CountableListener((RPromise)result, null, channels.length);
        for (ByteBuffer channel : channels) {
            RFuture f = this.subscribeService.unsubscribe(this.toChannelName(channel), PubSubType.UNSUBSCRIBE);
            f.onComplete((BiConsumer)listener);
        }
        return Mono.fromFuture((CompletableFuture)result);
    }

    public Mono<Void> pUnsubscribe() {
        return this.unsubscribe(this.patterns.keySet().toArray(new ByteBuffer[this.patterns.size()]));
    }

    public Mono<Void> pUnsubscribe(ByteBuffer ... patterns) {
        RedissonPromise result = new RedissonPromise();
        CountableListener listener = new CountableListener((RPromise)result, null, patterns.length);
        for (ByteBuffer channel : patterns) {
            RFuture f = this.subscribeService.unsubscribe(this.toChannelName(channel), PubSubType.PUNSUBSCRIBE);
            f.onComplete((BiConsumer)listener);
        }
        return Mono.fromFuture((CompletableFuture)result);
    }

    public Set<ByteBuffer> getChannels() {
        return this.channels.keySet();
    }

    public Set<ByteBuffer> getPatterns() {
        return this.patterns.keySet();
    }

    public Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> receive() {
        if (this.flux.get() != null) {
            return this.flux.get();
        }
        Flux f = Flux.create(emitter -> emitter.onRequest(n -> {
            AtomicLong counter = new AtomicLong(n);
            BaseRedisPubSubListener listener = new BaseRedisPubSubListener((FluxSink)emitter, counter){
                final /* synthetic */ FluxSink val$emitter;
                final /* synthetic */ AtomicLong val$counter;
                {
                    this.val$emitter = fluxSink;
                    this.val$counter = atomicLong;
                }

                public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) {
                    this.val$emitter.next((Object)new ReactiveSubscription.PatternMessage((Object)ByteBuffer.wrap(pattern.toString().getBytes()), (Object)ByteBuffer.wrap(channel.toString().getBytes()), (Object)ByteBuffer.wrap((byte[])message)));
                    if (this.val$counter.decrementAndGet() == 0L) {
                        RedissonReactiveSubscription.this.disposable.dispose();
                        this.val$emitter.complete();
                    }
                }

                public void onMessage(CharSequence channel, Object msg) {
                    this.val$emitter.next((Object)new ReactiveSubscription.ChannelMessage((Object)ByteBuffer.wrap(channel.toString().getBytes()), (Object)ByteBuffer.wrap((byte[])msg)));
                    if (this.val$counter.decrementAndGet() == 0L) {
                        RedissonReactiveSubscription.this.disposable.dispose();
                        this.val$emitter.complete();
                    }
                }
            };
            this.disposable = () -> {
                for (Map.Entry<ByteBuffer, PubSubConnectionEntry> entry : this.channels.entrySet()) {
                    entry.getValue().removeListener(this.toChannelName(entry.getKey()), (RedisPubSubListener)listener);
                }
                for (Map.Entry<ByteBuffer, PubSubConnectionEntry> entry : this.patterns.entrySet()) {
                    entry.getValue().removeListener(this.toChannelName(entry.getKey()), (RedisPubSubListener)listener);
                }
            };
            for (Map.Entry<ByteBuffer, PubSubConnectionEntry> entry : this.channels.entrySet()) {
                entry.getValue().addListener(this.toChannelName(entry.getKey()), (RedisPubSubListener)listener);
            }
            for (Map.Entry<ByteBuffer, PubSubConnectionEntry> entry : this.patterns.entrySet()) {
                entry.getValue().addListener(this.toChannelName(entry.getKey()), (RedisPubSubListener)listener);
            }
            emitter.onDispose(this.disposable);
        }));
        if (this.flux.compareAndSet(null, (Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>>)f)) {
            return f;
        }
        return this.flux.get();
    }

    public Mono<Void> cancel() {
        return this.unsubscribe().then(this.pUnsubscribe()).then(Mono.fromRunnable(() -> {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
        }));
    }
}

