package uk.co.gresearch.siembol.deployment.storm.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import uk.co.gresearch.siembol.common.model.StormTopologiesDto;
import uk.co.gresearch.siembol.common.model.StormTopologyDto;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.deployment.storm.model.StormResponseTopologyDto;
import uk.co.gresearch.siembol.deployment.storm.model.TopologyManagerInfoDto;
import uk.co.gresearch.siembol.deployment.storm.model.TopologyStateDto;
import uk.co.gresearch.siembol.deployment.storm.providers.KubernetesProvider;
import uk.co.gresearch.siembol.deployment.storm.providers.StormProvider;

/* loaded from: input_file:BOOT-INF/classes/uk/co/gresearch/siembol/deployment/storm/service/TopologyManagerServiceImpl.class */
public class TopologyManagerServiceImpl implements TopologyManagerService {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final ObjectReader TOPOLOGY_READER = new ObjectMapper().readerFor(StormTopologiesDto.class);
    private final StormProvider stormProvider;
    private final ZooKeeperConnector zookeeperDesiredState;
    private final ZooKeeperConnector zookeeperSavedState;
    private final KubernetesProvider kubernetesProvider;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<Optional<Exception>> exception = new AtomicReference<>(Optional.empty());

    public TopologyManagerServiceImpl(StormProvider stormProvider, KubernetesProvider kubernetesProvider, ZooKeeperConnector zooKeeperConnector, ZooKeeperConnector zooKeeperConnector2, int i) {
        this.stormProvider = stormProvider;
        this.kubernetesProvider = kubernetesProvider;
        this.zookeeperDesiredState = zooKeeperConnector;
        this.zookeeperSavedState = zooKeeperConnector2;
        this.zookeeperDesiredState.addCacheListener(() -> {
            this.executorService.execute(this::synchronise);
        });
        if (i > 0) {
            this.executorService.scheduleAtFixedRate(() -> {
                this.executorService.execute(this::synchronise);
            }, i, i, TimeUnit.SECONDS);
        }
    }

    @Override // uk.co.gresearch.siembol.deployment.storm.service.TopologyManagerService
    public void invokeSynchronise() {
        this.executorService.execute(this::synchronise);
    }

    @Override // uk.co.gresearch.siembol.deployment.storm.service.TopologyManagerService
    public TopologyManagerInfoDto getTopologyManagerInfo() {
        TopologyManagerInfoDto topologyManagerInfoDto = new TopologyManagerInfoDto();
        try {
            Map<String, StormTopologyDto> zookeeperState = getZookeeperState(this.zookeeperDesiredState.getData());
            Map<String, StormTopologyDto> zookeeperState2 = getZookeeperState(this.zookeeperSavedState.getData());
            HashSet<String> hashSet = new HashSet();
            hashSet.addAll(zookeeperState.keySet());
            hashSet.addAll(zookeeperState2.keySet());
            HashMap hashMap = new HashMap();
            int i = 0;
            for (String str : hashSet) {
                TopologyStateDto topologyState = getTopologyState(str, zookeeperState, zookeeperState2);
                hashMap.put(str, topologyState);
                if (topologyState.equals(TopologyStateDto.SYNCHRONISED)) {
                    i++;
                }
            }
            topologyManagerInfoDto.setTopologies(hashMap);
            topologyManagerInfoDto.setNumberSynchronised(i);
            topologyManagerInfoDto.setNumberDifferent(hashMap.size() - i);
        } catch (JsonProcessingException e) {
            LOG.error("Exception during getting topology manager Info: ", (Throwable) e);
            this.exception.set(Optional.of(e));
        }
        return topologyManagerInfoDto;
    }

    @Override // uk.co.gresearch.siembol.deployment.storm.service.TopologyManagerService
    public Health checkHealth() {
        Optional<Exception> optional = this.exception.get();
        return optional.isPresent() ? Health.down(optional.get()).build() : Health.up().build();
    }

