package org.springframework.xd.dirt.server.admin.deployment.zk;

import java.util.EnumSet;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.core.RuntimeTimeoutException;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentException;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentMessage;
import org.springframework.xd.dirt.server.admin.deployment.DeploymentMessagePublisher;
import org.springframework.xd.dirt.zookeeper.Paths;

/* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/ZKDeploymentMessagePublisher.class */
public class ZKDeploymentMessagePublisher implements DeploymentMessagePublisher {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    public static final String SUCCESS = "success";
    public static final String ERROR = "error";
    private final DeploymentQueue deploymentQueue;

    @Value("${xd.admin.deploymentTimeout:30000}")
    private long deploymentTimeout;

    /* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/ZKDeploymentMessagePublisher$ResultWatcher.class */
    private class ResultWatcher implements CuratorWatcher {
        private State state;
        private String errorDesc;

        private ResultWatcher() {
            this.state = State.incomplete;
        }

        public State getState() {
            return this.state;
        }

        public String getErrorDesc() {
            return this.errorDesc;
        }

        public void process(WatchedEvent watchedEvent) throws Exception {
            ZKDeploymentMessagePublisher.this.logger.trace("Event: {}", watchedEvent);
            if (EnumSet.of(Watcher.Event.KeeperState.SyncConnected, Watcher.Event.KeeperState.SaslAuthenticated, Watcher.Event.KeeperState.ConnectedReadOnly).contains(watchedEvent.getState())) {
                if (watchedEvent.getType() != Watcher.Event.EventType.NodeChildrenChanged) {
                    ZKDeploymentMessagePublisher.this.logger.debug("Ignoring event: {}", watchedEvent);
                    return;
                }
                List list = (List) ZKDeploymentMessagePublisher.this.getClient().getChildren().forPath(watchedEvent.getPath());
                Assert.state(list.size() == 1);
                synchronized (ZKDeploymentMessagePublisher.this) {
                    if (list.contains(ZKDeploymentMessagePublisher.ERROR)) {
                        this.errorDesc = new String((byte[]) ZKDeploymentMessagePublisher.this.getClient().getData().forPath(Paths.build(watchedEvent.getPath(), ZKDeploymentMessagePublisher.ERROR)));
                        this.state = State.error;
                    } else {
                        this.state = State.success;
                    }
                    ZKDeploymentMessagePublisher.this.notifyAll();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/xd/dirt/server/admin/deployment/zk/ZKDeploymentMessagePublisher$State.class */
    public enum State {
        incomplete,
        success,
        error
    }

    public ZKDeploymentMessagePublisher(DeploymentQueue deploymentQueue) {
        this.deploymentQueue = deploymentQueue;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CuratorFramework getClient() {
        return this.deploymentQueue.getClient();
    }

    private long getTimeout() {
        return this.deploymentTimeout * 3;
    }

    @Override // org.springframework.xd.dirt.server.admin.deployment.DeploymentMessagePublisher
    public void publish(DeploymentMessage deploymentMessage) {
        try {
            this.deploymentQueue.getDistributedQueue().put(deploymentMessage);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // org.springframework.xd.dirt.server.admin.deployment.DeploymentMessagePublisher
    public void poll(DeploymentMessage deploymentMessage) {
        CuratorFramework client = getClient();
        String requestId = deploymentMessage.getRequestId();
        Assert.hasText(requestId, "requestId for message required");
        ResultWatcher resultWatcher = new ResultWatcher();
        String build = Paths.build(Paths.DEPLOYMENTS, Paths.RESPONSES, requestId);
        try {
            try {
                try {
                    this.logger.trace("result path: {}", build);
                    client.create().creatingParentsIfNeeded().forPath(build);
                    ((BackgroundPathable) client.getChildren().usingWatcher(resultWatcher)).forPath(build);
                    publish(deploymentMessage);
                    long timeout = getTimeout();
                    long currentTimeMillis = System.currentTimeMillis() + timeout;
                    synchronized (this) {
                        while (resultWatcher.getState() == State.incomplete && System.currentTimeMillis() < currentTimeMillis) {
                            wait(timeout);
                        }
                    }
                    switch (resultWatcher.getState()) {
                        case incomplete:
                            throw new RuntimeTimeoutException(String.format("Request %s timed out after %d ms", deploymentMessage, Long.valueOf(currentTimeMillis)));
                        case error:
                            throw new DeploymentException(resultWatcher.getErrorDesc());
                        default:
                            try {
                                return;
                            } catch (Exception e) {
                                return;
                            }
                    }
                } finally {
                    try {
                        client.delete().deletingChildrenIfNeeded().forPath(build);
                    } catch (Exception e2) {
                        this.logger.debug("Exception while removing result path " + build, e2);
                    }
                }
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
                try {
                    client.delete().deletingChildrenIfNeeded().forPath(build);
                } catch (Exception e4) {
                    this.logger.debug("Exception while removing result path " + build, e4);
                }
            }
        } catch (RuntimeException e5) {
            throw e5;
        } catch (Exception e6) {
            throw new RuntimeException(e6);
        }
    }
}
