/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.internal;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.sink.internal.BackchannelImpl;
import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;

@Internal
@ThreadSafe
public class BackchannelFactory {
    private static final BackchannelFactory INSTANCE = new BackchannelFactory();
    private final Map<Tuple3<Integer, Integer, String>, BackchannelImpl<?>> backchannels = new ConcurrentHashMap();

    public static BackchannelFactory getInstance() {
        return INSTANCE;
    }

    private BackchannelFactory() {
    }

    public <T> ReadableBackchannel<T> getReadableBackchannel(int subtaskId, int attemptNumber, String transactionalIdPrefix) {
        return this.getBackchannel(subtaskId, attemptNumber, transactionalIdPrefix, BackchannelImpl::createReadableBackchannel);
    }

    public <T> WritableBackchannel<T> getWritableBackchannel(int subtaskId, int attemptNumber, String transactionalIdPrefix) {
        return this.getBackchannel(subtaskId, attemptNumber, transactionalIdPrefix, BackchannelImpl::createWritableBackchannel);
    }

    private <R> R getBackchannel(int subtaskId, int attemptNumber, String transactionalIdPrefix, Function<BackchannelImpl<?>, R> subchannelCreator) {
        Tuple3 id = new Tuple3((Object)subtaskId, (Object)attemptNumber, (Object)transactionalIdPrefix);
        BackchannelImpl backchannel = this.backchannels.computeIfAbsent((Tuple3<Integer, Integer, String>)id, k -> new BackchannelImpl(() -> this.unregister((Tuple3<Integer, Integer, String>)id)));
        try {
            return subchannelCreator.apply(backchannel);
        }
        catch (Exception e) {
            throw new IllegalStateException(String.format("Found duplicate transactionalIdPrefix for multiple Kafka sinks: %s. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.", transactionalIdPrefix), e);
        }
    }

    private void unregister(Tuple3<Integer, Integer, String> id) {
        this.backchannels.remove(id);
    }
}

