/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

public class SenderTest {
    // Constants shared by the tests. The same values also appear as inline literals in the test
    // bodies (for example in the Sender constructor calls and the accumulator.append calls).
    // 0x100000 == 1,048,576.
    private static final int MAX_REQUEST_SIZE = 0x100000;
    // NOTE(review): presumably the producer "acks" value meaning "all replicas" -- confirm.
    private static final short ACKS_ALL = -1;
    private static final String CLIENT_ID = "clientId";
    // Same value as the delta passed to the floating-point assertEquals calls in testQuotaMetrics.
    private static final double EPS = 1.0E-4;
    // Same value (1000) as the last argument given to accumulator.append throughout the tests;
    // NOTE(review): presumably milliseconds -- confirm.
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    // NOTE(review): presumably the request timeout in milliseconds handed to Sender -- confirm.
    private static final int REQUEST_TIMEOUT = 1000;
    // Partitions 0 and 1 of topic "test".
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    // Mock clock; tests advance it explicitly with time.sleep(...).
    private MockTime time = new MockTime();
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    // Mock network client built on the mock clock and the metadata instance above.
    private MockClient client = new MockClient((Time)this.time, this.metadata);
    private ApiVersions apiVersions = new ApiVersions();
    // The four fields below start as null. setup() calls setupWithTransactionState(null), which is
    // presumably where they are populated (that method is not visible in this part of the file).
    // tearDown() closes `metrics`.
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private SenderMetricsRegistry senderMetricsRegistry = null;
    private final LogContext logContext = new LogContext();

    /**
     * JUnit fixture hook run before every test: delegates to
     * {@code setupWithTransactionState} with a {@code null} transaction manager.
     */
    @Before
    public void setup() {
        setupWithTransactionState(null);
    }

    /** JUnit fixture hook run after every test: closes the {@code metrics} field. */
    @After
    public void tearDown() {
        metrics.close();
    }

    /**
     * Appends one record for {@code tp0}, drives the sender until a produce request is in flight,
     * answers it with {@link Errors#NONE}, and checks that the in-flight counters return to zero
     * and that the record's future completes with the responded offset.
     *
     * @throws Exception if appending or reading the future fails
     */
    @Test
    public void testSimple() throws Exception {
        final long expectedOffset = 0L;
        FutureRecordMetadata recordFuture =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // After two sender iterations exactly one produce request must be outstanding.
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals("We should have a single produce request in flight.", 1L, client.inFlightRequestCount());
        Assert.assertEquals(1L, sender.inFlightBatches(tp0).size());
        Assert.assertTrue(client.hasInFlightRequests());

        // Deliver a successful produce response and let the sender process it.
        client.respond(produceResponse(tp0, expectedOffset, Errors.NONE, 0));
        sender.runOnce();
        Assert.assertEquals("All requests completed.", 0L, client.inFlightRequestCount());
        Assert.assertEquals(0L, sender.inFlightBatches(tp0).size());
        Assert.assertFalse(client.hasInFlightRequests());

        // One more iteration, then the future must carry the offset from the response.
        sender.runOnce();
        Assert.assertTrue("Request should be completed", recordFuture.isDone());
        Assert.assertEquals(expectedOffset, recordFuture.get().offset());
    }