    private void synchronise() {
        try {
            String data = this.zookeeperDesiredState.getData();
            String data2 = this.zookeeperSavedState.getData();
            Map<String, StormTopologyDto> zookeeperState = getZookeeperState(data);
            Map<String, StormTopologyDto> zookeeperState2 = getZookeeperState(data2);
            Map<String, StormResponseTopologyDto> stormState = getStormState();
            LOG.debug("Desired State: {}, Saved State: {}, Storm State: {}", zookeeperState.keySet(), zookeeperState2.keySet(), stormState.keySet());
            if (!stormState.values().stream().filter(stormResponseTopologyDto -> {
                return shouldKillTopology(stormResponseTopologyDto, zookeeperState, zookeeperState2);
            }).map(stormResponseTopologyDto2 -> {
                return Boolean.valueOf(this.stormProvider.killTopology(stormResponseTopologyDto2.getId()));
            }).allMatch(bool -> {
                return bool.booleanValue();
            })) {
                throw new Exception("Failed to kill all storm topologies");
            }
            saveState(data);
            Stream<StormTopologyDto> filter = zookeeperState.values().stream().filter(stormTopologyDto -> {
                return shouldStartTopology(stormTopologyDto, zookeeperState2, stormState);
            });
            KubernetesProvider kubernetesProvider = this.kubernetesProvider;
            Objects.requireNonNull(kubernetesProvider);
            filter.forEach(kubernetesProvider::createOrReplaceJob);
        } catch (Exception e) {
            LOG.error("Exception in synchronise: ", (Throwable) e);
            this.exception.set(Optional.of(e));
            this.executorService.shutdown();
        }
    }

    private Map<String, StormTopologyDto> getZookeeperState(String str) throws JsonProcessingException {
        return (Map) ((StormTopologiesDto) TOPOLOGY_READER.readValue(str)).getTopologies().stream().collect(Collectors.toMap((v0) -> {
            return v0.getTopologyName();
        }, Function.identity()));
    }

    private Map<String, StormResponseTopologyDto> getStormState() throws IOException {
        return (Map) this.stormProvider.listTopologies().stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, Function.identity()));
    }

    private boolean shouldKillTopology(StormResponseTopologyDto stormResponseTopologyDto, Map<String, StormTopologyDto> map, Map<String, StormTopologyDto> map2) {
        String name = stormResponseTopologyDto.getName();
        Optional ofNullable = Optional.ofNullable(map.get(name));
        Optional ofNullable2 = Optional.ofNullable(map2.get(name));
        boolean z = !ofNullable.isPresent() && ofNullable2.isPresent();
        boolean z2 = ofNullable.isPresent() && ofNullable2.isPresent() && !Objects.equals(((StormTopologyDto) ofNullable.get()).getTopologyId(), ((StormTopologyDto) ofNullable2.get()).getTopologyId());
        boolean z3 = ofNullable.isPresent() && !ofNullable2.isPresent();
        if (z3) {
            LOG.warn("Existing topology running in storm with conflicting name: {}. Killing...", stormResponseTopologyDto.getName());
        }
        return z || z2 || z3;
    }

    private boolean shouldStartTopology(StormTopologyDto stormTopologyDto, Map<String, StormTopologyDto> map, Map<String, StormResponseTopologyDto> map2) {
        String topologyName = stormTopologyDto.getTopologyName();
        Optional ofNullable = Optional.ofNullable(map.get(topologyName));
        return (!ofNullable.isPresent()) || (ofNullable.isPresent() && !Objects.equals(stormTopologyDto.getTopologyId(), ((StormTopologyDto) ofNullable.get()).getTopologyId())) || (!Optional.ofNullable(map2.get(topologyName)).isPresent());
    }

    private void saveState(String str) throws Exception {
        this.zookeeperSavedState.setData(str);
    }

    private TopologyStateDto getTopologyState(String str, Map<String, StormTopologyDto> map, Map<String, StormTopologyDto> map2) {
        return map.containsKey(str) ? map2.containsKey(str) ? map.get(str).getTopologyId().equals(map2.get(str).getTopologyId()) ? TopologyStateDto.SYNCHRONISED : TopologyStateDto.DIFFERENT : TopologyStateDto.DESIRED_STATE_ONLY : TopologyStateDto.SAVED_STATE_ONLY;
    }
}
