package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.class */
public class FlinkKafkaProducerBaseTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest$DummyFlinkKafkaProducer.class */
    /**
     * Test double for {@link FlinkKafkaProducerBase} that swaps the real Kafka client for a
     * Mockito mock.
     *
     * <p>Every {@link Callback} handed to the mock's {@code send(record, callback)} is recorded in
     * {@link #getPendingCallbacks()} instead of being completed, so a test can complete (or fail)
     * individual records by hand. {@link #flush()} signals a latch and then polls
     * {@code numPendingRecords()} until it reaches zero.
     *
     * @param <T> type of the records passed to the producer
     */
    private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
        private static final long serialVersionUID = 1;
        private static final String DUMMY_TOPIC = "dummy-topic";

        /** Mocked Kafka client returned from {@link #getKafkaProducer(Properties)}. */
        private transient KafkaProducer<?, ?> mockProducer;

        /** Callbacks captured from {@code send(...)}, in the order the records were sent. */
        private transient List<Callback> pendingCallbacks;

        /** Triggered each time {@link #flush()} is entered. */
        private transient MultiShotLatch flushLatch;

        /** Reset at the start of {@code snapshotState} and set once {@link #flush()} has finished. */
        private boolean isFlushed;

        /**
         * Creates the dummy producer for {@code "dummy-topic"} and wires up the mocked Kafka client.
         *
         * @param properties producer configuration forwarded to the base class
         * @param keyedSerializationSchema schema forwarded to the base class
         * @param flinkKafkaPartitioner partitioner forwarded to the base class; tests pass {@code null}
         *     when no custom partitioner is needed
         */
        @SuppressWarnings("unchecked")
        DummyFlinkKafkaProducer(Properties properties, KeyedSerializationSchema<T> keyedSerializationSchema, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
            super(DUMMY_TOPIC, keyedSerializationSchema, properties, flinkKafkaPartitioner);

            this.pendingCallbacks = new ArrayList<>();
            this.flushLatch = new MultiShotLatch();

            this.mockProducer = Mockito.mock(KafkaProducer.class);
            Mockito.when(this.mockProducer.send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class))).thenAnswer(new Answer<Object>() {
                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    // Capture the callback (argument index 1) rather than completing it,
                    // so the record stays pending until the test completes it explicitly.
                    DummyFlinkKafkaProducer.this.pendingCallbacks.add(invocationOnMock.getArgumentAt(1, Callback.class));
                    return null;
                }
            });
        }

        /**
         * Returns {@code numPendingRecords()} when {@code flushOnCheckpoint} is set.
         *
         * @throws UnsupportedOperationException if {@code flushOnCheckpoint} is not set
         */
        long getPendingSize() {
            if (this.flushOnCheckpoint) {
                return numPendingRecords();
            }
            throw new UnsupportedOperationException("getPendingSize not supported when flushing is disabled");
        }

        /** Returns the live list of callbacks captured from the mocked {@code send(...)} calls. */
        List<Callback> getPendingCallbacks() {
            return this.pendingCallbacks;
        }

        /** Returns the mocked Kafka client so tests can stub and verify it. */
        KafkaProducer<?, ?> getMockKafkaProducer() {
            return this.mockProducer;
        }

        /**
         * Delegates to the base implementation and then fails if {@code flushOnCheckpoint} is set
         * but {@link #flush()} did not run to completion during the snapshot.
         */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.isFlushed = false;
            super.snapshotState(functionSnapshotContext);
            if (this.flushOnCheckpoint && !this.isFlushed) {
                throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
            }
        }

        /** Blocks on the flush latch, which {@link #flush()} triggers when it is entered. */
        public void waitUntilFlushStarted() throws Exception {
            this.flushLatch.await();
        }

        /** Returns the mocked Kafka client; the supplied properties are ignored. */
        @SuppressWarnings("unchecked")
        protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties properties) {
            return (KafkaProducer<K, V>) this.mockProducer;
        }

        /**
         * Signals the flush latch, then polls {@code numPendingRecords()} every 10 ms until it
         * is no longer positive, and finally marks the producer as flushed.
         *
         * @throws RuntimeException if the polling thread is interrupted; the interrupt flag is
         *     restored and the {@link InterruptedException} is attached as the cause
         */
        protected void flush() {
            this.flushLatch.trigger();
            while (numPendingRecords() > 0) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and keep the cause so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Unable to flush producer, task was interrupted", e);
                }
            }
            this.isFlushed = true;
        }
    }

    /**
     * Constructing the producer from an empty {@link Properties} object (no
     * {@code bootstrap.servers}) is expected to throw {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
        Properties emptyConfig = new Properties();
        KeyedSerializationSchema<String> schema = new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());

        // No bootstrap servers configured: the constructor call itself must fail.
        new DummyFlinkKafkaProducer<>(emptyConfig, schema, null);
    }

    /**
     * Constructing the producer with only {@code bootstrap.servers} configured must leave both
     * {@code key.serializer} and {@code value.serializer} set to {@link ByteArraySerializer} on
     * the {@link Properties} object that was passed in.
     */
    @Test
    public void testKeyValueDeserializersSetIfMissing() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:12345");

        // The instance itself is not needed; only the effect on the passed-in Properties is checked.
        new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);

        String expectedSerializer = ByteArraySerializer.class.getName();
        Assert.assertTrue(props.containsKey("key.serializer"));
        Assert.assertTrue(props.containsKey("value.serializer"));
        Assert.assertEquals(expectedSerializer, props.getProperty("key.serializer"));
        // Check the value serializer too, not the key serializer a second time.
        Assert.assertEquals(expectedSerializer, props.getProperty("value.serializer"));
    }

    /**
     * The mocked Kafka client reports the topic's partitions out of order (3, 1, 0, 2); the
     * custom partitioner must nevertheless be called with the partition ids {@code {0, 1, 2, 3}}.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
        final String record = "foobar";

        // Partition metadata in a deliberately shuffled order.
        List<PartitionInfo> shuffledPartitions = new ArrayList<>(4);
        for (int partitionId : new int[] {3, 1, 0, 2}) {
            shuffledPartitions.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, partitionId, null, null, null));
        }

        FlinkKafkaPartitioner<String> partitioner = Mockito.mock(FlinkKafkaPartitioner.class);

        // Single subtask: index 0 of parallelism 1.
        RuntimeContext subtaskContext = Mockito.mock(StreamingRuntimeContext.class);
        Mockito.when(subtaskContext.getIndexOfThisSubtask()).thenReturn(0);
        Mockito.when(subtaskContext.getNumberOfParallelSubtasks()).thenReturn(1);

        DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(),
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                partitioner);
        producer.setRuntimeContext(subtaskContext);

        KafkaProducer<?, ?> kafkaMock = producer.getMockKafkaProducer();
        Mockito.when(kafkaMock.partitionsFor(Mockito.anyString())).thenReturn(shuffledPartitions);
        Mockito.when(kafkaMock.metrics()).thenReturn(null);

        producer.open(new Configuration());
        Mockito.verify(partitioner).open(0, 1);

        producer.invoke(record, SinkContextUtil.forTimestamp(0L));
        Mockito.verify(partitioner).partition(
                record, null, record.getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
    }

    /**
     * After the first record's callback is completed with an exception, processing the next
     * element must throw an exception whose cause message contains that failure's message.
     */
    @Test
    public void testAsyncErrorRethrownOnInvoke() throws Throwable {
        final String failureMessage = "artificial async exception";

        DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
        harness.open();

        harness.processElement(new StreamRecord<>("msg-1"));

        // Fail the first record asynchronously.
        Callback firstCallback = producer.getPendingCallbacks().get(0);
        firstCallback.onCompletion(null, new Exception(failureMessage));

        try {
            harness.processElement(new StreamRecord<>("msg-2"));
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains(failureMessage));
            return;
        }

        // Reaching this point means the second element was processed without error.
        Assert.fail();
    }

    /**
     * After the first record's callback is completed with an exception, taking a snapshot must
     * throw an exception whose cause message contains that failure's message.
     */
    @Test
    public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
        final String failureMessage = "artificial async exception";

        DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
        harness.open();

        harness.processElement(new StreamRecord<>("msg-1"));

        // Fail the only pending record asynchronously.
        Callback firstCallback = producer.getPendingCallbacks().get(0);
        firstCallback.onCompletion(null, new Exception(failureMessage));

        try {
            harness.snapshot(123L, 123L);
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains(failureMessage));
            return;
        }

        // Reaching this point means the snapshot completed without error.
        Assert.fail();
    }

    /**
     * With flush-on-checkpoint enabled, three records are sent and the first is acknowledged.
     * A snapshot is then started on a separate thread while the second record is failed and the
     * third acknowledged; the snapshot thread must end with an exception whose cause message
     * contains the second record's failure message.
     */
    @Test(timeout = 5000)
    public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
        final String failureMessage = "artificial async failure for 2nd message";

        final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(true);
        final KafkaProducer<?, ?> kafkaMock = producer.getMockKafkaProducer();

        final OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
        harness.open();

        for (String payload : new String[] {"msg-1", "msg-2", "msg-3"}) {
            harness.processElement(new StreamRecord<>(payload));
        }
        Mockito.verify(kafkaMock, Mockito.times(3)).send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class));

        // Acknowledge only the first record; two records remain pending when the snapshot starts.
        final List<Callback> callbacks = producer.getPendingCallbacks();
        callbacks.get(0).onCompletion(null, null);

        CheckedThread snapshotThread = new CheckedThread() {
            public void go() throws Exception {
                harness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();

        // Second record fails, third succeeds.
        callbacks.get(1).onCompletion(null, new Exception(failureMessage));
        callbacks.get(2).onCompletion(null, null);

        try {
            snapshotThread.sync();
        } catch (Exception e) {
            Assert.assertTrue(e.getCause().getMessage().contains(failureMessage));
            return;
        }

        // Reaching this point means the snapshot thread finished without error.
        Assert.fail();
    }

    /**
     * With flush-on-checkpoint enabled, a snapshot started while three records are pending must
     * stay blocked until every record's callback has been completed. The pending count is checked
     * after each completion.
     */
    @Test(timeout = 10000)
    public void testAtLeastOnceProducer() throws Throwable {
        final String stillBlockedMessage = "Snapshot returned before all records were flushed";

        final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(true);
        final KafkaProducer<?, ?> kafkaMock = producer.getMockKafkaProducer();

        final OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
        harness.open();

        for (String payload : new String[] {"msg-1", "msg-2", "msg-3"}) {
            harness.processElement(new StreamRecord<>(payload));
        }
        Mockito.verify(kafkaMock, Mockito.times(3)).send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class));
        Assert.assertEquals(3L, producer.getPendingSize());

        CheckedThread snapshotThread = new CheckedThread() {
            public void go() throws Exception {
                harness.snapshot(123L, 123L);
            }
        };
        snapshotThread.start();

        // Wait for the flush to begin before inspecting the snapshot thread.
        producer.waitUntilFlushStarted();
        Assert.assertTrue(stillBlockedMessage, snapshotThread.isAlive());

        final List<Callback> callbacks = producer.getPendingCallbacks();

        // Acknowledge the first two records one at a time; the snapshot must remain blocked
        // and the pending count must drop by one each time.
        for (int acknowledged = 1; acknowledged <= 2; acknowledged++) {
            callbacks.get(acknowledged - 1).onCompletion(null, null);
            Assert.assertTrue(stillBlockedMessage, snapshotThread.isAlive());
            Assert.assertEquals(3L - acknowledged, producer.getPendingSize());
        }

        // Acknowledge the last record; nothing is pending any more.
        callbacks.get(2).onCompletion(null, null);
        Assert.assertEquals(0L, producer.getPendingSize());

        // Rethrows anything the snapshot thread failed with.
        snapshotThread.sync();
        harness.close();
    }

    /**
     * With flush-on-checkpoint disabled, a snapshot taken while one record's callback has not
     * been completed must still return (within the test timeout).
     */
    @Test(timeout = 5000)
    public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
        DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
                FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        producer.setFlushOnCheckpoint(false);
        KafkaProducer<?, ?> kafkaMock = producer.getMockKafkaProducer();

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
        harness.open();

        harness.processElement(new StreamRecord<>("msg"));

        // The record was handed to the Kafka client exactly once; its callback is never completed.
        Mockito.verify(kafkaMock).send(Mockito.any(ProducerRecord.class), Mockito.any(Callback.class));

        harness.snapshot(123L, 123L);
        harness.close();
    }
}
