package com.atlassian.bitbucket.internal.search.indexing.syncutil;

import com.atlassian.bitbucket.internal.search.indexing.syncutil.EntityWithLocation;
import com.atlassian.bitbucket.internal.search.indexing.util.ElasticsearchUtil;
import com.atlassian.bitbucket.internal.search.indexing.util.Observables;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/bitbucket-search-5.16.0.jar:com/atlassian/bitbucket/internal/search/indexing/syncutil/StreamingComparator.class */
/**
 * Compares two streams of entities - one tagged as coming from the server and one from the
 * search index - and reports, through a {@link ComparisonCallback}, which entities were seen in
 * both streams and which were seen in only one of them.
 *
 * <p>Entities are matched by the identifier produced by the configured
 * {@code entityToIdentifier} function. Instances are created through {@link #builder()}.
 *
 * @param <E> the entity type carried by both streams
 * @param <I> the identifier type used to match entities across the streams
 */
public class StreamingComparator<E, I> {
    private static final Logger log = LoggerFactory.getLogger(StreamingComparator.class);

    private final Function<E, I> entityToIdentifier;
    private final Observable<EntityWithLocation<E>> searchEntities;
    private final Observable<EntityWithLocation<E>> serverEntities;

    /**
     * Collects the two entity sources and the identifier function needed to construct a
     * {@link StreamingComparator}. All three must be supplied before {@link #build()} is called.
     *
     * @param <E> the entity type
     * @param <I> the identifier type
     */
    public static class Builder<E, I> {
        // Left public so that any existing direct field access keeps compiling;
        // prefer the entityToIdentifier(Function) setter, which rejects null.
        public Function<E, I> entityToIdentifier;
        private Observable<EntityWithLocation<E>> searchEntitySource;
        private Observable<EntityWithLocation<E>> serverEntitySource;

        /**
         * Creates the comparator from the values collected so far.
         *
         * @return a new comparator
         * @throws NullPointerException if either source or the identifier function is unset
         */
        public StreamingComparator<E, I> build() {
            return new StreamingComparator<>(this);
        }

        /**
         * Sets the function that maps an entity to the identifier used for matching.
         *
         * @param function the identifier function, must not be {@code null}
         * @return this builder
         */
        public Builder<E, I> entityToIdentifier(@Nonnull Function<E, I> function) {
            this.entityToIdentifier = Objects.requireNonNull(function, "entityToIdentifier");
            return this;
        }

        /**
         * Sets the stream of entities read from the search index.
         *
         * @param observable the search-side stream, must not be {@code null}
         * @return this builder
         */
        public Builder<E, I> searchSource(@Nonnull Observable<EntityWithLocation<E>> observable) {
            this.searchEntitySource = Objects.requireNonNull(observable, "searchSourceBuilder");
            return this;
        }

        /**
         * Sets the stream of entities read from the server.
         *
         * @param observable the server-side stream, must not be {@code null}
         * @return this builder
         */
        public Builder<E, I> serverSource(@Nonnull Observable<EntityWithLocation<E>> observable) {
            this.serverEntitySource = Objects.requireNonNull(observable, "serverEntitySource");
            return this;
        }
    }

    private StreamingComparator(Builder<E, I> builder) {
        this.searchEntities = Objects.requireNonNull(builder.searchEntitySource, "builder.searchEntitySource");
        this.serverEntities = Objects.requireNonNull(builder.serverEntitySource, "builder.serverEntitySource");
        this.entityToIdentifier = Objects.requireNonNull(builder.entityToIdentifier, "builder.entityToIdentifier");
    }

    /**
     * Creates an empty builder.
     *
     * @param <E> the entity type
     * @param <I> the identifier type
     * @return a new builder
     */
    public static <E, I> Builder<E, I> builder() {
        return new Builder<>();
    }

