package io.github.kattlo.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.kafka.CloudEventSerializer;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.Generated;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/kattlo/cloudevents/KafkaAvroCloudEventSerializer.class */
public class KafkaAvroCloudEventSerializer extends KafkaAvroSerializer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaAvroCloudEventSerializer.class);
    public static final String DATASCHEMA_HEADER = "ce_dataschema";
    private final CloudEventSerializer ce;
    private String schemaRegistryUrl;

    /* loaded from: input_file:io/github/kattlo/cloudevents/KafkaAvroCloudEventSerializer$NoSchema.class */
    private static final class NoSchema implements ParsedSchema {
        private final String namespace;
        private final String name;

        public NoSchema(String str, String str2) {
            this.namespace = (String) Objects.requireNonNull(str);
            this.name = (String) Objects.requireNonNull(str2);
        }

        public String canonicalString() {
            throw new UnsupportedOperationException();
        }

        public boolean isBackwardCompatible(ParsedSchema parsedSchema) {
            throw new UnsupportedOperationException();
        }

        public String name() {
            return this.namespace + "." + this.name;
        }

        public Object rawSchema() {
            throw new UnsupportedOperationException();
        }

        public List<SchemaReference> references() {
            throw new UnsupportedOperationException();
        }

        public String schemaType() {
            throw new UnsupportedOperationException();
        }
    }

    public KafkaAvroCloudEventSerializer() {
        this.ce = new CloudEventSerializer();
    }

    public KafkaAvroCloudEventSerializer(SchemaRegistryClient schemaRegistryClient) {
        super(schemaRegistryClient);
        this.ce = new CloudEventSerializer();
    }

    private Encoding encodingOf(Map<String, ?> map) {
        log.debug("serializer configurations {}", map);
        Object obj = map.get("cloudevents.serializer.encoding");
        Encoding encoding = null;
        if (obj instanceof String) {
            encoding = Encoding.valueOf((String) obj);
        } else if (obj instanceof Encoding) {
            encoding = (Encoding) obj;
        } else if (obj != null) {
            throw new IllegalArgumentException("cloudevents.serializer.encoding can be of type String or " + Encoding.class.getCanonicalName());
        }
        return encoding;
    }

    public void configure(Map<String, ?> map, boolean z) {
        Encoding encodingOf = encodingOf(map);
        if (encodingOf != Encoding.BINARY) {
            throw new IllegalArgumentException("cloudevents.serializer.encoding=" + encodingOf + " not supported");
        }
        super.configure(map, z);
        this.ce.configure(map, z);
        this.schemaRegistryUrl = (String) map.get("schema.registry.url");
        log.debug("{}={}", "schema.registry.url", this.schemaRegistryUrl);
    }

    public byte[] serialize(String str, Headers headers, Object obj) {
        if (!(obj instanceof CloudEvent)) {
            throw new IllegalArgumentException("event argument must be an instance of " + CloudEvent.class);
        }
        CloudEvent cloudEvent = (CloudEvent) obj;
        this.ce.serialize(str, headers, cloudEvent);
        log.debug("CloudEvent headers {}", headers);
        if (!(cloudEvent.getData() instanceof AvroCloudEventData)) {
            throw new IllegalArgumentException("CloudEvent data attribute must be an instance of " + AvroCloudEventData.class.getName());
        }
        AvroCloudEventData avroCloudEventData = (AvroCloudEventData) cloudEvent.getData();
        IndexedRecord value = avroCloudEventData.getValue();
        Class<?> cls = value.getClass();
        log.debug("value to serialize as avro {}", value);
        byte[] serialize = super.serialize(str, headers, avroCloudEventData.getValue());
        SubjectNameStrategy subjectNameStrategy = (SubjectNameStrategy) ((KafkaAvroSerializer) this).valueSubjectNameStrategy;
        log.debug("SubjectNameStrategy {}", subjectNameStrategy);
        String subjectName = subjectNameStrategy.subjectName(str, Boolean.FALSE.booleanValue(), new NoSchema(cls.getPackageName(), cls.getSimpleName()));
        log.info("SubjectName {}", subjectName);
        try {
            List allVersions = ((KafkaAvroSerializer) this).schemaRegistry.getAllVersions(subjectName);
            Integer num = (Integer) allVersions.get(allVersions.size() - 1);
            log.debug("Schema versionId {}", num);
            String str2 = this.schemaRegistryUrl + "/subjects/" + subjectName + "/versions/" + num + "/schema";
            log.debug("{}={}", DATASCHEMA_HEADER, str2);
            headers.remove(DATASCHEMA_HEADER);
            headers.add(DATASCHEMA_HEADER, str2.getBytes());
            return serialize;
        } catch (IOException | RestClientException e) {
            throw new SerializationException(e.getMessage(), e);
        }
    }
}
