/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.streaming.token.store.jdbc;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.annotation.Internal;
import org.axonframework.common.jdbc.ConnectionProvider;
import org.axonframework.common.jdbc.JdbcException;
import org.axonframework.common.jdbc.JdbcUtils;
import org.axonframework.conversion.Converter;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment;
import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.ConfigToken;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToClaimTokenException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToInitializeTokenException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.UnableToRetrieveIdentifierException;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jdbc.JdbcTokenEntry;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jdbc.JdbcTokenStoreConfiguration;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jdbc.TokenSchema;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.jdbc.TokenTableFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcTokenStore
implements TokenStore {
    // Class-level SLF4J logger; used for the debug message emitted by storeToken.
    private static final Logger logger = LoggerFactory.getLogger(JdbcTokenStore.class);
    // Processor name under which the store's own configuration token is kept (see loadConfigurationToken).
    private static final String CONFIG_TOKEN_ID = "__config";
    // Segment under which the configuration token is stored and looked up.
    private static final Segment CONFIG_SEGMENT = new Segment(0, 0);
    // Alias of the count column read by containsOneElement; the query in selectSegments must use the same alias.
    private static final String COUNT_COLUMN_NAME = "segmentCount";
    // Source of the JDBC connections used by every operation in this store.
    private final ConnectionProvider connectionProvider;
    // Converter handed to JdbcTokenEntry whenever a token is written or read.
    private final Converter converter;
    // Supplies the table name and column names used to build all SQL statements.
    private final TokenSchema schema;
    // Claim timeout handed to JdbcTokenEntry.claim / mayClaim when deciding whether a token can be claimed.
    private final TemporalAmount claimTimeout;
    // Identifier of this node; used as the owner value in claim, release and delete statements.
    private final String nodeId;

    /**
     * Creates a token store backed by JDBC.
     *
     * @param connectionProvider provides the connections used for all database access; must not be {@code null}
     * @param converter          converter used when tokens are written to and read from the token table; must not
     *                           be {@code null}
     * @param configuration      supplies the token schema, the claim timeout and this node's identifier; must not
     *                           be {@code null}
     */
    public JdbcTokenStore(@Nonnull ConnectionProvider connectionProvider, @Nonnull Converter converter, @Nonnull JdbcTokenStoreConfiguration configuration) {
        // Validate all hard requirements up front, in declaration order.
        BuilderUtils.assertNonNull(
                connectionProvider, "The ConnectionProvider is a hard requirement and should be provided"
        );
        BuilderUtils.assertNonNull(
                converter, "The Converter is a hard requirement and should be provided"
        );
        BuilderUtils.assertNonNull(
                configuration, "The JdbcTokenStoreConfiguration should be provided"
        );

        this.connectionProvider = connectionProvider;
        this.converter = converter;

        // Everything else is copied out of the configuration object.
        this.schema = configuration.schema();
        this.claimTimeout = configuration.claimTimeout();
        this.nodeId = configuration.nodeId();
    }

    /**
     * Creates the token table(s) by asking the given factory for the statement to execute against this store's
     * schema.
     *
     * @param schemaFactory factory producing the table-creation statement for the configured schema
     * @throws JdbcException when the connection cannot be obtained or the table creation fails
     */
    public void createSchema(TokenTableFactory schemaFactory) {
        Connection connection = this.getConnection();
        try {
            JdbcUtils.executeUpdates(
                    connection,
                    failure -> {
                        throw new JdbcException("Failed to create token tables", failure);
                    },
                    new JdbcUtils.SqlFunction[]{conn -> schemaFactory.createTable(conn, this.schema)}
            );
        } finally {
            // Always release the connection, whether or not the creation succeeded.
            JdbcUtils.closeQuietly(connection);
        }
    }

    /**
     * Initializes the token entries for the given processor by splitting the root segment and inserting one entry
     * per resulting segment, each carrying {@code initialToken}.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to create segments for
     * @param segmentCount  the requested number of segments; {@code segmentCount - 1} is passed to
     *                      {@code Segment.splitBalanced} (presumably as the number of splits to perform)
     * @param initialToken  the token to store for every created segment, may be {@code null}
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the created segments, or completed exceptionally when the initialization
     * failed
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            List<Segment> createdSegments = JdbcUtils.executeQuery(
                    connection,
                    // Select segment 0 "FOR UPDATE" before inserting; the query result itself is not inspected.
                    c -> this.selectForUpdate(c, processorName, 0),
                    resultSet -> {
                        List<Segment> segments = Segment.splitBalanced(Segment.ROOT_SEGMENT, segmentCount - 1);
                        for (Segment segment : segments) {
                            this.insertTokenEntry(connection, initialToken, processorName, segment);
                        }
                        return segments;
                    },
                    e -> new UnableToClaimTokenException("Could not initialize segments. Some segments were already present.", e)
            );
            return CompletableFuture.completedFuture(createdSegments);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Inserts a single token entry for the given processor and segment.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param token         the token to store for the segment, may be {@code null}
     * @param processorName the name of the processor the segment belongs to
     * @param segment       the segment to create an entry for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with {@code null} on success, or completed exceptionally when the insert failed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            JdbcUtils.executeQuery(
                    connection,
                    // Select segment 0 "FOR UPDATE" before inserting; the query result itself is not inspected.
                    c -> this.selectForUpdate(c, processorName, 0),
                    resultSet -> {
                        this.insertTokenEntry(connection, token, processorName, segment);
                        return null;
                    },
                    e -> new UnableToInitializeTokenException("Could not initialize segments. Some segments were already present.", e)
            );
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Retrieves the identifier of this token storage, which is the {@code "id"} value of the configuration token.
     *
     * @param context the processing context; not used by this implementation
     * @return a future completed with the storage identifier, or completed exceptionally when the configuration
     * token could not be loaded or created
     */
    @Override
    @Nonnull
    public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context) {
        ConfigToken configToken;
        try {
            configToken = this.loadConfigurationToken();
            return CompletableFuture.completedFuture(configToken.get("id"));
        } catch (Exception failure) {
            return CompletableFuture.failedFuture(failure);
        }
    }

    /**
     * Loads the configuration token stored under {@code CONFIG_TOKEN_ID} / {@code CONFIG_SEGMENT}. When no such
     * entry exists, a new {@link ConfigToken} holding a random UUID under the key {@code "id"} is inserted and
     * returned.
     *
     * @return the stored or freshly inserted configuration token
     * @throws UnableToRetrieveIdentifierException when the lookup fails, or when inserting the new token fails
     *                                             (for instance because it was concurrently initialized)
     */
    private ConfigToken loadConfigurationToken() throws UnableToRetrieveIdentifierException {
        Connection connection = this.getConnection();
        try {
            // The trailing 'false' is forwarded to JdbcUtils.executeQuery; the connection is needed again below for
            // the insert and is closed in the finally block.
            TrackingToken storedToken = JdbcUtils.executeQuery(
                    connection,
                    c -> this.select(connection, CONFIG_TOKEN_ID, CONFIG_SEGMENT.getSegmentId(), false),
                    resultSet -> resultSet.next() ? this.readTokenEntry(resultSet).getToken(this.converter) : null,
                    e -> new UnableToRetrieveIdentifierException("Exception while attempting to retrieve the config token", e),
                    false
            );
            if (storedToken != null) {
                return (ConfigToken) storedToken;
            }

            TrackingToken insertedToken;
            try {
                ConfigToken freshToken = new ConfigToken(Collections.singletonMap("id", UUID.randomUUID().toString()));
                insertedToken = this.insertTokenEntry(connection, freshToken, CONFIG_TOKEN_ID, CONFIG_SEGMENT);
            } catch (SQLException e) {
                throw new UnableToRetrieveIdentifierException("Exception while attempting to initialize the config token. It may have been concurrently initialized.", e);
            }
            return (ConfigToken) insertedToken;
        } finally {
            JdbcUtils.closeQuietly(connection);
        }
    }

    /**
     * Exposes the converter this store was constructed with.
     *
     * @return the converter used when writing and reading tokens
     */
    @Internal
    public Converter converter() {
        return converter;
    }

    /**
     * Stores the given token for the given processor and segment.
     * <p>
     * A direct update restricted to entries owned by this node is attempted first. When it touches no rows, the
     * entry is selected "FOR UPDATE" and updated through {@link #updateToken}, which also (re)claims it for this
     * node.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param token         the token to store, may be {@code null}
     * @param processorName the name of the processor the token belongs to
     * @param segment       the identifier of the segment the token belongs to
     * @param context       the processing context; not used by this implementation
     * @return a future completed with {@code null} on success, or completed exceptionally when the token could not
     * be stored or claimed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            int updatedRows = JdbcUtils.executeUpdate(
                    connection,
                    c -> this.storeUpdate(connection, token, processorName, segment),
                    e -> new JdbcException(String.format("Could not store token [%s] for processor [%s] and segment [%d]", token, processorName, segment), e)
            );
            if (updatedRows == 0) {
                // Nothing matched the owner-restricted update: fall back to select-for-update plus explicit claim.
                logger.debug("Could not update token [{}] for processor [{}] and segment [{}]. Trying load-then-save approach instead.", token, processorName, segment);
                JdbcUtils.executeQuery(
                        connection,
                        c -> this.selectForUpdate(c, processorName, segment),
                        resultSet -> {
                            this.updateToken(connection, resultSet, token, processorName, segment);
                            return null;
                        },
                        e -> new JdbcException(String.format("Could not store token [%s] for processor [%s] and segment [%d]", token, processorName, segment), e)
                );
            }
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Fetches and claims the token stored for the given processor and segment identifier.
     * <p>
     * The entry is selected "FOR UPDATE" and handed to {@link #loadToken(Connection, ResultSet, String, int)}.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to fetch the token for
     * @param segment       the identifier of the segment to fetch the token for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the claimed token, or completed exceptionally (for example with an
     * {@link UnableToClaimTokenException}) when the token could not be loaded or claimed
     */
    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        try (Connection connection = this.getConnection()) {
            TrackingToken claimedToken = JdbcUtils.executeQuery(
                    connection,
                    c -> this.selectForUpdate(c, processorName, segment),
                    resultSet -> this.loadToken(connection, resultSet, processorName, segment),
                    e -> new JdbcException(String.format("Could not load token for processor [%s] and segment [%d]", processorName, segment), e)
            );
            return CompletableFuture.completedFuture(claimedToken);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Fetches and claims the token stored for the given processor and segment.
     * <p>
     * The entry is selected "FOR UPDATE" by the segment's identifier and handed to
     * {@link #loadToken(Connection, ResultSet, String, Segment)}, which additionally validates the segment before
     * claiming.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to fetch the token for
     * @param segment       the segment to fetch the token for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the claimed token, or completed exceptionally (for example with an
     * {@link UnableToClaimTokenException}) when the token could not be loaded or claimed
     */
    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        try (Connection connection = this.getConnection()) {
            TrackingToken claimedToken = JdbcUtils.executeQuery(
                    connection,
                    c -> this.selectForUpdate(c, processorName, segment.getSegmentId()),
                    resultSet -> this.loadToken(connection, resultSet, processorName, segment),
                    e -> new JdbcException(String.format("Could not load token for processor [%s] and segment [%d]", processorName, segment.getSegmentId()), e)
            );
            return CompletableFuture.completedFuture(claimedToken);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Releases this node's claim on the token of the given processor and segment by executing the statement built
     * by {@link #releaseClaim(Connection, String, int)}.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to release the claim for
     * @param segment       the identifier of the segment to release the claim for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with {@code null} on success, or completed exceptionally when the release failed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            JdbcUtils.executeUpdates(
                    connection,
                    e -> {
                        // Message describes the operation that actually failed (releasing, not loading).
                        throw new JdbcException(String.format("Could not release claim for processor [%s] and segment [%d]", processorName, segment), e);
                    },
                    new JdbcUtils.SqlFunction[]{c -> this.releaseClaim(c, processorName, segment)}
            );
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Deletes the token entry of the given processor and segment, provided it is owned by this node, by executing
     * the statement built by {@link #deleteToken(Connection, String, int)}.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to delete the token for
     * @param segment       the identifier of the segment to delete the token for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with {@code null} on success, or completed exceptionally with an
     * {@link UnableToClaimTokenException} when no row was deleted, or with another exception when the statement
     * failed
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            int[] deletedCounts = JdbcUtils.executeUpdates(
                    connection,
                    e -> {
                        throw new JdbcException(String.format("Could not remove token for processor [%s] and segment [%d]", processorName, segment), e);
                    },
                    new JdbcUtils.SqlFunction[]{c -> this.deleteToken(c, processorName, segment)}
            );
            // The delete is restricted to rows owned by this node, so zero rows means the token is not ours.
            if (deletedCounts[0] < 1) {
                throw new UnableToClaimTokenException("Unable to claim token. It wasn't owned by " + this.nodeId);
            }
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Fetches the segment stored for the given processor and segment identifier, without locking or claiming it.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to fetch the segment for
     * @param segmentId     the identifier of the segment to fetch
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the segment, completed with {@code null} when no matching entry exists, or
     * completed exceptionally when the query failed
     */
    @Override
    @Nonnull
    public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            Segment storedSegment = JdbcUtils.executeQuery(
                    connection,
                    c -> this.select(c, processorName, segmentId, false),
                    resultSet -> resultSet.next() ? this.readTokenEntry(resultSet).getSegment() : null,
                    e -> new JdbcException("Could not load segments for processor [%s]".formatted(processorName), e)
            );
            return CompletableFuture.completedFuture(storedSegment);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Fetches all segments stored for the given processor, ordered by ascending segment identifier (see
     * {@link #selectForSegments(Connection, String)}).
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to fetch the segments for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the stored segments, or completed exceptionally when the query failed
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            List<Segment> storedSegments = JdbcUtils.executeQuery(
                    connection,
                    c -> this.selectForSegments(c, processorName),
                    JdbcUtils.listResults(rs -> new Segment(rs.getInt(this.schema.segmentColumn()), rs.getInt(this.schema.maskColumn()))),
                    e -> new JdbcException("Could not load segments for processor [%s]".formatted(processorName), e)
            );
            return CompletableFuture.completedFuture(storedSegments);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Fetches the segments of the given processor that this node may claim, i.e. those whose token entry reports
     * {@code mayClaim(nodeId, claimTimeout)}.
     * <p>
     * The connection is managed by try-with-resources, so it is closed on every exit path and any failure while
     * obtaining, using or closing it is reported through the returned future rather than thrown.
     *
     * @param processorName the name of the processor to fetch the available segments for
     * @param context       the processing context; not used by this implementation
     * @return a future completed with the claimable segments, or completed exceptionally when the query failed
     */
    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        try (Connection connection = this.getConnection()) {
            List<JdbcTokenEntry> tokenEntries = JdbcUtils.executeQuery(
                    connection,
                    c -> this.selectTokenEntries(c, processorName),
                    JdbcUtils.listResults(this::readTokenEntry),
                    e -> new JdbcException(String.format("Could not load segments for processor [%s]", processorName), e)
            );
            List<Segment> claimableSegments = tokenEntries.stream()
                    .filter(tokenEntry -> tokenEntry.mayClaim(this.nodeId, this.claimTimeout))
                    .map(JdbcTokenEntry::getSegment)
                    .collect(Collectors.toList());
            return CompletableFuture.completedFuture(claimableSegments);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Builds the statement selecting the segment and mask columns of all entries of the given processor, ordered
     * by ascending segment.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to select segments for
     * @return a forward-only, read-only statement with the processor name bound
     * @throws SQLException when the statement cannot be prepared or the parameter cannot be set
     */
    protected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException {
        String query = "SELECT " + this.schema.segmentColumn() + "," + this.schema.maskColumn()
                + " FROM " + this.schema.tokenTable()
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " ORDER BY " + this.schema.segmentColumn() + " ASC";
        PreparedStatement statement =
                connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statement.setString(1, processorName);
        return statement;
    }

    /**
     * Builds the statement selecting all columns of all entries of the given processor, ordered by ascending
     * segment.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to select entries for
     * @return a forward-only, read-only statement with the processor name bound
     * @throws SQLException when the statement cannot be prepared or the parameter cannot be set
     */
    protected PreparedStatement selectTokenEntries(Connection connection, String processorName) throws SQLException {
        String query = "SELECT * FROM " + this.schema.tokenTable()
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " ORDER BY " + this.schema.segmentColumn() + " ASC";
        PreparedStatement statement =
                connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        statement.setString(1, processorName);
        return statement;
    }

    /**
     * Builds the statement that updates the token, token type and timestamp of the entry for the given processor
     * and segment, restricted to entries whose owner is this node.
     *
     * @param connection    the connection to prepare the statement on
     * @param token         the token to store
     * @param processorName the name of the processor the token belongs to
     * @param segment       the identifier of the segment the token belongs to
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement storeUpdate(Connection connection, TrackingToken token, String processorName, int segment) throws SQLException {
        JdbcTokenEntry entry = new JdbcTokenEntry(token, this.converter);
        String update = "UPDATE " + this.schema.tokenTable()
                + " SET " + this.schema.tokenColumn() + " = ?, "
                + this.schema.tokenTypeColumn() + " = ?, "
                + this.schema.timestampColumn() + " = ?"
                + " WHERE " + this.schema.ownerColumn() + " = ?"
                + " AND " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ? ";
        PreparedStatement statement =
                connection.prepareStatement(update, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // SET clause values
        statement.setBytes(1, entry.getTokenData());
        statement.setString(2, entry.getTokenType());
        statement.setString(3, entry.timestampAsString());
        // WHERE clause values
        statement.setString(4, this.nodeId);
        statement.setString(5, processorName);
        statement.setInt(6, segment);
        return statement;
    }

    /**
     * Builds the statement selecting the entry of the given processor and segment with a "FOR UPDATE" clause.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to select the entry for
     * @param segmentId     the identifier of the segment to select the entry for
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement selectForUpdate(Connection connection, String processorName, int segmentId) throws SQLException {
        final boolean forUpdate = true;
        return select(connection, processorName, segmentId, forUpdate);
    }

    /**
     * Builds the statement selecting all entry columns for the given processor and segment.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to select the entry for
     * @param segmentId     the identifier of the segment to select the entry for
     * @param forUpdate     whether to append a "FOR UPDATE" clause to the query
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement select(Connection connection, String processorName, int segmentId, boolean forUpdate) throws SQLException {
        String selectedColumns = String.join(
                ", ",
                this.schema.processorNameColumn(),
                this.schema.segmentColumn(),
                this.schema.maskColumn(),
                this.schema.tokenColumn(),
                this.schema.tokenTypeColumn(),
                this.schema.timestampColumn(),
                this.schema.ownerColumn()
        );
        String lockClause = forUpdate ? "FOR UPDATE" : "";
        String query = "SELECT " + selectedColumns
                + " FROM " + this.schema.tokenTable()
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ? "
                + lockClause;
        PreparedStatement statement = connection.prepareStatement(query);
        statement.setString(1, processorName);
        statement.setInt(2, segmentId);
        return statement;
    }

    /**
     * Updates the entry at the next row of the given result set with the given token, claiming it for this node.
     *
     * @param connection    the connection to execute the update on
     * @param resultSet     the result set positioned before the entry to update
     * @param token         the token to store
     * @param processorName the name of the processor the token belongs to
     * @param segment       the identifier of the segment the token belongs to
     * @throws UnableToClaimTokenException when the result set has no row, the entry cannot be claimed by this
     *                                     node, or the update does not affect exactly one row
     * @throws SQLException                when reading the result set or executing the update fails
     */
    protected void updateToken(Connection connection, ResultSet resultSet, TrackingToken token, String processorName, int segment) throws SQLException {
        if (!resultSet.next()) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment));
        }

        JdbcTokenEntry entry = this.readTokenEntry(resultSet);
        entry.updateToken(token, this.converter);
        boolean claimed = entry.claim(this.nodeId, this.claimTimeout);
        if (!claimed) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", entry.getProcessorName(), entry.getSegment(), entry.getOwner()));
        }

        String update = "UPDATE " + this.schema.tokenTable()
                + " SET " + this.schema.ownerColumn() + " = ?, "
                + this.schema.tokenColumn() + " = ?, "
                + this.schema.tokenTypeColumn() + " = ?, "
                + this.schema.timestampColumn() + " = ?"
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ?";
        try (PreparedStatement statement = connection.prepareStatement(update)) {
            statement.setString(1, entry.getOwner());
            statement.setObject(2, entry.getTokenData());
            statement.setString(3, entry.getTokenType());
            statement.setString(4, entry.timestampAsString());
            statement.setString(5, processorName);
            statement.setInt(6, segment);
            int updatedRows = statement.executeUpdate();
            // Anything other than exactly one updated row means the entry is no longer there.
            if (updatedRows != 1) {
                throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been removed", processorName, segment));
            }
        }
    }

    /**
     * Claims the given entry for this node and persists the new owner and timestamp.
     *
     * @param connection the connection to execute the update on
     * @param entry      the entry to claim
     * @return the token held by the entry, converted with this store's converter
     * @throws UnableToClaimTokenException when the entry cannot be claimed by this node, or the update does not
     *                                     affect exactly one row
     * @throws SQLException                when executing the update fails
     */
    protected TrackingToken claimToken(Connection connection, JdbcTokenEntry entry) throws SQLException {
        boolean claimed = entry.claim(this.nodeId, this.claimTimeout);
        if (!claimed) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", entry.getProcessorName(), entry.getSegment(), entry.getOwner()));
        }

        String update = "UPDATE " + this.schema.tokenTable()
                + " SET " + this.schema.ownerColumn() + " = ?, "
                + this.schema.timestampColumn() + " = ?"
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ?";
        try (PreparedStatement statement = connection.prepareStatement(update)) {
            statement.setString(1, entry.getOwner());
            statement.setString(2, entry.timestampAsString());
            statement.setString(3, entry.getProcessorName());
            statement.setInt(4, entry.getSegment().getSegmentId());
            int updatedRows = statement.executeUpdate();
            // Anything other than exactly one updated row means the entry is no longer there.
            if (updatedRows != 1) {
                throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been removed", entry.getProcessorName(), entry.getSegment()));
            }
        }
        return entry.getToken(this.converter);
    }

    /**
     * Reads the entry at the next row of the given result set and claims it for this node.
     *
     * @param connection    the connection to execute the claim on
     * @param resultSet     the result set positioned before the entry to load
     * @param processorName the name of the processor, used in the failure message
     * @param segment       the identifier of the segment, used in the failure message
     * @return the claimed token
     * @throws UnableToClaimTokenException when the result set has no row or the entry cannot be claimed
     * @throws SQLException                when reading the result set or executing the claim fails
     */
    protected TrackingToken loadToken(Connection connection, ResultSet resultSet, String processorName, int segment) throws SQLException {
        boolean entryPresent = resultSet.next();
        if (entryPresent) {
            JdbcTokenEntry entry = this.readTokenEntry(resultSet);
            return this.claimToken(connection, entry);
        }
        throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment));
    }

    /**
     * Reads the entry at the next row of the given result set, validates the given segment through
     * {@link #validateSegment(String, Segment)} and then claims the entry for this node.
     *
     * @param connection    the connection to execute the claim on
     * @param resultSet     the result set positioned before the entry to load
     * @param processorName the name of the processor the segment belongs to
     * @param segment       the segment to validate and whose identifier is used in the failure message
     * @return the claimed token
     * @throws UnableToClaimTokenException when the result set has no row, the segment fails validation, or the
     *                                     entry cannot be claimed
     * @throws SQLException                when reading the result set or executing the claim fails
     */
    protected TrackingToken loadToken(Connection connection, ResultSet resultSet, String processorName, Segment segment) throws SQLException {
        boolean entryPresent = resultSet.next();
        if (!entryPresent) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment.getSegmentId()));
        }
        // The entry is read before validation so the row is consumed regardless of the validation outcome.
        JdbcTokenEntry entry = this.readTokenEntry(resultSet);
        this.validateSegment(processorName, segment);
        return this.claimToken(connection, entry);
    }

    /**
     * Validates the given segment by counting the stored entries of the processor whose segment identifier equals
     * the segment's split identifier or its mergeable identifier. The count is checked by
     * {@code containsOneElement}, which rejects a count of zero or of two and more.
     * <p>
     * This method obtains its own connection and closes it when done.
     *
     * @param processorName the name of the processor the segment belongs to
     * @param segment       the segment to validate
     * @throws UnableToClaimTokenException when the count check fails
     * @throws JdbcException               when the connection cannot be obtained or the query fails
     */
    protected void validateSegment(String processorName, Segment segment) {
        Connection connection = this.getConnection();
        try {
            int splitId = segment.splitSegmentId();
            int mergeableId = segment.mergeableSegmentId();
            JdbcUtils.executeQuery(
                    connection,
                    c -> this.selectSegments(c, processorName, splitId, mergeableId),
                    rs -> this.containsOneElement(rs, processorName, segment.getSegmentId()),
                    e -> new JdbcException(String.format("Could not load segments for processor [%s]", processorName), e)
            );
        } finally {
            JdbcUtils.closeQuietly(connection);
        }
    }

    /**
     * Builds the statement counting the entries of the given processor whose segment identifier equals either of
     * the two given identifiers. The count is exposed under the column alias {@code segmentCount}.
     *
     * @param connection         the connection to prepare the statement on
     * @param processorName      the name of the processor to count entries for
     * @param splitSegmentId     the first segment identifier to match
     * @param mergeableSegmentId the second segment identifier to match
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement selectSegments(Connection connection, String processorName, int splitSegmentId, int mergeableSegmentId) throws SQLException {
        String query = "SELECT count(*) as segmentCount FROM " + this.schema.tokenTable()
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND (" + this.schema.segmentColumn() + " = ? OR " + this.schema.segmentColumn() + " = ?)";
        PreparedStatement statement = connection.prepareStatement(query);
        statement.setString(1, processorName);
        statement.setInt(2, splitSegmentId);
        statement.setInt(3, mergeableSegmentId);
        return statement;
    }

    /**
     * Checks the count held in the first row of the given result set.
     *
     * @param resultSet     the result of the count query; its first row is read
     * @param processorName the name of the processor, used in the failure messages
     * @param segmentId     the identifier of the segment, used in the failure messages
     * @return {@code true} when no exception is thrown
     * @throws UnableToClaimTokenException when the count is zero (reported as merged) or two and more (reported
     *                                     as split)
     * @throws SQLException                when reading the result set fails
     */
    private boolean containsOneElement(ResultSet resultSet, String processorName, int segmentId) throws SQLException {
        // Advance to the single row produced by the count query.
        resultSet.next();
        int matchingEntries = resultSet.getInt(COUNT_COLUMN_NAME);
        if (matchingEntries >= 2) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been split into two segments", processorName, segmentId));
        }
        if (matchingEntries == 0) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been merged with another segment", processorName, segmentId));
        }
        return true;
    }

    /**
     * Inserts a new entry holding the given token for the given processor and segment.
     *
     * @param connection    the connection to execute the insert on
     * @param token         the token to store
     * @param processorName the name of the processor the entry belongs to
     * @param segment       the segment the entry belongs to; its identifier and mask are stored
     * @return the given {@code token}, unchanged
     * @throws SQLException when the statement cannot be prepared or executed
     */
    protected TrackingToken insertTokenEntry(Connection connection, @Nonnull TrackingToken token, String processorName, @Nonnull Segment segment) throws SQLException {
        JdbcTokenEntry newEntry = new JdbcTokenEntry(token, this.converter);
        String columnList = String.join(
                ",",
                this.schema.processorNameColumn(),
                this.schema.segmentColumn(),
                this.schema.maskColumn(),
                this.schema.timestampColumn(),
                this.schema.tokenColumn(),
                this.schema.tokenTypeColumn(),
                this.schema.ownerColumn()
        );
        String insert = "INSERT INTO " + this.schema.tokenTable() + " (" + columnList + ") VALUES (?,?,?,?,?,?,?)";
        try (PreparedStatement statement = connection.prepareStatement(insert)) {
            // Parameter order follows the column list above.
            statement.setString(1, processorName);
            statement.setInt(2, segment.getSegmentId());
            statement.setInt(3, segment.getMask());
            statement.setString(4, newEntry.timestampAsString());
            statement.setBytes(5, newEntry.getTokenData());
            statement.setString(6, newEntry.getTokenType());
            statement.setString(7, newEntry.getOwner());
            statement.executeUpdate();
        }
        return token;
    }

    /**
     * Builds a {@link JdbcTokenEntry} from the current row of the given result set, reading each value by the
     * column name configured in the schema.
     *
     * @param resultSet the result set positioned on the row to read
     * @return the entry described by the current row
     * @throws SQLException when a column cannot be read
     */
    protected JdbcTokenEntry readTokenEntry(ResultSet resultSet) throws SQLException {
        // Columns are read in the same order as the constructor arguments.
        byte[] tokenData = resultSet.getBytes(this.schema.tokenColumn());
        String tokenType = resultSet.getString(this.schema.tokenTypeColumn());
        String timestamp = resultSet.getString(this.schema.timestampColumn());
        String owner = resultSet.getString(this.schema.ownerColumn());
        String processorName = resultSet.getString(this.schema.processorNameColumn());
        Segment segment = new Segment(
                resultSet.getInt(this.schema.segmentColumn()),
                resultSet.getInt(this.schema.maskColumn())
        );
        return new JdbcTokenEntry(tokenData, tokenType, timestamp, owner, processorName, segment);
    }

    /**
     * Builds the statement that clears the owner and refreshes the timestamp of the entry for the given processor
     * and segment, restricted to entries whose owner is this node.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to release the claim for
     * @param segment       the identifier of the segment to release the claim for
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement releaseClaim(Connection connection, String processorName, int segment) throws SQLException {
        String update = "UPDATE " + this.schema.tokenTable()
                + " SET " + this.schema.ownerColumn() + " = ?, "
                + this.schema.timestampColumn() + " = ?"
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ?"
                + " AND " + this.schema.ownerColumn() + " = ?";
        PreparedStatement statement = connection.prepareStatement(update);
        // SET clause: no owner, timestamp taken from the JdbcTokenEntry clock.
        statement.setString(1, null);
        String releasedAt = DateTimeUtils.formatInstant(JdbcTokenEntry.clock.instant());
        statement.setString(2, releasedAt);
        // WHERE clause: only the entry of this processor/segment that this node owns.
        statement.setString(3, processorName);
        statement.setInt(4, segment);
        statement.setString(5, this.nodeId);
        return statement;
    }

    /**
     * Builds the statement that deletes the entry for the given processor and segment, restricted to entries
     * whose owner is this node.
     *
     * @param connection    the connection to prepare the statement on
     * @param processorName the name of the processor to delete the entry for
     * @param segment       the identifier of the segment to delete the entry for
     * @return the statement with all parameters bound
     * @throws SQLException when the statement cannot be prepared or a parameter cannot be set
     */
    protected PreparedStatement deleteToken(Connection connection, String processorName, int segment) throws SQLException {
        String delete = "DELETE FROM " + this.schema.tokenTable()
                + " WHERE " + this.schema.processorNameColumn() + " = ?"
                + " AND " + this.schema.segmentColumn() + " = ?"
                + " AND " + this.schema.ownerColumn() + " = ?";
        PreparedStatement statement = connection.prepareStatement(delete);
        statement.setString(1, processorName);
        statement.setInt(2, segment);
        statement.setString(3, this.nodeId);
        return statement;
    }

    /**
     * Obtains a connection from the configured {@link ConnectionProvider}.
     *
     * @return a connection supplied by the provider
     * @throws JdbcException wrapping the {@link SQLException} when the provider fails to supply a connection
     */
    protected Connection getConnection() {
        final Connection connection;
        try {
            connection = this.connectionProvider.getConnection();
        } catch (SQLException cause) {
            throw new JdbcException("Failed to obtain a database connection", cause);
        }
        return connection;
    }
}

