/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.adapter.coap;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tag;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.OptionSet;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.TelemetryExecutionContext;
import org.eclipse.hono.adapter.coap.CoapContext;
import org.eclipse.hono.adapter.coap.CoapProtocolAdapter;
import org.eclipse.hono.adapter.coap.RequestDeviceAndAuth;
import org.eclipse.hono.adapter.coap.TracingSupportingHonoResource;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHonoResource
extends TracingSupportingHonoResource {
    private static final String KEY_TIMER_ID = "timerId";
    private static final String KEY_MICROMETER_SAMPLE = "micrometer.sample";
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHonoResource.class);
    private final Vertx vertx;

    protected AbstractHonoResource(String resourceName, CoapProtocolAdapter adapter, Tracer tracer, Vertx vertx) {
        super(adapter, tracer, resourceName);
        this.vertx = Objects.requireNonNull(vertx);
    }

    protected Future<RequestDeviceAndAuth> getPostRequestDeviceAndAuth(CoapExchange exchange) {
        return Optional.ofNullable(TracingSupportingHonoResource.getAuthenticatedDevice(exchange)).map(authenticatedDevice -> new RequestDeviceAndAuth((DeviceUser)authenticatedDevice, TracingSupportingHonoResource.getAuthId(exchange), (DeviceUser)authenticatedDevice)).map(Future::succeededFuture).orElseGet(() -> Future.failedFuture((Throwable)new ClientErrorException(401, "DTLS session does not contain authenticated Device")));
    }

    protected Future<RequestDeviceAndAuth> getPutRequestDeviceAndAuth(CoapExchange exchange) {
        List pathList = exchange.getRequestOptions().getUriPath();
        if (pathList.isEmpty()) {
            return Future.failedFuture((Throwable)new ClientErrorException(404, "request URI must not be empty"));
        }
        String[] path = pathList.toArray(new String[pathList.size()]);
        ResourceIdentifier requestedResource = ResourceIdentifier.fromPath((String[])path);
        Promise result = Promise.promise();
        Optional.ofNullable(TracingSupportingHonoResource.getAuthenticatedDevice(exchange)).ifPresentOrElse(authenticatedDevice -> {
            String tenantId = Optional.ofNullable(requestedResource.getTenantId()).orElse(authenticatedDevice.getTenantId());
            if (Strings.isNullOrEmpty((Object)requestedResource.getResourceId())) {
                result.fail((Throwable)new ClientErrorException(404, "request URI must contain device ID"));
            } else if (authenticatedDevice.getTenantId().equals(tenantId)) {
                result.complete((Object)new RequestDeviceAndAuth(new DeviceUser(tenantId, requestedResource.getResourceId()), TracingSupportingHonoResource.getAuthId(exchange), (DeviceUser)authenticatedDevice));
            } else {
                result.fail((Throwable)new ClientErrorException(403, "tenant ID in request URI must match provided credentials"));
            }
        }, () -> {
            if (Strings.isNullOrEmpty((Object)requestedResource.getTenantId()) || Strings.isNullOrEmpty((Object)requestedResource.getResourceId())) {
                result.fail((Throwable)new ClientErrorException(404, "request URI must contain tenant and device ID"));
            } else {
                result.complete((Object)new RequestDeviceAndAuth(new DeviceUser(requestedResource.getTenantId(), requestedResource.getResourceId()), null, null));
            }
        });
        return result.future();
    }

    protected CoapContext newContext(CoapExchange exchange, RequestDeviceAndAuth deviceAndAuth, Span span) {
        return CoapContext.fromRequest(exchange, deviceAndAuth.getOriginDevice(), deviceAndAuth.getAuthenticatedDevice(), deviceAndAuth.getAuthId(), span, this.getAdapter().getMetrics().startTimer());
    }

    @Override
    protected Future<CoapContext> createCoapContextForPost(CoapExchange exchange, Span span) {
        return this.getPostRequestDeviceAndAuth(exchange).map(deviceAndAuth -> this.newContext(exchange, (RequestDeviceAndAuth)deviceAndAuth, span));
    }

    @Override
    protected Future<CoapContext> createCoapContextForPut(CoapExchange exchange, Span span) {
        return this.getPutRequestDeviceAndAuth(exchange).map(deviceAndAuth -> this.newContext(exchange, (RequestDeviceAndAuth)deviceAndAuth, span));
    }

    protected static final void addMicrometerSample(CommandContext ctx, Timer.Sample sample) {
        Objects.requireNonNull(ctx);
        ctx.put(KEY_MICROMETER_SAMPLE, (Object)sample);
    }

    protected static final Timer.Sample getMicrometerSample(CommandContext ctx) {
        Objects.requireNonNull(ctx);
        return (Timer.Sample)ctx.get(KEY_MICROMETER_SAMPLE);
    }

    protected void customizeDownstreamMessageProperties(Map<String, Object> messageProperties, CoapContext ctx) {
    }

    protected final Future<Void> doUploadMessage(CoapContext context, MetricsTags.EndpointType endpoint) {
        Objects.requireNonNull(context);
        Objects.requireNonNull(endpoint);
        String contentType = context.getContentType();
        Buffer payload = context.getPayload();
        if (!AbstractProtocolAdapterBase.isPayloadOfIndicatedType((Buffer)payload, (String)contentType)) {
            return Future.failedFuture((Throwable)new ClientErrorException(400, "content type [%s] does not match payload".formatted(contentType)));
        }
        String gatewayId = context.getGatewayId();
        String tenantId = context.getOriginDevice().getTenantId();
        String deviceId = context.getOriginDevice().getDeviceId();
        MetricsTags.QoS qos = context.isConfirmable() ? MetricsTags.QoS.AT_LEAST_ONCE : MetricsTags.QoS.AT_MOST_ONCE;
        Span currentSpan = TracingHelper.buildChildSpan((Tracer)this.getTracer(), (SpanContext)context.getTracingContext(), (String)("upload " + endpoint.getCanonicalName()), (String)this.getAdapter().getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag((Tag)TracingHelper.TAG_TENANT_ID, (Object)tenantId).withTag((Tag)TracingHelper.TAG_DEVICE_ID, (Object)deviceId).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), context.isDeviceAuthenticated()).withTag("QoS-Level", qos.asTag().getValue()).start();
        Promise responseReady = Promise.promise();
        Future tokenTracker = this.getAdapter().getRegistrationAssertion(tenantId, deviceId, (Device)context.getAuthenticatedDevice(), currentSpan.context());
        Future tenantTracker = this.getAdapter().getTenantClient().get(tenantId, currentSpan.context());
        Future tenantValidationTracker = tenantTracker.compose(tenantObject -> Future.all((Future)this.getAdapter().isAdapterEnabled((TenantObject)tenantObject), (Future)this.getAdapter().checkMessageLimit((TenantObject)tenantObject, payload.length(), currentSpan.context())).map(tenantObject));
        Future ttdTracker = Future.all((Future)tenantValidationTracker, (Future)tokenTracker).compose(ok -> {
            Integer ttdParam = context.getTimeUntilDisconnect();
            return this.getAdapter().getTimeUntilDisconnect((TenantObject)tenantTracker.result(), ttdParam).onSuccess(effectiveTtd -> Optional.ofNullable(effectiveTtd).ifPresent(v -> TracingHelper.TAG_DEVICE_TTD.set(currentSpan, v)));
        });
        Future commandConsumerTracker = ttdTracker.compose(ttd -> this.createCommandConsumer((Integer)ttd, (TenantObject)tenantTracker.result(), deviceId, gatewayId, context, (Handler<AsyncResult<Void>>)responseReady, currentSpan));
        return commandConsumerTracker.compose(commandConsumer -> {
            Map props = this.getAdapter().getDownstreamMessageProperties((TelemetryExecutionContext)context);
            Optional.ofNullable(commandConsumer).map(c -> (Integer)ttdTracker.result()).ifPresent(ttd -> props.put("ttd", ttd));
            this.customizeDownstreamMessageProperties(props, context);
            if (context.isConfirmable()) {
                context.startAcceptTimer(this.vertx, (TenantObject)tenantTracker.result(), this.getAdapter().getConfig().getTimeoutToAck());
            }
            Future sendResult = endpoint == MetricsTags.EndpointType.EVENT ? this.getAdapter().getEventSender((TenantObject)tenantValidationTracker.result()).sendEvent((TenantObject)tenantTracker.result(), (RegistrationAssertion)tokenTracker.result(), contentType, payload, props, currentSpan.context()) : this.getAdapter().getTelemetrySender((TenantObject)tenantValidationTracker.result()).sendTelemetry((TenantObject)tenantTracker.result(), (RegistrationAssertion)tokenTracker.result(), context.getRequestedQos(), contentType, payload, props, currentSpan.context());
            return Future.all((Future)sendResult, (Future)responseReady.future()).mapEmpty();
        }).compose(proceed -> Optional.ofNullable((ProtocolAdapterCommandConsumer)commandConsumerTracker.result()).map(consumer -> consumer.close(false, currentSpan.context()).otherwise(thr -> null)).orElseGet(Future::succeededFuture)).map(proceed -> {
            CommandContext commandContext = (CommandContext)context.get("command-context");
            Response response = new Response(CoAP.ResponseCode.CHANGED);
            if (commandContext != null) {
                this.addCommandToResponse(context, response, commandContext, currentSpan);
                commandContext.accept();
                this.getAdapter().getMetrics().reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantId, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, commandContext.getCommand().getPayloadSize(), AbstractHonoResource.getMicrometerSample(commandContext));
            }
            LOG.trace("successfully processed message for device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{tenantId, deviceId, endpoint.getCanonicalName()});
            this.getAdapter().getMetrics().reportTelemetry(endpoint, tenantId, (TenantObject)tenantTracker.result(), MetricsTags.ProcessingOutcome.FORWARDED, qos, payload.length(), this.getTtdStatus(context), context.getTimer());
            context.respond(response);
            currentSpan.finish();
            return null;
        }).recover(t -> {
            LOG.debug("cannot process message from device [tenantId: {}, deviceId: {}, endpoint: {}]", new Object[]{tenantId, deviceId, endpoint.getCanonicalName(), t});
            Future commandConsumerClosedTracker = Optional.ofNullable((ProtocolAdapterCommandConsumer)commandConsumerTracker.result()).map(consumer -> consumer.close(false, currentSpan.context()).otherwise(thr -> null)).orElseGet(Future::succeededFuture);
            CommandContext commandContext = (CommandContext)context.get("command-context");
            if (commandContext != null) {
                TracingHelper.logError((Span)commandContext.getTracingSpan(), (String)"command won't be forwarded to device in CoAP response, CoAP request handling failed", (Throwable)t);
                commandContext.release(t);
                currentSpan.log("released command for device");
            }
            this.getAdapter().getMetrics().reportTelemetry(endpoint, tenantId, (TenantObject)tenantTracker.result(), ClientErrorException.class.isInstance(t) ? MetricsTags.ProcessingOutcome.UNPROCESSABLE : MetricsTags.ProcessingOutcome.UNDELIVERABLE, qos, payload.length(), this.getTtdStatus(context), context.getTimer());
            TracingHelper.logError((Span)currentSpan, (Throwable)t);
            commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
            return Future.failedFuture((Throwable)t);
        });
    }

    protected void addCommandToResponse(CoapContext requestContext, Response response, CommandContext commandContext, Span currentSpan) {
        Command command = commandContext.getCommand();
        OptionSet options = response.getOptions();
        LOG.debug("adding command [name: {}, request-id: {}] to response for device [tenant-id: {}, device-id: {}]", new Object[]{command.getName(), command.getRequestId(), command.getTenant(), command.getGatewayOrDeviceId()});
        commandContext.getTracingSpan().log("forwarding command to device in CoAP response");
        options.addLocationQuery("hono-command=" + command.getName());
        currentSpan.setTag("hono-command", command.getName());
        boolean useShortEndpointName = requestContext.hasShortEndpointName();
        String endpointName = null;
        String tenantId = null;
        String deviceId = null;
        String requestId = null;
        if (command.isOneWay()) {
            String string = endpointName = useShortEndpointName ? "c" : "command";
            if (command.isTargetedAtGateway()) {
                tenantId = "";
                deviceId = command.getDeviceId();
            }
        } else {
            endpointName = useShortEndpointName ? "cr" : "command_response";
            requestId = command.getRequestId();
            if (requestContext.getAuthenticatedDevice() == null) {
                tenantId = command.getTenant();
                deviceId = command.getDeviceId();
            } else if (command.isTargetedAtGateway()) {
                tenantId = "";
                deviceId = command.getDeviceId();
            }
        }
        options.addLocationPath(endpointName);
        Optional.ofNullable(tenantId).ifPresent(arg_0 -> ((OptionSet)options).addLocationPath(arg_0));
        Optional.ofNullable(deviceId).ifPresent(id -> {
            options.addLocationPath(id);
            currentSpan.setTag("hono-cmd-target-device", id);
        });
        Optional.ofNullable(requestId).ifPresent(id -> {
            options.addLocationPath(id);
            currentSpan.setTag("hono-cmd-req-id", command.getRequestId());
        });
        int formatCode = MediaTypeRegistry.parse((String)command.getContentType());
        if (formatCode != -1) {
            options.setContentFormat(formatCode);
        } else {
            currentSpan.log("ignoring unknown content type [" + command.getContentType() + "] of command");
        }
        Optional.ofNullable(command.getPayload()).map(Buffer::getBytes).ifPresent(arg_0 -> ((Response)response).setPayload(arg_0));
    }

    protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(Integer ttdSecs, final TenantObject tenantObject, final String deviceId, String gatewayId, CoapContext context, Handler<AsyncResult<Void>> responseReady, Span uploadMessageSpan) {
        Objects.requireNonNull(tenantObject);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(context);
        Objects.requireNonNull(responseReady);
        Objects.requireNonNull(uploadMessageSpan);
        if (ttdSecs == null || ttdSecs <= 0) {
            responseReady.handle((Object)Future.succeededFuture());
            return Future.succeededFuture();
        }
        AtomicBoolean requestProcessed = new AtomicBoolean(false);
        TracingHelper.TAG_DEVICE_TTD.set(uploadMessageSpan, ttdSecs);
        Span waitForCommandSpan = TracingHelper.buildChildSpan((Tracer)this.getTracer(), (SpanContext)uploadMessageSpan.context(), (String)"create consumer and wait for command", (String)this.getAdapter().getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").start();
        TracingHelper.setDeviceTags((Span)waitForCommandSpan, (String)tenantObject.getTenantId(), (String)deviceId);
        Function<CommandContext, Future> commandHandler = commandContext -> {
            Span processCommandSpan = TracingHelper.buildFollowsFromSpan((Tracer)this.getTracer(), (SpanContext)waitForCommandSpan.context(), (String)"process received command").withTag(Tags.COMPONENT.getKey(), this.getAdapter().getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").addReference("follows_from", commandContext.getTracingContext()).start();
            TracingHelper.setDeviceTags((Span)processCommandSpan, (String)tenantObject.getTenantId(), (String)deviceId);
            Tags.COMPONENT.set(commandContext.getTracingSpan(), this.getAdapter().getTypeName());
            commandContext.logCommandToSpan(processCommandSpan);
            Command command = commandContext.getCommand();
            Timer.Sample commandSample = this.getAdapter().getMetrics().startTimer();
            if (this.isCommandValid(command, processCommandSpan)) {
                Promise commandHandlerDonePromise = Promise.promise();
                if (requestProcessed.compareAndSet(false, true)) {
                    waitForCommandSpan.finish();
                    this.getAdapter().checkMessageLimit(tenantObject, command.getPayloadSize(), processCommandSpan.context()).onComplete(result -> {
                        if (result.succeeded()) {
                            AbstractHonoResource.addMicrometerSample(commandContext, commandSample);
                            context.put("command-context", commandContext);
                            commandHandlerDonePromise.complete();
                        } else {
                            commandContext.reject(result.cause());
                            TracingHelper.logError((Span)processCommandSpan, (String)"rejected command for device", (Throwable)result.cause());
                            this.getAdapter().getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.from((Throwable)result.cause()), command.getPayloadSize(), commandSample);
                            commandHandlerDonePromise.fail(result.cause());
                        }
                        this.cancelCommandReceptionTimer(context);
                        this.setTtdStatus(context, MetricsTags.TtdStatus.COMMAND);
                        responseReady.handle((Object)Future.succeededFuture());
                        processCommandSpan.finish();
                    });
                } else {
                    String errorMsg = "waiting time for command has elapsed or another command has already been processed";
                    LOG.debug("{} [tenantId: {}, deviceId: {}]", new Object[]{"waiting time for command has elapsed or another command has already been processed", tenantObject.getTenantId(), deviceId});
                    this.getAdapter().getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNDELIVERABLE, command.getPayloadSize(), commandSample);
                    ServerErrorException exception = new ServerErrorException(503, "waiting time for command has elapsed or another command has already been processed");
                    commandContext.release((Throwable)exception);
                    TracingHelper.logError((Span)processCommandSpan, (String)"waiting time for command has elapsed or another command has already been processed");
                    processCommandSpan.finish();
                    commandHandlerDonePromise.fail((Throwable)exception);
                }
                return commandHandlerDonePromise.future();
            }
            this.getAdapter().getMetrics().reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, tenantObject.getTenantId(), tenantObject, MetricsTags.ProcessingOutcome.UNPROCESSABLE, command.getPayloadSize(), commandSample);
            LOG.debug("command message is invalid: {}", (Object)command);
            commandContext.reject("malformed command message");
            TracingHelper.logError((Span)processCommandSpan, (String)"malformed command message");
            processCommandSpan.finish();
            return Future.failedFuture((String)"malformed command message");
        };
        Future commandConsumerFuture = gatewayId != null ? this.getAdapter().getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), deviceId, gatewayId, false, commandHandler, Duration.ofSeconds(ttdSecs.intValue()), waitForCommandSpan.context()) : this.getAdapter().getCommandConsumerFactory().createCommandConsumer(tenantObject.getTenantId(), deviceId, false, commandHandler, Duration.ofSeconds(ttdSecs.intValue()), waitForCommandSpan.context());
        return commandConsumerFuture.onFailure(thr -> {
            TracingHelper.logError((Span)waitForCommandSpan, (Throwable)thr);
            waitForCommandSpan.finish();
        }).map(consumer -> {
            if (!requestProcessed.get()) {
                this.addCommandReceptionTimer(context, requestProcessed, responseReady, ttdSecs.intValue(), waitForCommandSpan);
                context.startAcceptTimer(this.vertx, tenantObject, this.getAdapter().getConfig().getTimeoutToAck());
            }
            return new ProtocolAdapterCommandConsumer(){
                final /* synthetic */ ProtocolAdapterCommandConsumer val$consumer;
                {
                    this.val$consumer = protocolAdapterCommandConsumer;
                }

                public Future<Void> close(boolean sendEvent, SpanContext spanContext) {
                    Span closeConsumerSpan = TracingHelper.buildChildSpan((Tracer)AbstractHonoResource.this.getTracer(), (SpanContext)spanContext, (String)"close command consumer", (String)AbstractHonoResource.this.getAdapter().getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").start();
                    TracingHelper.setDeviceTags((Span)closeConsumerSpan, (String)tenantObject.getTenantId(), (String)deviceId);
                    return this.val$consumer.close(sendEvent, closeConsumerSpan.context()).onFailure(thr -> TracingHelper.logError((Span)closeConsumerSpan, (Throwable)thr)).onComplete(ar -> closeConsumerSpan.finish());
                }
            };
        });
    }

    protected boolean isCommandValid(Command command, Span currentSpan) {
        return command.isValid();
    }

    private void addCommandReceptionTimer(CoapContext context, AtomicBoolean requestProcessed, Handler<AsyncResult<Void>> responseReady, long delaySecs, Span waitForCommandSpan) {
        Long timerId = this.vertx.setTimer(delaySecs * 1000L, id -> {
            LOG.trace("time to wait [{}s] for command expired [timer id: {}]", (Object)delaySecs, id);
            if (requestProcessed.compareAndSet(false, true)) {
                this.setTtdStatus(context, MetricsTags.TtdStatus.EXPIRED);
                waitForCommandSpan.log(String.format("time to wait for command expired (%ds)", delaySecs));
                waitForCommandSpan.finish();
                responseReady.handle((Object)Future.succeededFuture());
            } else {
                LOG.trace("response already sent, nothing to do ...");
            }
        });
        LOG.trace("adding command reception timer [id: {}]", (Object)timerId);
        context.put(KEY_TIMER_ID, timerId);
    }

    private void cancelCommandReceptionTimer(CoapContext context) {
        Long timerId = (Long)context.get(KEY_TIMER_ID);
        if (timerId != null && timerId >= 0L) {
            if (this.vertx.cancelTimer(timerId.longValue())) {
                LOG.trace("Cancelled timer id {}", (Object)timerId);
            } else {
                LOG.debug("Could not cancel timer id {}", (Object)timerId);
            }
        }
    }

    private void setTtdStatus(CoapContext context, MetricsTags.TtdStatus status) {
        context.put(MetricsTags.TtdStatus.class.getName(), status);
    }

    private MetricsTags.TtdStatus getTtdStatus(CoapContext context) {
        return Optional.ofNullable((MetricsTags.TtdStatus)context.get(MetricsTags.TtdStatus.class.getName())).orElse(MetricsTags.TtdStatus.NONE);
    }
}