    /**
     * Appends a record while node "0" has the default API versions, then re-registers the node as
     * supporting only produce versions 0 through 2. The prepared response is matched only by a
     * version-2 produce request whose {@code tp0} records are non-empty and have magic value 1,
     * and the record's future must complete with the responded offset.
     *
     * @throws Exception if appending or reading the future fails
     */
    @Test
    public void testMessageFormatDownConversion() throws Exception {
        final long offset = 0L;

        // The record is appended while the node still advertises the default API versions.
        apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata future =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Now restrict the node to produce v0..v2. The explicit (short) casts are required: an int
        // literal is not implicitly narrowed when passed as a method/constructor argument.
        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));

        client.prepareResponse(new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest request = (ProduceRequest) body;
                if (request.version() != 2) {
                    return false;
                }
                MemoryRecords records = (MemoryRecords) request.partitionRecordsOrFail().get(tp0);
                return records != null && records.sizeInBytes() > 0 && records.hasMatchingMagic((byte) 1);
            }
        }, produceResponse(tp0, offset, Errors.NONE, 0));

        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue("Request should be completed", future.isDone());
        Assert.assertEquals(offset, future.get().offset());
    }

    /**
     * Appends one record to {@code tp0} under the default API versions and one to {@code tp1}
     * after node "0" has been restricted to produce versions 0 through 2, then restores the
     * defaults. The prepared response is matched only by a version-2 produce request containing
     * exactly two partitions whose records are all non-empty with magic value 1. Both futures
     * must be done after two sender iterations.
     *
     * @throws Exception if appending fails
     */
    @Test
    public void testDownConversionForMismatchedMagicValues() throws Exception {
        final long offset = 0L;

        apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata future1 =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Explicit (short) casts: int literals are not implicitly narrowed in argument position.
        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
        FutureRecordMetadata future2 =
                accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        apiVersions.update("0", NodeApiVersions.create());

        // The same successful partition response is returned for both partitions.
        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, 100L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp =
                new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
        partResp.put(tp0, resp);
        partResp.put(tp1, resp);
        ProduceResponse produceResponse = new ProduceResponse(partResp, 0);

        client.prepareResponse(new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest request = (ProduceRequest) body;
                if (request.version() != 2) {
                    return false;
                }
                // Typed map: iterating a raw Map's values() as MemoryRecords does not compile.
                Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
                if (recordsMap.size() != 2) {
                    return false;
                }
                for (MemoryRecords records : recordsMap.values()) {
                    boolean nonEmptyMagicV1 =
                            records != null && records.sizeInBytes() != 0 && records.hasMatchingMagic((byte) 1);
                    if (!nonEmptyMagicV1) {
                        return false;
                    }
                }
                return true;
            }
        }, produceResponse);

        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue("Request should be completed", future1.isDone());
        Assert.assertTrue("Request should be completed", future2.isDone());
    }

    /**
     * Drives a real {@link NetworkClient} over a {@link MockSelector} and checks the produce
     * throttle-time metrics registered through {@code Sender.throttleTimeSensor}.
     *
     * <p>The ApiVersions response is created with a throttle time of 400 and the three produce
     * responses carry 100, 200 and 300. The asserted average (250) and maximum (400) are the mean
     * and maximum of those four values.
     *
     * <p>The network client is closed in a {@code finally} block so that it is released even when
     * an assertion fails.
     *
     * @throws Exception if the client interaction fails
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        MockSelector selector = new MockSelector(time);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(senderMetricsRegistry);
        Cluster cluster = TestUtils.singletonCluster("test", 1);
        Node node = cluster.nodes().get(0);
        // Named networkClient so it does not shadow the MockClient field `client`.
        NetworkClient networkClient = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L,
                65536, 65536, 1000, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), throttleTimeSensor,
                logContext);
        try {
            // Queue the ApiVersions response, then poll until the node is reported ready.
            short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
            ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2)
                    .serialize(apiVersionsResponseVersion, new ResponseHeader(0));
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
            while (!networkClient.ready(node, time.milliseconds())) {
                networkClient.poll(1L, time.milliseconds());
                // Advance the mock clock past any throttle delay reported for the node.
                time.sleep(networkClient.throttleDelayMs(node, time.milliseconds()));
            }
            selector.clear();

            // Three produce round trips with throttle times 100, 200 and 300.
            for (int i = 1; i <= 3; ++i) {
                int throttleTimeMs = 100 * i;
                ProduceRequest.Builder builder =
                        ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
                ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true);
                networkClient.send(request, time.milliseconds());
                networkClient.poll(1L, time.milliseconds());
                ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
                buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId()));
                selector.completeReceive(new NetworkReceive(node.idString(), buffer));
                networkClient.poll(1L, time.milliseconds());
                time.sleep(networkClient.throttleDelayMs(node, time.milliseconds()));
                selector.clear();
            }

            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
            KafkaMetric avgMetric = allMetrics.get(senderMetricsRegistry.produceThrottleTimeAvg);
            KafkaMetric maxMetric = allMetrics.get(senderMetricsRegistry.produceThrottleTimeMax);
            Assert.assertEquals(250.0, (double) ((Double) avgMetric.metricValue()), EPS);
            Assert.assertEquals(400.0, (double) ((Double) maxMetric.metricValue()), EPS);
        } finally {
            networkClient.close();
        }
    }

    /**
     * Replaces the fixture's {@code metrics} with a fresh instance tagged {@code client-id=clientA},
     * runs one successful produce round trip through a {@link Sender} bound to a new
     * {@link SenderMetricsRegistry}, and checks that the set of registered metric names (excluding
     * the {@code kafka-metrics-count} group) equals the registry's declared templates.
     *
     * @throws Exception if appending fails
     */
    @Test
    public void testSenderMetricsTemplates() throws Exception {
        // Swap in a tagged Metrics instance; tearDown() closes the replacement.
        metrics.close();
        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
        metrics = new Metrics(new MetricConfig().tags(clientTags));
        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);

        // Uses the declared constants: ACKS_ALL is a short, whereas a bare -1 literal is an int
        // and is not implicitly narrowed in argument position.
        Sender templateSender = new Sender(logContext, client, metadata, accumulator, false, MAX_REQUEST_SIZE,
                ACKS_ALL, 1, metricsRegistry, time, REQUEST_TIMEOUT, 50L, null, apiVersions);

        // One complete produce round trip.
        accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
        templateSender.runOnce();
        templateSender.runOnce();
        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0));
        templateSender.runOnce();

        // Called for its effect on the registry; the returned sensor is not needed here.
        Sender.throttleTimeSensor(metricsRegistry);

        // Convert every registered metric name (except the metrics-count group) into a template.
        Set<MetricNameTemplate> allMetrics = new HashSet<MetricNameTemplate>();
        for (MetricName n : metrics.metrics().keySet()) {
            if (n.group().equals("kafka-metrics-count")) {
                continue;
            }
            allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet()));
        }
        TestUtils.checkEquals(allMetrics, new HashSet<MetricNameTemplate>(metricsRegistry.allTemplates()),
                "metrics", "templates");
    }

    /**
     * Exercises retry handling with a {@link Sender} configured for one retry.
     *
     * <p>Part one: a produce request is in flight, the node is disconnected, and after three more
     * sender iterations a request is in flight again; a successful response then completes the
     * future with the responded offset.
     *
     * <p>Part two: a second record is sent and the node is disconnected {@code maxRetries + 1}
     * times; the future must then fail with {@link NetworkException}.
     *
     * @throws Exception if appending or reading the future fails
     */
    @Test
    public void testRetries() throws Exception {
        final int maxRetries = 1;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        try {
            // Uses the declared constants: ACKS_ALL is a short, whereas a bare -1 literal is an
            // int and is not implicitly narrowed in argument position.
            Sender retrySender = new Sender(logContext, client, metadata, accumulator, false, MAX_REQUEST_SIZE,
                    ACKS_ALL, maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50L, null, apiVersions);

            // --- Part one: a disconnect followed by a successful retry. ---
            FutureRecordMetadata future =
                    accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
            retrySender.runOnce();
            retrySender.runOnce();
            String id = client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertEquals(1L, retrySender.inFlightBatches(tp0).size());
            Assert.assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));

            client.disconnect(id);
            Assert.assertEquals(0L, client.inFlightRequestCount());
            Assert.assertFalse(client.hasInFlightRequests());
            Assert.assertFalse("Client ready status should be false", client.isReady(node, time.milliseconds()));
            // The batch is still tracked by the sender until it processes the disconnect.
            Assert.assertEquals(1L, retrySender.inFlightBatches(tp0).size());

            retrySender.runOnce();
            retrySender.runOnce();
            retrySender.runOnce();
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertEquals(1L, retrySender.inFlightBatches(tp0).size());

            long offset = 0L;
            client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
            retrySender.runOnce();
            Assert.assertTrue("Request should have retried and completed", future.isDone());
            Assert.assertEquals(offset, future.get().offset());
            Assert.assertEquals(0L, retrySender.inFlightBatches(tp0).size());

            // --- Part two: disconnect until the retries are used up. ---
            future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
            retrySender.runOnce();
            Assert.assertEquals(1L, retrySender.inFlightBatches(tp0).size());
            for (int i = 0; i < maxRetries + 1; ++i) {
                client.disconnect(client.requests().peek().destination());
                retrySender.runOnce();
                Assert.assertEquals(0L, retrySender.inFlightBatches(tp0).size());
                retrySender.runOnce();
                retrySender.runOnce();
                // Only the first pass is expected to put the batch back in flight.
                Assert.assertEquals(i > 0 ? 0L : 1L, retrySender.inFlightBatches(tp0).size());
            }
            retrySender.runOnce();
            assertFutureFailure(future, NetworkException.class);
            Assert.assertEquals(0L, retrySender.inFlightBatches(tp0).size());
        } finally {
            m.close();
        }
    }

    /**
     * Builds a {@link Sender} whose fifth constructor argument is {@code true}
     * (NOTE(review): presumably the "guarantee message order" flag -- confirm) and checks that,
     * while one produce request for partition 1 of "test" is in flight, appending a second record
     * and preparing a different metadata update does not put a second request in flight.
     *
     * @throws Exception if appending fails
     */
    @Test
    public void testSendInOrder() throws Exception {
        final int maxRetries = 1;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        try {
            // Uses the declared constants: ACKS_ALL is a short, whereas a bare -1 literal is an
            // int and is not implicitly narrowed in argument position.
            Sender orderedSender = new Sender(logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE,
                    ACKS_ALL, maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50L, null, apiVersions);

            // Metadata: two nodes, topic "test" with two partitions.
            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
            client.prepareMetadataUpdate(metadataUpdate1);

            // First record for partition 1; after two iterations a produce request is in flight.
            TopicPartition tp2 = new TopicPartition("test", 1);
            accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
            orderedSender.runOnce();
            orderedSender.runOnce();
            String id = client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
            Assert.assertEquals(1L, orderedSender.inFlightBatches(tp2).size());

            // Advance the clock, append a second record, and prepare a single-node metadata update.
            time.sleep(900L);
            accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
            MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
            client.prepareMetadataUpdate(metadataUpdate2);
            Assert.assertEquals(1L, orderedSender.inFlightBatches(tp2).size());

            // Still exactly one request and one batch in flight after another iteration.
            orderedSender.runOnce();
            Assert.assertEquals(1L, client.inFlightRequestCount());
            Assert.assertTrue(client.hasInFlightRequests());
            Assert.assertEquals(1L, orderedSender.inFlightBatches(tp2).size());
        } finally {
            m.close();
        }
    }

    /**
     * Checks that a record callback may append to the accumulator while batches are being expired.
     *
     * <p>Ten records are appended to {@code tp1} with a callback that, on a
     * {@link TimeoutException}, counts the invocation and appends another record to {@code tp1}.
     * After the mock clock is advanced, the batches are drained and registered as in flight, and
     * the node is disconnected and blacked out. One sender iteration must then invoke the callback
     * once per record, and the accumulator must hold a single {@code tp1} batch containing the
     * re-appended records.
     *
     * @throws InterruptedException if an append is interrupted
     */
    @Test
    public void testAppendInExpiryCallback() throws InterruptedException {
        final int messagesPerBatch = 10;
        final AtomicInteger expiryCallbackCount = new AtomicInteger(0);
        final AtomicReference<Exception> unexpectedException = new AtomicReference<Exception>();
        final byte[] key = "key".getBytes();
        final byte[] value = "value".getBytes();
        long maxBlockTimeMs = 1000L;

        Callback expiryCallback = new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof TimeoutException) {
                    expiryCallbackCount.incrementAndGet();
                    try {
                        // Append again from inside the callback.
                        accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 1000L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Unexpected interruption", e);
                    }
                    return;
                }
                if (exception != null) {
                    // Remember only the first non-timeout failure.
                    unexpectedException.compareAndSet(null, exception);
                }
            }
        };

        int appended = 0;
        while (appended < messagesPerBatch) {
            accumulator.append(tp1, 0L, key, value, null, expiryCallback, 1000L);
            appended++;
        }

        // Advance the mock clock, then drain and register the batches as in flight by hand.
        time.sleep(10000L);
        Node clusterNode = metadata.fetch().nodes().get(0);
        Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(
                metadata.fetch(), Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
        sender.addToInflightBatches(drainedBatches);

        // Make the node unavailable before running the sender.
        client.disconnect(clusterNode.idString());
        client.blackout(clusterNode, 100L);
        sender.runOnce();

        Assert.assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
        Assert.assertNull("Unexpected exception", unexpectedException.get());
        Assert.assertTrue(accumulator.batches().containsKey(tp1));
        Deque<?> tp1Batches = (Deque<?>) accumulator.batches().get(tp1);
        Assert.assertEquals(1L, tp1Batches.size());
        ProducerBatch firstBatch = (ProducerBatch) tp1Batches.peekFirst();
        Assert.assertEquals((long) messagesPerBatch, (long) firstBatch.recordCount);
    }

    /**
     * Checks how the topic of {@code tp0} enters, stays in, and leaves the metadata's topic set.
     *
     * <p>Round one: starting from metadata with no topics, appending a record and running the
     * sender adds the topic, and it is still present once the produce request has completed.
     * After the mock clock advances by 300000 and an empty metadata update is applied, the topic
     * must be gone. Round two repeats the append/produce cycle and expects the topic to be added
     * again.
     *
     * @throws Exception if appending fails
     */
    @Test
    public void testMetadataTopicExpiry() throws Exception {
        long nextOffset = 0L;
        final String topic = tp0.topic();

        // ---- Round one ----
        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
        FutureRecordMetadata recordFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        Assert.assertTrue("Topic not added to metadata", metadata.containsTopic(topic));

        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        sender.runOnce();
        client.respond(produceResponse(tp0, nextOffset++, Errors.NONE, 0));
        sender.runOnce();
        Assert.assertEquals("Request completed.", 0L, client.inFlightRequestCount());
        Assert.assertFalse(client.hasInFlightRequests());
        Assert.assertEquals(0L, sender.inFlightBatches(tp0).size());
        sender.runOnce();
        Assert.assertTrue("Request should be completed", recordFuture.isDone());
        Assert.assertTrue("Topic not retained in metadata list", metadata.containsTopic(topic));

        // ---- Let the topic go unused, then apply a metadata update without it ----
        time.sleep(300000L);
        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
        Assert.assertFalse("Unused topic has not been expired", metadata.containsTopic(topic));

        // ---- Round two ----
        recordFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        Assert.assertTrue("Topic not added to metadata", metadata.containsTopic(topic));

        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        sender.runOnce();
        client.respond(produceResponse(tp0, nextOffset++, Errors.NONE, 0));
        sender.runOnce();
        Assert.assertEquals("Request completed.", 0L, client.inFlightRequestCount());
        Assert.assertFalse(client.hasInFlightRequests());
        Assert.assertEquals(0L, sender.inFlightBatches(tp0).size());
        sender.runOnce();
        Assert.assertTrue("Request should be completed", recordFuture.isDone());
    }

    /**
     * Rebuilds the fixture with a {@link TransactionManager}, completes an InitProducerId exchange
     * with {@link Errors#NONE}, and checks that the manager reports the expected producer id and
     * an epoch of zero.
     *
     * @throws Exception if the exchange fails
     */
    @Test
    public void testInitProducerIdRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);

        prepareAndReceiveInitProducerId(producerId, Errors.NONE);

        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(producerId, txnManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(0L, (long) txnManager.producerIdAndEpoch().epoch);
    }

    /**
     * Rebuilds the fixture with a {@link TransactionManager} and completes an InitProducerId
     * exchange with {@link Errors#CLUSTER_AUTHORIZATION_FAILED}. The manager must then have no
     * producer id, must report a {@link ClusterAuthorizationException} as its last error, and
     * sends must fail with that exception type.
     *
     * @throws Exception if the exchange fails
     */
    @Test
    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);

        prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);

        Assert.assertFalse(txnManager.hasProducerId());
        Assert.assertTrue(txnManager.hasError());
        boolean isClusterAuthError = txnManager.lastError() instanceof ClusterAuthorizationException;
        Assert.assertTrue(isClusterAuthError);

        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * Sends one record with the default (non-transactional) fixture, checks through the request
     * matcher that the produce request carries no idempotent records, answers it with
     * {@link Errors#TOPIC_AUTHORIZATION_FAILED}, and requires the record's future to fail with a
     * {@link TopicAuthorizationException} as the cause.
     *
     * <p>The test fails explicitly if {@code future.get()} returns normally. Without that check a
     * future that wrongly completes successfully would pass unnoticed.
     *
     * @throws Exception if appending fails or the wait on the future is interrupted
     */
    @Test
    public void testCanRetryWithoutIdempotence() throws Exception {
        FutureRecordMetadata future =
                accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        sender.runOnce();

        String id = client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        Assert.assertEquals(1L, client.inFlightRequestCount());
        Assert.assertTrue(client.hasInFlightRequests());
        Assert.assertEquals(1L, sender.inFlightBatches(tp0).size());
        Assert.assertTrue("Client ready status should be true", client.isReady(node, time.milliseconds()));
        Assert.assertFalse(future.isDone());

        client.respond(new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest request = (ProduceRequest) body;
                Assert.assertFalse(request.hasIdempotentRecords());
                return true;
            }
        }, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
        sender.runOnce();

        Assert.assertTrue(future.isDone());
        try {
            future.get();
            // Assert.fail throws AssertionError (an Error), so the catch below does not swallow it.
            Assert.fail("Expected the future to fail with a TopicAuthorizationException");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TopicAuthorizationException);
        }
    }

    /**
     * With a {@link TransactionManager} that has obtained a producer id, sends two records to
     * {@code tp0} as two separate in-flight requests and acknowledges them in order. After each
     * step it checks the manager's next sequence number and last acked sequence, the number of
     * in-flight requests, and the completion state and offsets of both futures.
     *
     * @throws Exception if appending or reading a future fails
     */
    @Test
    public void testIdempotenceWithMultipleInflights() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First record: one request in flight, next sequence is 1, nothing acked yet.
        FutureRecordMetadata firstFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        String nodeId = client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assert.assertEquals(1L, client.inFlightRequestCount());
        Assert.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assert.assertEquals(-1L, (long) txnManager.lastAckedSequence(tp0));

        // Second record: two requests in flight, next sequence is 2, still nothing acked.
        FutureRecordMetadata secondFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        Assert.assertEquals(2L, client.inFlightRequestCount());
        Assert.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assert.assertEquals(-1L, (long) txnManager.lastAckedSequence(tp0));
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());
        Assert.assertTrue(client.isReady(node, time.milliseconds()));

        // Acknowledge sequence 0: the first future completes at offset 0, the second stays pending.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();
        Assert.assertEquals(1L, client.inFlightRequestCount());
        Assert.assertEquals(0L, (long) txnManager.lastAckedSequence(tp0));
        Assert.assertTrue(firstFuture.isDone());
        Assert.assertEquals(0L, firstFuture.get().offset());
        Assert.assertFalse(secondFuture.isDone());

        // Acknowledge sequence 1: nothing is left in flight and the second future completes at offset 1.
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
        sender.runOnce();
        Assert.assertEquals(1L, (long) txnManager.lastAckedSequence(tp0));
        Assert.assertFalse(client.hasInFlightRequests());
        Assert.assertEquals(0L, sender.inFlightBatches(tp0).size());
        Assert.assertTrue(secondFuture.isDone());
        Assert.assertEquals(1L, secondFuture.get().offset());
    }

    /**
     * Exercises an idempotent producer that has several produce requests in flight for
     * {@code tp0} when every one of them comes back with an error.
     *
     * <p>The assertions pin this sequence: three requests are sent, the first is answered with
     * {@code LEADER_NOT_AVAILABLE} and the other two with {@code OUT_OF_ORDER_SEQUENCE_NUMBER};
     * afterwards the three records are acknowledged at offsets 0, 1 and 2 in that order, followed
     * by a fourth record that was appended while the failures were being handled.
     */
    @Test
    public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        // First record: one sender pass puts one request in flight and moves the next sequence to 1.
        FutureRecordMetadata request1 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        // Rebuild a Node for the destination of the first request; used for the readiness check below.
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.valueOf(nodeId).intValue(), "localhost", 0);
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // Second and third records, each followed by its own sender pass.
        FutureRecordMetadata request2 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        FutureRecordMetadata request3 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        // Three requests outstanding, next sequence is 3, nothing acknowledged or completed yet.
        Assert.assertEquals((long)3L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)3L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse((boolean)request1.isDone());
        Assert.assertFalse((boolean)request2.isDone());
        Assert.assertFalse((boolean)request3.isDone());
        Assert.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()));
        // Answer with LEADER_NOT_AVAILABLE.
        // NOTE(review): the helper's first argument looks like the expected base sequence of the
        // request being answered - confirm against sendIdempotentProducerResponse.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
        this.sender.runOnce();
        // Fourth record is appended while two requests are still outstanding.
        FutureRecordMetadata request4 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // The two remaining requests are rejected with OUT_OF_ORDER_SEQUENCE_NUMBER.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        // One more pass: the in-flight count is expected to stay at 1 and the next sequence at 3,
        // i.e. the fourth append has not advanced the sequence number yet.
        this.sender.runOnce();
        Assert.assertEquals((long)3L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // Success at offset 0: request1 completes and nothing is left in flight.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assert.assertEquals((long)0L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue((boolean)request1.isDone());
        Assert.assertEquals((long)0L, (long)((RecordMetadata)request1.get()).offset());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertEquals((long)0L, (long)this.sender.inFlightBatches(this.tp0).size());
        // The next pass puts exactly one request (one batch) in flight again.
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
        // Success at offset 1: request2 completes.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue((boolean)request2.isDone());
        Assert.assertEquals((long)1L, (long)((RecordMetadata)request2.get()).offset());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertEquals((long)0L, (long)this.sender.inFlightBatches(this.tp0).size());
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
        // Success at offset 2: request3 completes; unlike the previous two steps, a request is
        // already in flight again after this same pass.
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.NONE, 2L);
        this.sender.runOnce();
        Assert.assertEquals((long)2L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue((boolean)request3.isDone());
        Assert.assertEquals((long)2L, (long)((RecordMetadata)request3.get()).offset());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
        // Finally the fourth record is acknowledged at offset 3.
        this.sendIdempotentProducerResponse(3, this.tp0, Errors.NONE, 3L);
        this.sender.runOnce();
        Assert.assertEquals((long)3L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue((boolean)request4.isDone());
        Assert.assertEquals((long)3L, (long)((RecordMetadata)request4.get()).offset());
    }

    /**
     * Verifies what happens to a second in-flight idempotent batch when the first one fails
     * with {@code MESSAGE_TOO_LARGE}.
     *
     * <p>The first request must fail {@code request1} with {@link RecordTooLargeException}. The
     * second request is then rejected with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}, sent again on a
     * later sender pass, and finally acknowledged at offset 0 with the last acked sequence
     * becoming 0.
     *
     * @throws Exception if a helper or a future lookup fails unexpectedly
     */
    @Test
    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());

        // First record: one request in flight, next sequence becomes 1.
        FutureRecordMetadata request1 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.valueOf(nodeId).intValue(), "localhost", 0);
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));

        // Second record: two requests in flight, next sequence becomes 2.
        FutureRecordMetadata request2 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse((boolean)request1.isDone());
        Assert.assertFalse((boolean)request2.isDone());
        Assert.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()));

        // The first request fails with MESSAGE_TOO_LARGE; request1 must fail with RecordTooLargeException.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.MESSAGE_TOO_LARGE, -1L);
        this.sender.runOnce();
        this.assertFutureFailure((Future<?>)request1, (Class<? extends Exception>)RecordTooLargeException.class);
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));

        // The second request is rejected as out of order and leaves the in-flight set.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());

        // The next pass sends it again.
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));

        // The resend is acknowledged at offset 0.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assert.assertEquals((long)0L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
        // request1 was already shown to have failed above; the future that must be complete
        // before its offset is read is request2.
        Assert.assertTrue((boolean)request2.isDone());
        Assert.assertEquals((long)0L, (long)((RecordMetadata)request2.get()).offset());
    }

    /**
     * An {@code OUT_OF_ORDER_SEQUENCE_NUMBER} error for the request sent right after the last
     * acknowledged one must fail that request's future with {@link OutOfOrderSequenceException}.
     */
    @Test
    public void testMustNotRetryOutOfOrderSequenceForNextBatch() throws Exception {
        long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Two records are appended before the first sender pass; a single request goes out
        // and the next sequence number jumps to 2.
        FutureRecordMetadata firstFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L);
        sender.runOnce();
        String destination = client.requests().peek().destination();
        Node broker = new Node(Integer.parseInt(destination), "localhost", 0);
        Assert.assertEquals(1L, client.inFlightRequestCount());
        Assert.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assert.assertEquals(-1L, txnManager.lastAckedSequence(tp0));

        // Acknowledge the first request.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();

        // Third record goes out as its own request.
        FutureRecordMetadata secondFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        Assert.assertEquals(1L, client.inFlightRequestCount());
        Assert.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assert.assertEquals(1L, txnManager.lastAckedSequence(tp0));
        Assert.assertTrue(firstFuture.isDone());
        Assert.assertEquals(0L, ((RecordMetadata)firstFuture.get()).offset());
        Assert.assertFalse(secondFuture.isDone());
        Assert.assertTrue(client.isReady(broker, time.milliseconds()));

        // Rejecting that request as out of order must surface as a failure on its future.
        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        sender.runOnce();
        assertFutureFailure((Future<?>)secondFuture, (Class<? extends Exception>)OutOfOrderSequenceException.class);
    }

    /**
     * Covers two in-flight idempotent requests whose responses arrive in reverse order and are
     * both errors: the second request is answered first with {@code OUT_OF_ORDER_SEQUENCE_NUMBER},
     * then the first with {@code NOT_LEADER_FOR_PARTITION}.
     *
     * <p>The assertions require both batches to end up back in the accumulator queue ordered by
     * base sequence (0 before 1), and then to be sent and acknowledged one at a time at offsets
     * 0 and 1.
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        // First record: one request in flight, next sequence 1.
        FutureRecordMetadata request1 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.valueOf(nodeId).intValue(), "localhost", 0);
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // Second record: two requests in flight, next sequence 2.
        FutureRecordMetadata request2 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse((boolean)request1.isDone());
        Assert.assertFalse((boolean)request2.isDone());
        Assert.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()));
        // Capture both outstanding requests so they can be answered individually, second one first.
        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest)this.client.requests().toArray()[1];
        this.client.respondToRequest(secondClientRequest, (AbstractResponse)this.produceResponse(this.tp0, -1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
        this.sender.runOnce();
        // The rejected batch (base sequence 1) is expected back in the queue while the first
        // request is still in flight. The same Deque reference is re-read further down.
        Deque queuedBatches = (Deque)this.accumulator.batches().get(this.tp0);
        Assert.assertEquals((long)1L, (long)queuedBatches.size());
        Assert.assertEquals((long)1L, (long)((ProducerBatch)queuedBatches.peekFirst()).baseSequence());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // Now the first request fails with NOT_LEADER_FOR_PARTITION.
        this.client.respondToRequest(firstClientRequest, (AbstractResponse)this.produceResponse(this.tp0, -1L, Errors.NOT_LEADER_FOR_PARTITION, -1));
        this.sender.runOnce();
        // Both batches are queued, base sequence 0 ahead of base sequence 1; nothing is in
        // flight and neither future has completed.
        Assert.assertEquals((long)2L, (long)queuedBatches.size());
        Assert.assertEquals((long)0L, (long)((ProducerBatch)queuedBatches.peekFirst()).baseSequence());
        Assert.assertEquals((long)1L, (long)((ProducerBatch)queuedBatches.peekLast()).baseSequence());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
        Assert.assertFalse((boolean)request1.isDone());
        Assert.assertFalse((boolean)request2.isDone());
        // Two sender passes: the in-flight count is expected to be 1 after each of them.
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)-1L, (long)transactionManager.lastAckedSequence(this.tp0));
        // Success at offset 0 completes request1.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assert.assertEquals((long)0L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
        Assert.assertTrue((boolean)request1.isDone());
        Assert.assertEquals((long)0L, (long)((RecordMetadata)request1.get()).offset());
        // The next pass sends the remaining batch, which is then acknowledged at offset 1.
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.runOnce();
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue((boolean)request2.isDone());
        Assert.assertEquals((long)1L, (long)((RecordMetadata)request2.get()).offset());
    }

    /**
     * Covers two in-flight idempotent requests where the response for the second one arrives
     * first and is a success, and the first one then fails with {@code REQUEST_TIMED_OUT}.
     *
     * <p>The assertions require {@code request2} to complete at offset 1 immediately, the first
     * batch (base sequence 0) to be re-queued and resent, and the last acked sequence to remain 1
     * after the resend is acknowledged at offset 0.
     *
     * @throws Exception if a helper or a future lookup fails unexpectedly
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());

        // Put two requests in flight, one record each.
        FutureRecordMetadata request1 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.valueOf(nodeId).intValue(), "localhost", 0);
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        FutureRecordMetadata request2 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount());
        Assert.assertFalse((boolean)request1.isDone());
        Assert.assertFalse((boolean)request2.isDone());
        Assert.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()));

        // Answer the second request first, successfully.
        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest)this.client.requests().toArray()[1];
        this.client.respondToRequest(secondClientRequest, (AbstractResponse)this.produceResponse(this.tp0, 1L, Errors.NONE, 1));
        this.sender.runOnce();
        Assert.assertTrue((boolean)request2.isDone());
        Assert.assertEquals((long)1L, (long)((RecordMetadata)request2.get()).offset());
        Assert.assertFalse((boolean)request1.isDone());
        // Live view of the queue for tp0; it is re-read after later sender passes.
        Deque queuedBatches = (Deque)this.accumulator.batches().get(this.tp0);
        Assert.assertEquals((long)0L, (long)queuedBatches.size());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));

        // The first request times out; its batch (base sequence 0) is re-queued.
        this.client.respondToRequest(firstClientRequest, (AbstractResponse)this.produceResponse(this.tp0, -1L, Errors.REQUEST_TIMED_OUT, -1));
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)queuedBatches.size());
        Assert.assertEquals((long)0L, (long)((ProducerBatch)queuedBatches.peekFirst()).baseSequence());
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());

        // The next pass resends it. (The in-flight count was previously asserted twice in a row
        // here; a single check is sufficient.)
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));

        // The resend succeeds at offset 0; the last acked sequence stays at 1.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assert.assertEquals((long)0L, (long)queuedBatches.size());
        Assert.assertEquals((long)1L, (long)transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertTrue((boolean)request1.isDone());
        Assert.assertEquals((long)0L, (long)((RecordMetadata)request1.get()).offset());
    }

    /**
     * A batch that fails with {@link TimeoutException} without a sender pass having run since
     * it was appended must not leave {@code tp0} flagged as having an unresolved sequence.
     */
    @Test
    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
        long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Append with timestamp 0, then advance the clock by 10 s and cut the connection
        // before the sender gets its first pass.
        FutureRecordMetadata unsentFuture = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        Node broker = (Node)metadata.fetch().nodes().get(0);
        time.sleep(10000L);
        client.disconnect(broker.idString());
        client.blackout(broker, 10L);
        sender.runOnce();

        assertFutureFailure((Future<?>)unsentFuture, (Class<? extends Exception>)TimeoutException.class);
        Assert.assertFalse(txnManager.hasUnresolvedSequence(tp0));
    }

    /**
     * Covers the case where the first of two sent batches fails with {@link TimeoutException}
     * while the second one is later acknowledged successfully.
     *
     * <p>The assertions require {@code tp0} to be flagged as having an unresolved sequence after
     * the first batch fails, to stay flagged right after the second batch succeeds, and to be
     * cleared by the following sender pass - with the producer id kept and the third, still
     * queued, batch moved into flight.
     */
    @Test
    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        // NOTE(review): the meaning of the extra (false, null) arguments is not visible in this
        // part of the file - confirm against setupWithTransactionState.
        this.setupWithTransactionState(transactionManager, false, null);
        this.prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        // Two records, appended 1000 ms apart on this.time, each followed by a sender pass.
        FutureRecordMetadata request1 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        this.time.sleep(1000L);
        FutureRecordMetadata request2 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.sender.runOnce();
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount());
        Assert.assertEquals((long)2L, (long)this.sender.inFlightBatches(this.tp0).size());
        // The first request is answered with REQUEST_TIMED_OUT; one batch remains in flight.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.REQUEST_TIMED_OUT, -1L);
        this.sender.runOnce();
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
        // Advance the clock another 600 ms and cut the connection, then run the sender:
        // request1 is expected to fail with TimeoutException and tp0 to become unresolved.
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.time.sleep(600L);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 10L);
        this.sender.runOnce();
        this.assertFutureFailure((Future<?>)request1, (Class<? extends Exception>)TimeoutException.class);
        Assert.assertTrue((boolean)transactionManager.hasUnresolvedSequence(this.tp0));
        Assert.assertEquals((long)0L, (long)this.sender.inFlightBatches(this.tp0).size());
        // A third record is appended while the partition is unresolved.
        FutureRecordMetadata request3 = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
        this.time.sleep(20L);
        Assert.assertFalse((boolean)request2.isDone());
        this.sender.runOnce();
        // The response is queued first; the batch still counts as in flight until the next pass
        // processes it.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
        this.sender.runOnce();
        Assert.assertTrue((boolean)request2.isDone());
        Assert.assertEquals((long)1L, (long)((RecordMetadata)request2.get()).offset());
        Assert.assertEquals((long)0L, (long)this.sender.inFlightBatches(this.tp0).size());
        // The third record's batch is still queued and has no sequence assigned; the next
        // sequence is 2 and tp0 is still reported as unresolved at this point.
        Deque batches = (Deque)this.accumulator.batches().get(this.tp0);
        Assert.assertEquals((long)1L, (long)batches.size());
        Assert.assertFalse((boolean)((ProducerBatch)batches.peekFirst()).hasSequence());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertTrue((boolean)transactionManager.hasUnresolvedSequence(this.tp0));
        // One more pass: the unresolved flag is cleared, the producer id is kept, and the
        // queued batch is sent.
        this.sender.runOnce();
        Assert.assertFalse((boolean)transactionManager.hasUnresolvedSequence(this.tp0));
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
        Assert.assertEquals((long)0L, (long)batches.size());
        Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
        Assert.assertFalse((boolean)request3.isDone());
        Assert.assertEquals((long)1L, (long)this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * When the first sent batch fails with {@link TimeoutException} and the following batch is
     * then rejected with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}, the producer id must be gone and
     * {@code tp0} must no longer be flagged as unresolved, while a later appended batch stays
     * queued without a sequence.
     */
    @Test
    public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws Exception {
        long producerId = 343434L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Two records, 1000 ms apart, each sent by its own pass.
        FutureRecordMetadata expiringFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        time.sleep(1000L);
        FutureRecordMetadata rejectedFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        Assert.assertEquals(2L, client.inFlightRequestCount());

        // The first request is answered with NOT_LEADER_FOR_PARTITION; after another 1000 ms and
        // a disconnect its future is expected to fail with TimeoutException.
        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1L);
        sender.runOnce();
        Node broker = (Node)metadata.fetch().nodes().get(0);
        time.sleep(1000L);
        client.disconnect(broker.idString());
        client.blackout(broker, 10L);
        sender.runOnce();
        assertFutureFailure((Future<?>)expiringFuture, (Class<? extends Exception>)TimeoutException.class);
        Assert.assertTrue(txnManager.hasUnresolvedSequence(tp0));

        // A third record is queued; its future is not inspected by this test.
        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L);
        time.sleep(20L);
        Assert.assertFalse(rejectedFuture.isDone());
        sender.runOnce();

        // The second request is rejected as out of order.
        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1L);
        sender.runOnce();
        assertFutureFailure((Future<?>)rejectedFuture, (Class<? extends Exception>)OutOfOrderSequenceException.class);

        Deque pendingBatches = (Deque)accumulator.batches().get(tp0);
        Assert.assertEquals(1L, pendingBatches.size());
        Assert.assertFalse(((ProducerBatch)pendingBatches.peekFirst()).hasSequence());
        Assert.assertFalse(client.hasInFlightRequests());
        Assert.assertFalse(txnManager.hasProducerId());
        Assert.assertFalse(txnManager.hasUnresolvedSequence(tp0));
    }

    /**
     * When the only sent batch fails with {@link TimeoutException} after having been answered
     * with {@code NOT_LEADER_FOR_PARTITION}, {@code tp0} must be flagged as unresolved; the
     * original producer id is still present until a new InitProducerId round installs the next one.
     */
    @Test
    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
        long producerId = 343434L;
        long nextProducerId = producerId + 1L;
        TransactionManager txnManager = new TransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId());
        Assert.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // One record with timestamp 0 is sent and answered with NOT_LEADER_FOR_PARTITION.
        FutureRecordMetadata sentFuture = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1L);
        sender.runOnce();
        Assert.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());

        // Advance the clock by 15 s and cut the connection before the next pass.
        Node broker = (Node)metadata.fetch().nodes().get(0);
        time.sleep(15000L);
        client.disconnect(broker.idString());
        client.blackout(broker, 10L);
        sender.runOnce();
        assertFutureFailure((Future<?>)sentFuture, (Class<? extends Exception>)TimeoutException.class);
        Assert.assertTrue(txnManager.hasUnresolvedSequence(tp0));
        Assert.assertFalse(client.hasInFlightRequests());

        Deque pendingBatches = (Deque)accumulator.batches().get(tp0);
        Assert.assertEquals(0L, pendingBatches.size());
        Assert.assertTrue(txnManager.hasProducerId(producerId));

        // A fresh InitProducerId exchange must install the new id.
        prepareAndReceiveInitProducerId(nextProducerId, Errors.NONE);
        Assert.assertTrue(txnManager.hasProducerId(nextProducerId));
    }

    /**
     * Verifies that after the producer id is reset because of an
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER} error on {@code tp0}, the batch for {@code tp1} that
     * failed with {@code NOT_LEADER_FOR_PARTITION} in the same response can still be sent and
     * acknowledged once a new producer id has been obtained.
     *
     * <p>The test builds its own {@link Sender} with a dedicated {@code Metrics} instance, which
     * is closed in a {@code finally} block so it does not outlive the test.
     *
     * @throws Exception if a helper or a future lookup fails unexpectedly
     */
    @Test
    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        // Explicit short cast keeps the epoch literal valid whether the parameter is short or int.
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short)0));
        this.setupWithTransactionState(transactionManager);
        int maxRetries = 10;
        Metrics m = new Metrics();
        try {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, maxRetries, senderMetrics, (Time)this.time, 1000, 50L, transactionManager, this.apiVersions);
            FutureRecordMetadata failedResponse = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
            FutureRecordMetadata successfulResponse = this.accumulator.append((TopicPartition)this.tp1, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
            // Two passes are run before exactly one request is expected to be in flight.
            sender.runOnce();
            sender.runOnce();
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());

            // One response carrying a different error per partition; insertion order is kept by
            // the LinkedHashMap (tp1 first, then tp0).
            LinkedHashMap<TopicPartition, OffsetAndError> responses = new LinkedHashMap<TopicPartition, OffsetAndError>();
            responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
            responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
            this.client.respond((AbstractResponse)this.produceResponse(responses));
            sender.runOnce();
            Assert.assertTrue((boolean)failedResponse.isDone());
            Assert.assertFalse((String)"Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", (boolean)transactionManager.hasProducerId());

            // Obtain a new producer id, then let the tp1 batch go out and succeed at offset 10.
            this.prepareAndReceiveInitProducerId(producerId + 1L, Errors.NONE);
            Assert.assertEquals((long)(producerId + 1L), (long)transactionManager.producerIdAndEpoch().producerId);
            sender.runOnce();
            Assert.assertFalse((boolean)successfulResponse.isDone());
            this.client.respond((AbstractResponse)this.produceResponse(this.tp1, 10L, Errors.NONE, -1));
            sender.runOnce();
            Assert.assertTrue((boolean)successfulResponse.isDone());
            Assert.assertEquals((long)10L, (long)((RecordMetadata)successfulResponse.get()).offset());
            Assert.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp1).longValue());
        } finally {
            m.close();
        }
    }

    /**
     * Verifies that a {@link Sender} on which {@code initiateClose()} has been called still
     * drains the accumulator after the producer id was reset by an
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER} error.
     *
     * <p>The test polls for up to 5000 ms: every poll prepares an InitProducerId response with
     * the next producer id, runs the sender once, and checks that nothing undrained remains.
     * The dedicated {@code Metrics} instance is closed in a {@code finally} block.
     *
     * @throws Exception if a helper fails or the drain condition is not met in time
     */
    @Test
    public void testCloseWithProducerIdReset() throws Exception {
        // final so the anonymous TestCondition below can read it.
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        // Explicit short cast keeps the epoch literal valid whether the parameter is short or int.
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short)0));
        this.setupWithTransactionState(transactionManager);
        Metrics m = new Metrics();
        try {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            final Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, 10, senderMetrics, (Time)this.time, 1000, 50L, transactionManager, this.apiVersions);
            FutureRecordMetadata failedResponse = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L).future;
            // The tp1 record only needs to be queued; its future is not inspected here.
            this.accumulator.append((TopicPartition)this.tp1, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), null, null, (long)1000L);
            sender.runOnce();
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());

            // One response carrying a different error per partition (tp1 first, then tp0).
            LinkedHashMap<TopicPartition, OffsetAndError> responses = new LinkedHashMap<TopicPartition, OffsetAndError>();
            responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
            responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
            this.client.respond((AbstractResponse)this.produceResponse(responses));

            // Close is requested before the response is processed.
            sender.initiateClose();
            sender.runOnce();
            Assert.assertTrue((boolean)failedResponse.isDone());
            Assert.assertFalse((String)"Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", (boolean)transactionManager.hasProducerId());

            TestUtils.waitForCondition(new TestCondition(){

                @Override
                public boolean conditionMet() {
                    SenderTest.this.prepareInitProducerResponse(Errors.NONE, producerId + 1L, (short)1);
                    sender.runOnce();
                    return !SenderTest.this.accumulator.hasUndrained();
                }
            }, 5000L, "Failed to drain batches");
        } finally {
            m.close();
        }
    }

    /**
     * Checks that {@code forceClose()} leaves no undrained batches and completes the outstanding
     * tp1 future after an OUT_OF_ORDER_SEQUENCE_NUMBER response has cleared the producer id.
     *
     * @throws Exception if appending to the accumulator fails
     */
    @Test
    public void testForceCloseWithProducerIdReset() throws Exception {
        TransactionManager txnManager = new TransactionManager();
        txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(1L, 0));
        this.setupWithTransactionState(txnManager);

        Metrics metrics = new Metrics();
        SenderMetricsRegistry registry = new SenderMetricsRegistry(metrics);
        Sender forcedSender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                0x100000, -1, 10, registry, this.time, 1000, 50L, txnManager, this.apiVersions);

        FutureRecordMetadata tp0Future = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        FutureRecordMetadata tp1Future = this.accumulator.append(this.tp1, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        forcedSender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());

        // Response entries are added tp1 first, then tp0; the map keeps that order.
        LinkedHashMap<TopicPartition, OffsetAndError> partitionErrors =
                new LinkedHashMap<TopicPartition, OffsetAndError>();
        partitionErrors.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
        partitionErrors.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(partitionErrors));
        forcedSender.runOnce();

        Assert.assertTrue(tp0Future.isDone());
        Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException",
                txnManager.hasProducerId());

        // Force-close, then drive the sender once manually and once through run().
        forcedSender.forceClose();
        forcedSender.runOnce();
        forcedSender.run();

        boolean fullyDrained = !this.accumulator.hasUndrained();
        Assert.assertTrue("Pending batches are not aborted.", fullyDrained);
        Assert.assertTrue(tp1Future.isDone());
    }

    /**
     * Checks that a batch sent under the old producer id fails with
     * {@link OutOfOrderSequenceException} when its retry fails again after the producer id was
     * replaced.
     *
     * <p>Flow exercised: tp0 gets OUT_OF_ORDER_SEQUENCE_NUMBER and tp1 gets
     * NOT_LEADER_FOR_PARTITION in the first response; a new producer id is then received; the
     * tp1 request is answered with NOT_LEADER_FOR_PARTITION once more, after which its future
     * must be completed exceptionally.
     *
     * @throws Exception if appending fails or waiting on the future is interrupted
     */
    @Test
    public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        this.setupWithTransactionState(transactionManager);

        int maxRetries = 10;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                0x100000, -1, maxRetries, senderMetrics, this.time, 1000, 50L, transactionManager, this.apiVersions);

        FutureRecordMetadata failedResponse = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        FutureRecordMetadata successfulResponse = this.accumulator.append(this.tp1, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());

        // Response entries are added tp1 first, then tp0; the LinkedHashMap keeps that order.
        LinkedHashMap<TopicPartition, OffsetAndError> responses = new LinkedHashMap<TopicPartition, OffsetAndError>();
        responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
        responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(responses));
        sender.runOnce();
        Assert.assertTrue(failedResponse.isDone());
        Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException",
                transactionManager.hasProducerId());

        // A replacement producer id arrives.
        this.prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
        Assert.assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);

        // The tp1 batch is still pending after the next run; fail it a second time.
        sender.runOnce();
        Assert.assertFalse(successfulResponse.isDone());
        this.client.respond(this.produceResponse(this.tp1, 0L, Errors.NOT_LEADER_FOR_PARTITION, -1));
        sender.runOnce();
        Assert.assertTrue(successfulResponse.isDone());

        try {
            successfulResponse.get();
            Assert.fail("Should have raised an OutOfOrderSequenceException");
        } catch (ExecutionException e) {
            // Only the wrapped failure is of interest; anything else propagates and fails the test.
            Assert.assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
        }
    }

    /**
     * Checks handling of DUPLICATE_SEQUENCE_NUMBER when responses arrive out of order.
     *
     * <p>Two single-record requests for tp0 are in flight. The second is acknowledged first
     * (offset 1000); the first is then answered with DUPLICATE_SEQUENCE_NUMBER. The test expects
     * the last-acked sequence/offset to remain 1/1000 and the first future to complete with
     * metadata that carries no offset.
     *
     * @throws Exception if appending fails or reading the completed future fails
     */
    @Test
    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First record: one request in flight, next sequence is 1, nothing acked yet.
        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));

        // Second record: two requests in flight, next sequence is 2, still nothing acked.
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(request1.isDone());
        Assert.assertFalse(request2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) this.client.requests().toArray()[1];

        // Acknowledge the second request before the first.
        this.client.respondToRequest(secondClientRequest, this.produceResponse(this.tp0, 1000L, Errors.NONE, 0));
        this.sender.runOnce();
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));

        // The first request now comes back as a duplicate; acked state must be unchanged.
        this.client.respondToRequest(firstClientRequest,
                this.produceResponse(this.tp0, -1L, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
        this.sender.runOnce();
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        Assert.assertFalse(this.client.hasInFlightRequests());

        // The duplicate completes normally (get() must not throw) but without a known offset.
        RecordMetadata unknownMetadata = (RecordMetadata) request1.get();
        Assert.assertFalse(unknownMetadata.hasOffset());
        Assert.assertEquals(-1L, unknownMetadata.offset());
    }

    /**
     * Checks that UNKNOWN_PRODUCER_ID is retried with reset sequence numbers when the response's
     * log start offset (1010) is beyond the last acknowledged offset (1000).
     *
     * <p>After the error the test expects the last-acked sequence to be -1 and the next sequence
     * to be 2, and the retried request to carry base sequence 0.
     *
     * @throws Exception if appending fails or reading a completed future fails
     */
    @Test
    public void testUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First request: a single record, acknowledged at offset 1000.
        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) request1.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));

        // Second request: two records appended before the sender runs, so the next sequence
        // is expected to move from 1 to 3. Only the second record's future is tracked.
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L);
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(request2.isDone());

        // Fail it with UNKNOWN_PRODUCER_ID and a log start offset past the last acked offset.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(request2.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());

        // The retry is expected with base sequence 0 and is acknowledged at base offset 1011.
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(request2.isDone());
        // request2 belongs to the second of the two records, hence 1012 rather than 1011.
        Assert.assertEquals(1012L, ((RecordMetadata) request2.get()).offset());
        Assert.assertEquals(1012L, transactionManager.lastAckedOffset(this.tp0));
    }

    /**
     * Checks that UNKNOWN_PRODUCER_ID is retried without touching sequence state when the
     * response carries a log start offset of -1.
     *
     * <p>After the error the test expects the last-acked sequence to stay at 0 and the next
     * sequence to stay at 2, and the retried request to carry the same base sequence (1).
     *
     * @throws Exception if appending fails or reading a completed future fails
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First request: acknowledged at offset 1000.
        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) request1.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));

        // Second request goes out with base sequence 1.
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(request2.isDone());

        // UNKNOWN_PRODUCER_ID with log start offset -1: sequence bookkeeping must be untouched.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
        this.sender.runOnce();
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(request2.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());

        // The retry is expected with the same base sequence and succeeds at offset 1011.
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(request2.isDone());
        Assert.assertEquals(1011L, ((RecordMetadata) request2.get()).offset());
        Assert.assertEquals(1011L, transactionManager.lastAckedOffset(this.tp0));
    }

    /**
     * Checks that, with two requests in flight, an UNKNOWN_PRODUCER_ID on the first one (log
     * start offset 1010, past the last acked offset 1000) leads to both batches being retried
     * and acknowledged in order with restarted sequence numbers.
     *
     * @throws Exception if appending fails or reading a completed future fails
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First request: acknowledged at offset 1000.
        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) request1.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));

        // Second and third requests go out back to back, leaving two requests in flight.
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata request3 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(request2.isDone());
        Assert.assertFalse(request3.isDone());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());

        // Fail the request with base sequence 1; acked sequence resets to -1, next sequence to 2.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(request2.isDone());
        Assert.assertFalse(request3.isDone());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());

        // One more run puts a second request back in flight.
        this.sender.runOnce();
        Assert.assertEquals(2L, this.client.inFlightRequestCount());

        // Fail the request that still carries base sequence 2 as well.
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());

        // The remaining in-flight request is expected with base sequence 0; it completes request2.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assert.assertTrue(request2.isDone());
        Assert.assertFalse(request3.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1011L, ((RecordMetadata) request2.get()).offset());
        Assert.assertEquals(1011L, transactionManager.lastAckedOffset(this.tp0));

        // The last batch is then sent with base sequence 1 and completes request3.
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1012L, 1010L);
        this.sender.runOnce();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(request3.isDone());
        Assert.assertEquals(1012L, ((RecordMetadata) request3.get()).offset());
        Assert.assertEquals(1012L, transactionManager.lastAckedOffset(this.tp0));
    }

    /**
     * Checks that UNKNOWN_PRODUCER_ID surfaces to the caller as
     * {@link OutOfOrderSequenceException} when the response's log start offset (10) is not
     * beyond the last acknowledged offset (1000).
     *
     * @throws Exception if appending fails or reading a completed future fails
     */
    @Test
    public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First request: acknowledged at offset 1000.
        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) request1.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));

        // Second request goes out with base sequence 1.
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(request2.isDone());

        // Log start offset 10 is below the acked offset 1000, so the error must reach the caller.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
        this.sender.runOnce();
        this.assertFutureFailure(request2, OutOfOrderSequenceException.class);
    }

    /**
     * Convenience overload that forwards to the five-argument variant with a log start offset
     * of -1.
     *
     * @param expectedSequence base sequence the matched produce request must carry
     * @param tp               partition the response is built for
     * @param responseError    error code to put in the response
     * @param responseOffset   offset to put in the response
     */
    void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
        final long defaultLogStartOffset = -1L;
        this.sendIdempotentProducerResponse(expectedSequence, tp, responseError, responseOffset, defaultLogStartOffset);
    }

    /**
     * Responds to an in-flight idempotent produce request after checking that it carries
     * exactly one batch for {@code tp} whose base sequence equals {@code expectedSequence}.
     *
     * <p>The matcher now inspects the records of the {@code tp} argument; it previously always
     * looked them up under {@code tp0}, silently ignoring the parameter.
     *
     * @param expectedSequence base sequence the single batch in the request must have
     * @param tp               partition whose records are checked and for which the response is built
     * @param responseError    error code to put in the response
     * @param responseOffset   offset to put in the response
     * @param logStartOffset   log start offset to put in the response
     */
    void sendIdempotentProducerResponse(final int expectedSequence, final TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) {
        this.client.respond(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest produceRequest = (ProduceRequest) body;
                Assert.assertTrue(produceRequest.hasIdempotentRecords());

                MemoryRecords records = (MemoryRecords) produceRequest.partitionRecordsOrFail().get(tp);
                // Fail with a readable message instead of a bare NullPointerException.
                Assert.assertNotNull("Produce request contains no records for " + tp, records);

                Iterator<?> batchIterator = records.batches().iterator();
                RecordBatch firstBatch = (RecordBatch) batchIterator.next();
                // Exactly one batch is expected for the partition.
                Assert.assertFalse(batchIterator.hasNext());
                Assert.assertEquals(expectedSequence, firstBatch.baseSequence());
                return true;
            }
        }, this.produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
    }

    /**
     * Checks that CLUSTER_AUTHORIZATION_FAILED on an idempotent produce request fails the
     * record's future with {@link ClusterAuthorizationException}, puts the transaction manager
     * into a fatal-error state, and makes a subsequent send fail with the same exception type.
     *
     * @throws Exception if appending to the accumulator fails
     */
    @Test
    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());

        FutureRecordMetadata future = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Queue the failure for the next idempotent produce request.
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
            }
        }, this.produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();

        this.assertFutureFailure(future, ClusterAuthorizationException.class);
        Assert.assertTrue(transactionManager.hasFatalError());
        this.assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * Checks that once a fatal CLUSTER_AUTHORIZATION_FAILED error has been received for tp0,
     * the still-outstanding tp1 record is failed with the same exception type, and that a late
     * successful response for tp1 can be delivered afterwards without the sender throwing.
     *
     * @throws Exception if appending to the accumulator fails
     */
    @Test
    public void testCancelInFlightRequestAfterFatalError() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());

        FutureRecordMetadata future1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        FutureRecordMetadata future2 = this.accumulator.append(this.tp1, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();

        // Both responses below target idempotent produce requests, so one stateless matcher serves both.
        MockClient.RequestMatcher idempotentProduce = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
            }
        };

        // Fatal error for tp0.
        this.client.respond(idempotentProduce,
                this.produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();
        Assert.assertTrue(transactionManager.hasFatalError());
        this.assertFutureFailure(future1, ClusterAuthorizationException.class);

        // The tp1 record must be failed on the next run, before any response for it arrives.
        this.sender.runOnce();
        this.assertFutureFailure(future2, ClusterAuthorizationException.class);

        // A late success for tp1 is delivered; no assertions follow, the run just must not throw.
        this.client.respond(idempotentProduce, this.produceResponse(this.tp1, 0L, Errors.NONE, 0));
        this.sender.runOnce();
    }

    /**
     * Checks that UNSUPPORTED_FOR_MESSAGE_FORMAT on an idempotent produce request fails the
     * record's future with {@link UnsupportedForMessageFormatException} while leaving the
     * transaction manager without an error.
     *
     * @throws Exception if appending to the accumulator fails
     */
    @Test
    public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());

        FutureRecordMetadata future = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Queue the failure for the next idempotent produce request.
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
            }
        }, this.produceResponse(this.tp0, -1L, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
        this.sender.runOnce();

        this.assertFutureFailure(future, UnsupportedForMessageFormatException.class);
        // Unlike the fatal cases, this error must not be recorded on the transaction manager.
        Assert.assertFalse(transactionManager.hasError());
    }

    /**
     * Checks that an unsupported-version response to an idempotent produce request fails the
     * record's future with {@link UnsupportedVersionException}, puts the transaction manager
     * into a fatal-error state, and makes a subsequent send fail with the same exception type.
     *
     * @throws Exception if appending to the accumulator fails
     */
    @Test
    public void testUnsupportedVersionInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());

        FutureRecordMetadata future = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // Queue an unsupported-version response for the next idempotent produce request.
        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
            }
        });
        this.sender.runOnce();

        this.assertFutureFailure(future, UnsupportedVersionException.class);
        Assert.assertTrue(transactionManager.hasFatalError());
        this.assertSendFailure(UnsupportedVersionException.class);
    }

    /**
     * Checks that a produced batch carries the configured producer id, epoch 0 and base
     * sequence 0, and that after a successful response the last-acked sequence is 0 and the
     * next sequence number is 1.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testSequenceNumberIncrement() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        this.setupWithTransactionState(transactionManager);

        int maxRetries = 10;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                0x100000, -1, maxRetries, senderMetrics, this.time, 1000, 50L, transactionManager, this.apiVersions);

        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;

        // The matcher validates the single tp0 batch of the produce request before accepting it.
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof ProduceRequest)) {
                    return false;
                }
                ProduceRequest request = (ProduceRequest) body;
                MemoryRecords records = (MemoryRecords) request.partitionRecordsOrFail().get(SenderTest.this.tp0);
                Iterator<?> batchIterator = records.batches().iterator();
                Assert.assertTrue(batchIterator.hasNext());
                RecordBatch batch = (RecordBatch) batchIterator.next();
                Assert.assertFalse(batchIterator.hasNext());
                Assert.assertEquals(0L, batch.baseSequence());
                Assert.assertEquals(producerId, batch.producerId());
                Assert.assertEquals(0L, batch.producerEpoch());
                return true;
            }
        }, this.produceResponse(this.tp0, 0L, Errors.NONE, 0));

        // Three runs are driven before the future is expected to be complete.
        sender.runOnce();
        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(responseFuture.isDone());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * Checks that a batch whose request was lost to a disconnect is not retried once the
     * producer id has been replaced.
     *
     * <p>After the producer id changes, the test expects no in-flight requests, a non-zero
     * record-error-rate metric, a completed future, and a sequence number of 0 for tp0.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, 0));
        this.setupWithTransactionState(transactionManager);

        int maxRetries = 10;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                0x100000, -1, maxRetries, senderMetrics, this.time, 1000, 50L, transactionManager, this.apiVersions);

        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        sender.runOnce();
        sender.runOnce();

        String id = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertTrue("Client ready status should be true", this.client.isReady(node, this.time.milliseconds()));

        // Drop the connection: the in-flight request disappears and the node is no longer ready.
        this.client.disconnect(id);
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertFalse("Client ready status should be false", this.client.isReady(node, this.time.milliseconds()));

        // Swap in a different producer id before the sender gets a chance to retry.
        transactionManager.resetProducerId();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId + 1, 0));
        sender.runOnce();
        sender.runOnce();
        sender.runOnce();

        Assert.assertEquals("Expected requests to be aborted after pid change", 0L, this.client.inFlightRequestCount());
        KafkaMetric recordErrors = (KafkaMetric) m.metrics().get(senderMetrics.recordErrorRate);
        Assert.assertTrue("Expected non-zero value for record send errors",
                (Double) recordErrors.metricValue() > 0.0);
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * Verifies that an {@code OUT_OF_ORDER_SEQUENCE_NUMBER} produce response completes the
     * record's future, clears the in-flight batch for {@code tp0}, and leaves the transaction
     * manager without a producer id.
     */
    @Test
    public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
        final long producerId = 343434L;
        final int maxRetries = 10;
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
        this.setupWithTransactionState(transactionManager);
        // Scope the dedicated metrics registry so it is closed when the test ends,
        // the same way testSplitBatchAndSend handles its registry.
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                    0x100000, (short) -1, maxRetries, senderMetrics, this.time, 1000, 50L,
                    transactionManager, this.apiVersions);
            FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(),
                    "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
            sender.runOnce();
            sender.runOnce();
            Assert.assertEquals(1, this.client.inFlightRequestCount());
            Assert.assertEquals(1, sender.inFlightBatches(this.tp0).size());

            // Answer the in-flight produce request with the sequence error under test.
            this.client.respond(this.produceResponse(this.tp0, 0L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
            sender.runOnce();

            Assert.assertTrue(responseFuture.isDone());
            Assert.assertEquals(0, sender.inFlightBatches(this.tp0).size());
            Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
        }
    }

    /**
     * Runs the shared split-batch scenario with a transaction manager that was built with the
     * no-argument constructor and given a producer id and epoch directly.
     */
    @Test
    public void testIdempotentSplitBatchAndSend() throws Exception {
        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager txnManager = new TransactionManager();
        // The epoch is passed with an explicit short cast; a bare int literal is not accepted for a short parameter.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        txnManager.setProducerIdAndEpoch(producerIdAndEpoch);
        this.testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
    }

    /**
     * Runs the shared split-batch scenario with a transaction manager that has a transactional id,
     * after initialising transactions, beginning one, and adding the target partition to it.
     */
    @Test
    public void testTransactionalSplitBatchAndSend() throws Exception {
        // The epoch is passed with an explicit short cast; a bare int literal is not accepted for a short parameter.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager txnManager = new TransactionManager(this.logContext, "testSplitBatchAndSend", 60000, 100L);
        this.setupWithTransactionState(txnManager);
        this.doInitTransactions(txnManager, producerIdAndEpoch);

        txnManager.beginTransaction();
        txnManager.maybeAddPartitionToTransaction(tp);
        // Queue a successful AddPartitionsToTxn response and let the sender consume it.
        this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
        this.sender.runOnce();

        this.testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
    }

    /**
     * Shared scenario: two records are sent in one produce request, the broker rejects it with
     * {@code MESSAGE_TOO_LARGE}, and the two records are then acknowledged one request at a time
     * with base sequences 0 and 1.
     *
     * <p>Throughout, the next sequence number for {@code tp} is expected to stay at 2, and the
     * batch-split-rate metric must be positive at the end.
     *
     * @param txnManager         transaction manager wired into the accumulator and sender
     * @param producerIdAndEpoch producer id and epoch the re-sent batches must carry
     * @param tp                 partition the records are appended to
     */
    private void testSplitBatchAndSend(TransactionManager txnManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition tp) throws Exception {
        int maxRetries = 1;
        String topic = tp.topic();
        int deliveryTimeoutMs = 3000;
        long totalSize = 0x100000L;
        String metricGrpName = "producer-metrics";
        // Seed the GZIP compression-ratio estimate for this topic with 0.2
        // (presumably so that both half-batch-size records are packed into a single batch).
        CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
        try (Metrics m = new Metrics()) {
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.GZIP, 0, 0L,
                    deliveryTimeoutMs, m, metricGrpName, this.time, new ApiVersions(), txnManager,
                    new BufferPool(totalSize, this.batchSize, this.metrics, this.time, "producer-internal-metrics"));
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                    0x100000, (short) -1, maxRetries, senderMetrics, this.time, 1000, 1000L, txnManager, new ApiVersions());
            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
            this.client.prepareMetadataUpdate(metadataUpdate1);

            FutureRecordMetadata f1 = this.accumulator.append(tp, 0L, "key1".getBytes(), new byte[this.batchSize / 2], null, null, 1000L).future;
            FutureRecordMetadata f2 = this.accumulator.append(tp, 0L, "key2".getBytes(), new byte[this.batchSize / 2], null, null, 1000L).future;
            sender.runOnce();
            sender.runOnce();
            Assert.assertEquals("The next sequence should be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            this.assertSingleInFlightProduceRequest();

            // Reject the first request as too large.
            Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
            this.client.respond(new ProduceResponse(responseMap));
            sender.runOnce();
            Assert.assertEquals("The next sequence should be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertEquals((double) (CompressionType.GZIP.rate - 0.005f), (double) CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
            sender.runOnce();
            Assert.assertEquals("The next sequence number should be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertFalse("The future shouldn't have been done.", f1.isDone());
            Assert.assertFalse("The future shouldn't have been done.", f2.isDone());
            this.assertSingleInFlightProduceRequest();

            // Acknowledge the request carrying base sequence 0 at offset 0.
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()), new ProduceResponse(responseMap));
            sender.runOnce();
            Assert.assertTrue("The future should have been done.", f1.isDone());
            Assert.assertEquals("The next sequence number should still be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertEquals("The last ack'd sequence number should be 0", 0L, txnManager.lastAckedSequence(tp));
            Assert.assertFalse("The future shouldn't have been done.", f2.isDone());
            Assert.assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
            sender.runOnce();
            this.assertSingleInFlightProduceRequest();

            // Acknowledge the request carrying base sequence 1 at offset 1.
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()), new ProduceResponse(responseMap));
            sender.runOnce();
            Assert.assertTrue("The future should have been done.", f2.isDone());
            Assert.assertEquals("The next sequence number should be 2", 2L, txnManager.sequenceNumber(tp).longValue());
            Assert.assertEquals("The last ack'd sequence number should be 1", 1L, txnManager.lastAckedSequence(tp));
            Assert.assertEquals("Offset of the second message should be 1", 1L, f2.get().offset());
            Assert.assertTrue("There should be no batch in the accumulator", this.accumulator.batches().get(tp).isEmpty());
            Assert.assertTrue("There should be a split", (Double) m.metrics().get(senderMetrics.batchSplitRate).metricValue() > 0.0);
        }
    }

    /**
     * Asserts that the oldest queued client request is a produce request, that exactly one
     * request is in flight, and that the client reports the request's destination node as ready.
     */
    private void assertSingleInFlightProduceRequest() {
        ClientRequest pending = this.client.requests().peek();
        String id = pending.destination();
        Assert.assertEquals(ApiKeys.PRODUCE, pending.requestBuilder().apiKey());
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertTrue("Client ready status should be true", this.client.isReady(node, this.time.milliseconds()));
    }

    /**
     * Verifies that a batch expiring after the delivery timeout returns its buffer to the pool
     * once, and that further sender iterations do not deallocate it again.
     *
     * <p>{@code MatchingBufferPool} throws if a buffer it did not hand out is deallocated, so a
     * second deallocation of the same buffer would fail the test.
     */
    @Test
    public void testNoDoubleDeallocation() throws Exception {
        long deliverTimeoutMs = 1500L;
        long totalSize = 0x100000L;
        String metricGrpName = "producer-custom-metrics";
        MatchingBufferPool pool = new MatchingBufferPool(totalSize, this.batchSize, this.metrics, this.time, metricGrpName);
        this.setupWithTransactionState(null, false, pool);

        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Let the delivery timeout elapse; the buffer is still outstanding until the sender runs.
        this.time.sleep(deliverTimeoutMs);
        Assert.assertFalse(pool.allMatch());

        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertTrue("The batch should have been de-allocated", pool.allMatch());

        // A further iteration must neither throw from the pool nor change its state.
        this.sender.runOnce();
        Assert.assertTrue("The batch should have been de-allocated", pool.allMatch());
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * Verifies that an in-flight batch is expired with a {@code TimeoutException} once the
     * delivery timeout has elapsed, even though a successful response was already queued for it.
     */
    @Test
    public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
        long deliveryTimeoutMs = 1500L;
        this.setupWithTransactionState(null, true, null);

        FutureRecordMetadata request = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals("Expect one in-flight batch in accumulator", 1, this.sender.inFlightBatches(this.tp0).size());

        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
        responseMap.put(this.tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        this.client.respond(new ProduceResponse(responseMap));

        this.time.sleep(deliveryTimeoutMs);
        this.sender.runOnce();
        Assert.assertEquals("Expect zero in-flight batch in accumulator", 0, this.sender.inFlightBatches(this.tp0).size());

        // The shared helper checks isDone() first, so a future that never completed fails fast
        // instead of blocking in get().
        this.assertFutureFailure(request, TimeoutException.class);
    }

    /**
     * With ordering guaranteed, checks that a second batch for {@code tp0} is held back while the
     * first is in flight and is only sent after the first has been answered.
     */
    @Test
    public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
        final long deliveryTimeoutMs = 1500L;
        final long halfTimeoutMs = deliveryTimeoutMs / 2L;
        this.setupWithTransactionState(null, true, null);

        // First record: goes out immediately.
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L);
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Second record, appended half a timeout later: the counts must not grow.
        this.time.sleep(halfTimeoutMs);
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, 1000L);
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Answer the first request after the remaining half of the timeout.
        this.time.sleep(halfTimeoutMs);
        this.client.respond(this.produceResponse(this.tp0, 0L, Errors.NONE, 0, 0L));
        this.sender.runOnce();
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // The next iteration puts one request and one batch back in flight.
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * Verifies that a batch which has passed its delivery timeout is completed and not retried
     * when the broker answers with the retriable {@code NOT_LEADER_FOR_PARTITION} error.
     */
    @Test
    public void testExpiredBatchDoesNotRetry() throws Exception {
        long deliverTimeoutMs = 1500L;
        this.setupWithTransactionState(null, false, null);

        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());

        // Let the delivery timeout elapse, then deliver the error response.
        this.time.sleep(deliverTimeoutMs);
        this.client.respond(this.produceResponse(this.tp0, -1L, Errors.NOT_LEADER_FOR_PARTITION, -1));
        this.sender.runOnce();
        Assert.assertTrue(request1.isDone());
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Two more iterations must not put anything back in flight.
        this.sender.runOnce();
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
        this.sender.runOnce();
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * Checks that a two-record batch answered with {@code MESSAGE_TOO_LARGE} after the delivery
     * timeout has elapsed completes both futures and leaves nothing in flight afterwards.
     *
     * <p>Uses the {@code sender} and {@code accumulator} fields as they are when the test starts;
     * their setup is not part of this method.
     */
    @Test
    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
        final long deliverTimeoutMs = 1500L;

        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key1".getBytes(), "value1".getBytes(), null, null, 1000L).future;
        FutureRecordMetadata request2 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key2".getBytes(), "value2".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());

        // Queue the error response, then move the clock past the delivery timeout before it is processed.
        this.client.respond(this.produceResponse(this.tp0, -1L, Errors.MESSAGE_TOO_LARGE, -1));
        this.time.sleep(deliverTimeoutMs);
        this.sender.runOnce();

        Assert.assertTrue(request1.isDone());
        Assert.assertTrue(request2.isDone());
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // One more iteration: still nothing in flight.
        this.sender.runOnce();
        Assert.assertEquals(0, this.client.inFlightRequestCount());
        Assert.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * Spies on the mock client and verifies, in order, the calls the sender makes across three
     * iterations: the send path, then three {@code poll} calls whose timeouts are 0, the
     * accumulator's delivery timeout, and finally a value of at least 1.
     */
    @Test
    public void testResetNextBatchExpiry() throws Exception {
        this.client = Mockito.spy(new MockClient(this.time, this.metadata));
        this.setupWithTransactionState(null);
        // Kept as a long so the eq(...) matcher below is built for poll's long argument.
        final long deliveryTimeoutMs = (long) this.accumulator.getDeliveryTimeoutMs();

        this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, 1000L);
        this.sender.runOnce();
        this.sender.runOnce();
        // Jump the clock to just past the delivery timeout before the third iteration.
        this.time.setCurrentTimeMs(this.time.milliseconds() + deliveryTimeoutMs + 1L);
        this.sender.runOnce();

        InOrder callOrder = Mockito.inOrder(this.client);
        callOrder.verify(this.client, Mockito.atLeastOnce())
                .ready(ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        callOrder.verify(this.client, Mockito.atLeastOnce())
                .newClientRequest(ArgumentMatchers.anyString(), ArgumentMatchers.any(), ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyInt(), ArgumentMatchers.any());
        callOrder.verify(this.client, Mockito.atLeastOnce())
                .send(ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        callOrder.verify(this.client).poll(ArgumentMatchers.eq(0L), ArgumentMatchers.anyLong());
        callOrder.verify(this.client).poll(ArgumentMatchers.eq(deliveryTimeoutMs), ArgumentMatchers.anyLong());
        callOrder.verify(this.client).poll(AdditionalMatchers.geq(1L), ArgumentMatchers.anyLong());
    }

    /**
     * Verifies that records appended to two different partitions both fail with a
     * {@code TimeoutException} once the delivery timeout has elapsed.
     */
    @Test
    public void testExpiredBatchesInMultiplePartitions() throws Exception {
        long deliveryTimeoutMs = 1500L;
        this.setupWithTransactionState(null, true, null);

        FutureRecordMetadata request1 = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "k1".getBytes(), "v1".getBytes(), null, null, 1000L).future;
        FutureRecordMetadata request2 = this.accumulator.append(this.tp1, this.time.milliseconds(),
                "k2".getBytes(), "v2".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        Assert.assertEquals(1, this.client.inFlightRequestCount());
        Assert.assertEquals("Expect one in-flight batch in accumulator", 1, this.sender.inFlightBatches(this.tp0).size());

        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
        responseMap.put(this.tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        this.client.respond(new ProduceResponse(responseMap));

        this.time.sleep(deliveryTimeoutMs);
        this.sender.runOnce();
        Assert.assertEquals("Expect zero in-flight batch in accumulator", 0, this.sender.inFlightBatches(this.tp0).size());

        // The shared helper checks isDone() first, so a future that never completed fails fast
        // instead of blocking in get().
        this.assertFutureFailure(request1, TimeoutException.class);
        this.assertFutureFailure(request2, TimeoutException.class);
    }

    /**
     * Builds a matcher that accepts a produce request only if it carries exactly one batch for
     * {@code tp} and that batch has base offset 0 and the given base sequence, producer id,
     * producer epoch and transactional flag.
     *
     * @param tp                 partition whose records are inspected
     * @param producerIdAndEpoch expected producer id and epoch of the batch
     * @param sequence           expected base sequence of the batch
     * @param isTransactional    expected transactional flag of the batch
     * @return the matcher; it returns {@code false} for any request that is not a produce request
     */
    private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition tp, final ProducerIdAndEpoch producerIdAndEpoch, final int sequence, final boolean isTransactional) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof ProduceRequest)) {
                    return false;
                }
                MemoryRecords partitionRecords = (MemoryRecords) ((ProduceRequest) body).partitionRecordsOrFail().get(tp);
                if (partitionRecords == null) {
                    return false;
                }
                List<?> found = TestUtils.toList(partitionRecords.batches());
                if (found.size() != 1) {
                    return false;
                }
                MutableRecordBatch only = (MutableRecordBatch) found.get(0);
                // Same checks, in the same order, as a single short-circuiting conjunction.
                if (only.baseOffset() != 0L) {
                    return false;
                }
                if (only.baseSequence() != sequence) {
                    return false;
                }
                if (only.producerId() != producerIdAndEpoch.producerId) {
                    return false;
                }
                if (only.producerEpoch() != producerIdAndEpoch.epoch) {
                    return false;
                }
                return only.isTransactional() == isTransactional;
            }
        };
    }

    /**
     * Builds a produce response containing a single partition entry.
     *
     * @param tp             partition the entry is keyed by
     * @param offset         offset passed to the partition response
     * @param error          error passed to the partition response
     * @param throttleTimeMs throttle time passed to the produce response
     * @param logStartOffset log start offset passed to the partition response
     * @return the produce response
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
        // The third partition-response argument is fixed at -1.
        ProduceResponse.PartitionResponse single =
                new ProduceResponse.PartitionResponse(error, offset, -1L, logStartOffset);
        return new ProduceResponse(Collections.singletonMap(tp, single), throttleTimeMs);
    }

    /**
     * Builds a produce response with one partition entry per map entry, preserving the map's
     * iteration order.
     *
     * @param responses offset and error to report for each partition
     * @return the produce response
     */
    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) {
        Map<TopicPartition, ProduceResponse.PartitionResponse> byPartition = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndError> entry : responses.entrySet()) {
            OffsetAndError outcome = entry.getValue();
            // The last two partition-response arguments are fixed at -1.
            byPartition.put(entry.getKey(),
                    new ProduceResponse.PartitionResponse(outcome.error, outcome.offset, -1L, -1L));
        }
        return new ProduceResponse(byPartition);
    }

    /**
     * Convenience overload that builds a single-partition produce response with a log start
     * offset of -1.
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        final long logStartOffset = -1L;
        return produceResponse(tp, offset, error, throttleTimeMs, logStartOffset);
    }

    /**
     * Rebuilds the test fixture for the given transaction manager, without guaranteed ordering
     * and with a freshly created buffer pool.
     *
     * @param transactionManager transaction manager to wire in; callers in this file also pass {@code null}
     */
    private void setupWithTransactionState(TransactionManager transactionManager) {
        final boolean guaranteeOrder = false;
        final BufferPool customPool = null;
        setupWithTransactionState(transactionManager, guaranteeOrder, customPool);
    }

    /**
     * Rebuilds the {@code metrics}, {@code accumulator}, {@code senderMetricsRegistry} and
     * {@code sender} fields, then pushes a metadata update for topic {@code "test"} to the client.
     *
     * @param transactionManager transaction manager handed to the accumulator and sender; callers
     *                           in this file also pass {@code null}
     * @param guaranteeOrder     ordering flag handed to the sender
     * @param customPool         buffer pool to use, or {@code null} to create a 1 MiB pool
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
        int deliveryTimeoutMs = 1500;
        long totalSize = 0x100000L;
        String metricGrpName = "producer-metrics";
        MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
        this.metrics = new Metrics(metricConfig, this.time);
        BufferPool pool = customPool != null
                ? customPool
                : new BufferPool(totalSize, this.batchSize, this.metrics, this.time, metricGrpName);
        this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.NONE, 0, 0L,
                deliveryTimeoutMs, this.metrics, metricGrpName, this.time, this.apiVersions, transactionManager, pool);
        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
        // acks is passed with an explicit short cast; a bare int literal is not accepted for a short parameter.
        this.sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, guaranteeOrder,
                0x100000, (short) -1, Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, 1000, 50L,
                transactionManager, this.apiVersions);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
    }

    /**
     * Appends one record to {@code tp0}, runs the sender once, and asserts that the record's
     * future has already failed with the expected error type.
     *
     * <p>The check itself is delegated to {@code assertFutureFailure}, so a mismatch reports the
     * actual cause type instead of a bare assertion failure.
     *
     * @param expectedError type the future's cause must be assignable to
     */
    private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
        FutureRecordMetadata future = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), null, null, 1000L).future;
        this.sender.runOnce();
        this.assertFutureFailure(future, expectedError);
    }

    /**
     * Queues an {@code InitProducerIdResponse} for the next {@code InitProducerIdRequest} that has
     * no transactional id, then runs the sender once.
     *
     * @param producerId producer id placed in the response
     * @param error      error placed in the response; the response epoch is 0 for
     *                   {@code Errors.NONE} and -1 otherwise
     */
    private void prepareAndReceiveInitProducerId(long producerId, Errors error) {
        final short producerEpoch = error == Errors.NONE ? (short) 0 : (short) -1;
        MockClient.RequestMatcher nonTransactionalInit = new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                if (!(body instanceof InitProducerIdRequest)) {
                    return false;
                }
                return ((InitProducerIdRequest) body).transactionalId() == null;
            }
        };
        InitProducerIdResponse response = new InitProducerIdResponse(0, error, producerId, producerEpoch);
        this.client.prepareResponse(nonTransactionalInit, response);
        this.sender.runOnce();
    }

    /**
     * Drives the transaction manager through initialisation using the {@code sender} field:
     * starts initialisation, queues a successful find-coordinator response, then queues an
     * init-producer-id response carrying the given id and epoch, running the sender in between.
     *
     * @param transactionManager transaction manager to initialise
     * @param producerIdAndEpoch producer id and epoch returned in the queued response
     */
    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE);
        // Two iterations follow the coordinator response
        // (presumably one to send the lookup and one to handle its response — confirm against Sender).
        this.sender.runOnce();
        this.sender.runOnce();
        this.prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        this.sender.runOnce();
        // Initialisation is considered complete once the manager reports a producer id.
        Assert.assertTrue((boolean)transactionManager.hasProducerId());
    }

    /**
     * Queues a find-coordinator response that carries the given error and names the first node of
     * the currently fetched cluster metadata.
     */
    private void prepareFindCoordinatorResponse(Errors error) {
        Node firstNode = this.metadata.fetch().nodes().get(0);
        FindCoordinatorResponse response = new FindCoordinatorResponse(error, firstNode);
        this.client.prepareResponse(response);
    }

    /**
     * Queues an init-producer-id response with throttle time 0 and the given error, producer id
     * and producer epoch.
     */
    private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {
        InitProducerIdResponse response = new InitProducerIdResponse(0, error, producerId, producerEpoch);
        this.client.prepareResponse(response);
    }

    /**
     * Asserts that the future is already done and that retrieving its value fails with a cause
     * assignable to the expected exception type.
     *
     * @param future                future that must have completed exceptionally
     * @param expectedExceptionType type the {@code ExecutionException}'s cause must be assignable to
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType) throws InterruptedException {
        Assert.assertTrue(future.isDone());
        try {
            future.get();
        }
        catch (ExecutionException failure) {
            Class<?> actualCause = failure.getCause().getClass();
            boolean compatible = expectedExceptionType.isAssignableFrom(actualCause);
            Assert.assertTrue("Unexpected cause " + actualCause.getName(), compatible);
            return;
        }
        // Reached only when get() returned normally.
        Assert.fail("Future should have raised " + expectedExceptionType.getName());
    }

    /**
     * Pairs an offset with an error; used as the per-partition value when building a produce
     * response from a map.
     */
    class OffsetAndError {
        /** Offset reported for the partition. */
        long offset;
        /** Error reported for the partition. */
        Errors error;

        OffsetAndError(long partitionOffset, Errors partitionError) {
            offset = partitionOffset;
            error = partitionError;
        }
    }

    private class MatchingBufferPool
    extends BufferPool {
        IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;

        MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, Time time, String metricGrpName) {
            super(totalSize, batchSize, metrics, time, metricGrpName);
            this.allocatedBuffers = new IdentityHashMap();
        }

        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
            ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
            this.allocatedBuffers.put(buffer, Boolean.TRUE);
            return buffer;
        }

        public void deallocate(ByteBuffer buffer, int size) {
            if (!this.allocatedBuffers.containsKey(buffer)) {
                throw new IllegalStateException("Deallocating a buffer that is not allocated");
            }
            this.allocatedBuffers.remove(buffer);
            super.deallocate(buffer, size);
        }

        public boolean allMatch() {
            return this.allocatedBuffers.isEmpty();
        }
    }
}

