/*
 * Decompiled with CFR 0.152.
 */
package fish.payara.cloud.connectors.kafka.outbound;

import fish.payara.cloud.connectors.kafka.api.KafkaConnection;
import fish.payara.cloud.connectors.kafka.outbound.KafkaConnectionImpl;
import fish.payara.cloud.connectors.kafka.outbound.KafkaConnectionMetadata;
import java.io.PrintWriter;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.security.auth.Subject;
import javax.transaction.xa.XAResource;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;

public class KafkaManagedConnection
implements ManagedConnection,
KafkaConnection {
    private KafkaProducer producer;
    private final List<ConnectionEventListener> listeners = new LinkedList<ConnectionEventListener>();
    private final HashSet<KafkaConnectionImpl> connectionHandles = new HashSet();
    private PrintWriter writer;

    KafkaManagedConnection(KafkaProducer producer) {
        this.producer = producer;
    }

    public Object getConnection(Subject subject, ConnectionRequestInfo cxRequestInfo) throws ResourceException {
        KafkaConnectionImpl conn = new KafkaConnectionImpl(this);
        this.connectionHandles.add(conn);
        return conn;
    }

    public void destroy() throws ResourceException {
    }

    public void cleanup() throws ResourceException {
        for (KafkaConnectionImpl conn : this.connectionHandles) {
            conn.setRealConn(null);
        }
        this.connectionHandles.clear();
    }

    public void associateConnection(Object connection) throws ResourceException {
        if (connection instanceof KafkaConnectionImpl) {
            KafkaConnectionImpl conn = (KafkaConnectionImpl)connection;
            conn.setRealConn(this);
            this.connectionHandles.add(conn);
        }
    }

    public void addConnectionEventListener(ConnectionEventListener listener) {
        this.listeners.add(listener);
    }

    public void removeConnectionEventListener(ConnectionEventListener listener) {
        this.listeners.remove(listener);
    }

    public XAResource getXAResource() throws ResourceException {
        throw new NotSupportedException("XA is not supported");
    }

    public LocalTransaction getLocalTransaction() throws ResourceException {
        throw new NotSupportedException("Local Transaction Not Supported");
    }

    public ManagedConnectionMetaData getMetaData() throws ResourceException {
        return new KafkaConnectionMetadata();
    }

    public void setLogWriter(PrintWriter out) throws ResourceException {
        this.writer = out;
    }

    public PrintWriter getLogWriter() throws ResourceException {
        return this.writer;
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord record) {
        return this.producer.send(record);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
        return this.producer.send(record, callback);
    }

    @Override
    public void flush() {
        this.producer.flush();
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.producer.partitionsFor(topic);
    }

    @Override
    public void close() throws Exception {
        this.producer.close();
        this.producer = null;
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() throws ResourceException {
        return this.producer.metrics();
    }

    void remove(KafkaConnectionImpl conn) {
        this.connectionHandles.remove(conn);
        ConnectionEvent event = new ConnectionEvent((ManagedConnection)this, 1);
        event.setConnectionHandle((Object)conn);
        for (ConnectionEventListener listener : this.listeners) {
            listener.connectionClosed(event);
        }
    }
}

