/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.amqp.AmqpMessageContext;
import org.eclipse.hono.application.client.amqp.ProtonBasedDownstreamMessage;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An AMQP based client that sends a command to a device and waits for the
 * corresponding command response.
 * <p>
 * The underlying {@link RequestResponseClient} instances are obtained from the client factory
 * under a key that is derived from the tenant identifier only (see {@link #getKey(String)}).
 * <p>
 * NOTE(review): because the key does not include the reply ID, a {@code replyId} passed to
 * {@code sendCommand} presumably only takes effect when a new client is created for the
 * tenant - confirm against the client factory's behavior.
 */
final class ProtonBasedRequestResponseCommandClient
        extends AbstractRequestResponseServiceClient<DownstreamMessage<AmqpMessageContext>, RequestResponseResult<DownstreamMessage<AmqpMessageContext>>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtonBasedRequestResponseCommandClient.class);

    /** The timeout applied to a command if the caller does not provide one. */
    private static final long DEFAULT_COMMAND_TIMEOUT_IN_MS = 10000L;
    /** The name of the endpoint that command requests are sent to. */
    private static final String COMMAND_ENDPOINT = "command";
    /** The name of the endpoint that command responses are received from. */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";
    /** The status code used for failing a request whose response carries no status. */
    private static final int HTTP_BAD_REQUEST = 400;

    // Incremented without synchronization.
    // NOTE(review): presumably only accessed from a single (vert.x context) thread - confirm.
    private int messageCounter;

    /**
     * Creates a new client for a connection.
     *
     * @param connection The connection to use for sending commands.
     * @param samplerFactory The factory for creating the samplers used when sending messages.
     */
    protected ProtonBasedRequestResponseCommandClient(
            final HonoConnection connection,
            final SendMessageSampler.Factory samplerFactory) {
        super(
                connection,
                samplerFactory,
                new CachingClientFactory<>(connection.getVertx(), RequestResponseClient::isOpen),
                null);
    }

    /**
     * Gets the key under which the request-response client for a tenant is registered
     * with the client factory.
     *
     * @param tenantId The tenant identifier.
     * @return The key, consisting of the command endpoint name and the tenant identifier.
     */
    protected String getKey(final String tenantId) {
        return String.format("%s-%s", COMMAND_ENDPOINT, tenantId);
    }

    /**
     * Sends a command to a device and waits for the response.
     * <p>
     * A new span named <em>send command and receive response</em> is created as a child of the
     * given context. The span is finished when the returned future completes.
     * <p>
     * NOTE(review): the timeout is set on the client obtained from the client factory before the
     * request is sent. If that client is shared between invocations, concurrent commands with
     * different timeouts may influence each other - confirm.
     *
     * @param tenantId The tenant that the device belongs to.
     * @param deviceId The identifier of the device to send the command to.
     * @param command The name of the command.
     * @param contentType The content type of the command payload.
     * @param data The command payload.
     * @param replyId The reply ID to use when a new request-response client needs to be created.
     *                A random UUID is used if {@code null}.
     * @param properties The properties to pass along with the request.
     * @param timeout The time to wait for the response. A default of 10 seconds is used if {@code null}.
     * @param context The span context to use as the parent of the span created for this operation.
     * @return A future indicating the outcome of the operation.
     *         The future will be succeeded with the response message if the response does not
     *         indicate an error. Otherwise, the future will be failed with an exception created
     *         from the response's status code and (optional) payload. The future will be failed
     *         with a {@link ClientErrorException} (status 400) if the response could not be
     *         mapped to a result.
     * @throws NullPointerException if tenant ID, device ID or command are {@code null}.
     * @throws IllegalArgumentException if the timeout is negative.
     */
    public Future<DownstreamMessage<AmqpMessageContext>> sendCommand(
            final String tenantId,
            final String deviceId,
            final String command,
            final String contentType,
            final Buffer data,
            final String replyId,
            final Map<String, Object> properties,
            final Duration timeout,
            final SpanContext context) {

        Objects.requireNonNull(tenantId, "tenantId");
        Objects.requireNonNull(deviceId, "deviceId");
        Objects.requireNonNull(command, "command");

        // validate the timeout before the span is created so that no span is left unfinished
        final long timeoutInMs = toTimeoutMillis(timeout);
        final Span currentSpan = newChildSpan(context, "send command and receive response");

        return getOrCreateClient(tenantId, replyId)
                .compose(client -> {
                    client.setRequestTimeout(timeoutInMs);
                    final String messageTargetAddress = AddressHelper.getTargetAddress(
                            COMMAND_ENDPOINT,
                            tenantId,
                            deviceId,
                            connection.getConfig());
                    return client.createAndSendRequest(
                            command,
                            messageTargetAddress,
                            properties,
                            data,
                            contentType,
                            this::mapCommandResponse,
                            currentSpan);
                })
                .recover(error -> {
                    // record the failure in the span, then propagate it unchanged
                    Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(error));
                    TracingHelper.logError(currentSpan, error);
                    return Future.failedFuture(error);
                })
                .compose(result -> {
                    if (result == null) {
                        // this stage runs after the recover step above, so the failure
                        // needs to be recorded in the span explicitly
                        final ClientErrorException missingResult = new ClientErrorException(HTTP_BAD_REQUEST);
                        Tags.HTTP_STATUS.set(currentSpan, HTTP_BAD_REQUEST);
                        TracingHelper.logError(currentSpan, missingResult);
                        return Future.failedFuture(missingResult);
                    }
                    final DownstreamMessage<AmqpMessageContext> commandResponseMessage = result.getPayload();
                    setTagsForResult(currentSpan, result);
                    if (result.isError()) {
                        final Buffer payload = commandResponseMessage.getPayload();
                        final String detailMessage = payload != null && payload.length() > 0
                                ? payload.toString(StandardCharsets.UTF_8)
                                : null;
                        return Future.failedFuture(StatusCodeMapper.from(result.getStatus(), detailMessage));
                    }
                    return Future.succeededFuture(commandResponseMessage);
                })
                .onComplete(r -> currentSpan.finish());
    }

    /**
     * Determines the request timeout in milliseconds.
     *
     * @param timeout The timeout requested by the caller or {@code null} if the default should be used.
     * @return The timeout in milliseconds.
     * @throws IllegalArgumentException if the timeout is negative.
     */
    private static long toTimeoutMillis(final Duration timeout) {
        if (timeout == null) {
            return DEFAULT_COMMAND_TIMEOUT_IN_MS;
        }
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("command timeout duration must be >= 0");
        }
        return timeout.toMillis();
    }

    /**
     * Creates a result from a command response message.
     *
     * @param message The response message.
     * @param delivery The delivery that the message has been received with.
     * @return The result containing the response's status, the downstream message and the
     *         cache directive, or {@code null} if the message does not contain a status.
     */
    private RequestResponseResult<DownstreamMessage<AmqpMessageContext>> mapCommandResponse(
            final Message message,
            final ProtonDelivery delivery) {

        final DownstreamMessage<AmqpMessageContext> downstreamMessage = ProtonBasedDownstreamMessage.from(message, delivery);
        final Integer status = MessageHelper.getStatus(message);
        if (status == null) {
            LOGGER.warn("response message has no status code application property [reply-to: {}, correlation ID: {}]",
                    message.getReplyTo(), message.getCorrelationId());
            return null;
        }
        return new RequestResponseResult<>(
                status,
                downstreamMessage,
                CacheDirective.from(MessageHelper.getCacheDirective(message)),
                null);
    }

    /**
     * Not supported by this client.
     * <p>
     * Results are created from response messages by means of
     * {@code mapCommandResponse(Message, ProtonDelivery)} instead.
     *
     * @param status Ignored.
     * @param contentType Ignored.
     * @param payload Ignored.
     * @param cacheDirective Ignored.
     * @param applicationProperties Ignored.
     * @return Never returns normally.
     * @throws UnsupportedOperationException always.
     */
    protected RequestResponseResult<DownstreamMessage<AmqpMessageContext>> getResult(
            final int status,
            final String contentType,
            final Buffer payload,
            final CacheDirective cacheDirective,
            final ApplicationProperties applicationProperties) {
        throw new UnsupportedOperationException();
    }

    /**
     * Gets the request-response client for a tenant from the client factory, once the
     * connection has been checked to be established.
     *
     * @param tenantId The tenant to get the client for.
     * @param replyId The reply ID to use if a new client is created or {@code null}
     *                if a random UUID should be used.
     * @return A future indicating the outcome of the operation.
     */
    private Future<RequestResponseClient<RequestResponseResult<DownstreamMessage<AmqpMessageContext>>>> getOrCreateClient(
            final String tenantId,
            final String replyId) {

        return connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(v -> connection.executeOnContext(result -> clientFactory.getOrCreateClient(
                        getKey(tenantId),
                        () -> RequestResponseClient.forEndpoint(
                                connection,
                                COMMAND_ENDPOINT,
                                COMMAND_RESPONSE_ENDPOINT,
                                tenantId,
                                // only generate a random ID if the caller did not provide one
                                Optional.ofNullable(replyId).orElseGet(() -> UUID.randomUUID().toString()),
                                this::createMessageId,
                                samplerFactory.create(COMMAND_ENDPOINT),
                                this::removeClient,
                                this::removeClient),
                        result)));
    }

    /**
     * Creates a new message identifier from an incrementing counter.
     *
     * @return The counter's new value encoded using radix 36.
     */
    private String createMessageId() {
        return Long.toString(++messageCounter, Character.MAX_RADIX);
    }
}