    /**
     * Merges the server and search streams, matches their items by identifier and notifies
     * {@code comparisonCallback} about each entity found in both streams or in only one.
     *
     * <p>Entities that are still unmatched when the merged stream finishes successfully are
     * reported from the final pass in {@code processComparisonResult}. If the merged stream
     * fails, the failure is logged, no final pass is performed and {@code false} is returned;
     * callbacks issued before the failure are not undone.
     *
     * @param comparisonCallback receives the outcome for each entity
     * @return {@code true} if both streams were consumed without error, {@code false} otherwise
     */
    public boolean doComparison(ComparisonCallback<E> comparisonCallback) {
        // Entities seen in exactly one stream so far, keyed by identifier.
        Map<I, EntityWithLocation<E>> unmatched = new ConcurrentHashMap<>();
        AtomicBoolean serverCompleted = new AtomicBoolean();
        AtomicBoolean searchCompleted = new AtomicBoolean();

        Observable<EntityWithLocation<E>> merged = Observable.merge(
                this.serverEntities.doOnCompleted(() -> serverCompleted.set(true)),
                this.searchEntities.doOnCompleted(() -> searchCompleted.set(true)));

        Observable<Integer> comparedCount = merged
                .doOnNext(incoming ->
                        reconcile(incoming, unmatched, serverCompleted, searchCompleted, comparisonCallback))
                .count();

        return ((Boolean) Observables.consumeSingle(comparedCount).fold(exc -> {
            if (ElasticsearchUtil.isNetworkConnectException(exc)) {
                log.error("An error occurred while attempting to connect to Elasticsearch. The Elasticsearch instance/cluster is possibly unreachable");
            } else {
                log.error("Unable to perform job due to an unexpected error", (Throwable) exc);
            }
            return false;
        }, num -> {
            log.debug("Number of items compared: {}", num);
            processComparisonResult(unmatched, comparisonCallback);
            return true;
        })).booleanValue();
    }

    /**
     * Handles one item emitted by the merged stream: records where the entity has been seen and
     * either reports it immediately or parks it in {@code unmatched} until its fate is known.
     */
    private void reconcile(EntityWithLocation<E> incoming,
                           Map<I, EntityWithLocation<E>> unmatched,
                           AtomicBoolean serverCompleted,
                           AtomicBoolean searchCompleted,
                           ComparisonCallback<E> comparisonCallback) {
        I identifier = this.entityToIdentifier.apply(incoming.getEntity());

        EntityWithLocation<E> tracked = unmatched.get(identifier);
        if (tracked == null) {
            // First sighting: track a fresh entry rather than the emitted item itself.
            tracked = EntityWithLocation.of(incoming.getLocation(), incoming.getEntity());
        } else {
            // Already seen: add the location this item arrived from.
            tracked.foundIn(incoming.getLocation());
        }

        if (tracked.isIn(EntityWithLocation.Location.BOTH)) {
            unmatched.remove(identifier);
            comparisonCallback.inBothSources(tracked.getEntity());
        } else if (searchCompleted.get()) {
            // The search stream has finished, so no counterpart can still arrive from it.
            comparisonCallback.inServerSourceOnly(tracked.getEntity());
        } else if (serverCompleted.get()) {
            // The server stream has finished, so no counterpart can still arrive from it.
            comparisonCallback.inSearchSourceOnly(tracked.getEntity());
        } else {
            // Both streams are still running; wait for a possible counterpart.
            unmatched.put(identifier, tracked);
        }
    }

    /**
     * Reports every entity left unmatched after both streams have finished, choosing the
     * callback according to the single location the entity was seen in.
     */
    private void processComparisonResult(Map<I, EntityWithLocation<E>> map, ComparisonCallback<E> comparisonCallback) {
        for (EntityWithLocation<E> leftover : map.values()) {
            if (leftover.isOnlyIn(EntityWithLocation.Location.DATABASE)) {
                comparisonCallback.inServerSourceOnly(leftover.getEntity());
            } else if (leftover.isOnlyIn(EntityWithLocation.Location.ELASTICSEARCH)) {
                comparisonCallback.inSearchSourceOnly(leftover.getEntity());
            }
        }
    }
}
