/*
 * Decompiled with CFR 0.152.
 */
package tr.com.infumia.infumialib.redis;

import com.google.protobuf.GeneratedMessageV3;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.jetbrains.annotations.NotNull;
import tr.com.infumia.infumialib.misc.Protobuf;
import tr.com.infumia.infumialib.proto.Definition;
import tr.com.infumia.infumialib.redis.Redis;
import tr.com.infumia.infumialib.redis.Subscription;
import tr.com.infumia.infumialib.registries.Registry;

public final class PubSub {
    private final Registry<String, Subscription<?>> subscribes = new Registry();
    private final byte @NotNull [] topic;
    @NotNull
    private Predicate<byte[]> filterChannel = bytes -> true;

    private PubSub(byte @NotNull [] topic) {
        this.topic = (byte[])topic.clone();
    }

    @NotNull
    public static PubSub init(byte[] topic) {
        PubSub pubSub = new PubSub(topic);
        pubSub.connect();
        return pubSub;
    }

    public void connect() {
        StatefulRedisPubSubConnection connection = Redis.get().connectPubSub((RedisCodec)ByteArrayCodec.INSTANCE);
        connection.addListener((RedisPubSubListener)new Listener(this));
        connection.async().subscribe((Object[])new byte[][]{this.topic});
    }

    public void send(@NotNull GeneratedMessageV3 message) {
        Definition.ServerMessage serverMessage = Protobuf.createServerMessage(message);
        try (StatefulRedisPubSubConnection connection = Redis.get().connectPubSub((RedisCodec)ByteArrayCodec.INSTANCE);){
            connection.async().publish((Object)this.topic, (Object)serverMessage.toByteArray());
        }
    }

    public <T extends GeneratedMessageV3> void subscribe(@NotNull T template, @NotNull BiConsumer<Definition.ServerMessage, T> coming) {
        this.subscribes.register(new Subscription<T>(coming, template));
    }

    public void unsubscribe(@NotNull String key) {
        this.subscribes.unregister((Subscription<?>)((Object)key));
    }

    private boolean canReceive(byte @NotNull [] topic) {
        return this.filterChannel.test(topic) && Arrays.equals(topic, Redis.ANY_TOPIC) || Arrays.equals(topic, this.topic);
    }

    public PubSub filterChannel(@NotNull Predicate<byte[]> filterChannel) {
        if (filterChannel == null) {
            throw new NullPointerException("filterChannel is marked non-null but is null");
        }
        this.filterChannel = filterChannel;
        return this;
    }

    private static final class Listener
    extends RedisPubSubAdapter<byte[], byte[]> {
        @NotNull
        private final PubSub pubSub;

        public void message(byte[] channel, byte[] message) {
            if (!this.pubSub.canReceive(channel)) {
                return;
            }
            try {
                Definition.ServerMessage serverMessage = Definition.ServerMessage.parseFrom((byte[])message);
                String type = serverMessage.getType();
                Optional<Subscription<?>> subscription = this.pubSub.subscribes.get(type);
                if (subscription.isEmpty()) {
                    System.out.printf("Subscription for %s not found%n", type);
                    return;
                }
                subscription.get().onMessage(serverMessage);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        private Listener(@NotNull PubSub pubSub) {
            if (pubSub == null) {
                throw new NullPointerException("pubSub is marked non-null but is null");
            }
            this.pubSub = pubSub;
        }
    }
}

