package org.apache.flink.streaming.connectors.elasticsearch;

import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.class */
public class ElasticsearchSinkBaseTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest$DummyElasticsearchApiCallBridge.class */
    private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> {
        private static final long serialVersionUID = -4272760730959041699L;

        private DummyElasticsearchApiCallBridge() {
        }

        public Client createClient(Map<String, String> map) {
            return (Client) Mockito.mock(Client.class);
        }

        public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
            return null;
        }

        @Nullable
        public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
            if (bulkItemResponse.isFailed()) {
                return new Exception(bulkItemResponse.getFailure().getMessage());
            }
            return null;
        }

        public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy bulkFlushBackoffPolicy) {
        }

        /* renamed from: createClient, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ AutoCloseable m0createClient(Map map) throws IOException {
            return createClient((Map<String, String>) map);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest$DummyElasticsearchSink.class */
    private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
        private static final long serialVersionUID = 5051907841570096991L;
        private transient BulkProcessor mockBulkProcessor;
        private transient BulkRequest nextBulkRequest;
        private transient MultiShotLatch flushLatch;
        private List<? extends Throwable> mockItemFailuresList;
        private Throwable nextBulkFailure;

        public DummyElasticsearchSink(Map<String, String> map, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler actionRequestFailureHandler) {
            super(new DummyElasticsearchApiCallBridge(), map, elasticsearchSinkFunction, actionRequestFailureHandler);
            this.nextBulkRequest = new BulkRequest();
            this.flushLatch = new MultiShotLatch();
        }

        public void manualBulkRequestWithAllPendingRequests() {
            this.flushLatch.trigger();
            this.mockBulkProcessor.flush();
        }

        public void continueFlush() {
            this.flushLatch.trigger();
        }

        public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> list) {
            this.mockItemFailuresList = list;
        }

        public void setFailNextBulkRequestCompletely(Throwable th) {
            this.nextBulkFailure = th;
        }

        public BulkProcessor getMockBulkProcessor() {
            return this.mockBulkProcessor;
        }

        protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
            this.mockBulkProcessor = (BulkProcessor) Mockito.mock(BulkProcessor.class);
            Mockito.when(this.mockBulkProcessor.add((IndexRequest) Matchers.any(IndexRequest.class))).thenAnswer(new Answer<Object>() { // from class: org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBaseTest.DummyElasticsearchSink.1
                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    DummyElasticsearchSink.this.nextBulkRequest.add((IndexRequest) invocationOnMock.getArgumentAt(0, IndexRequest.class));
                    return null;
                }
            });
            ((BulkProcessor) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBaseTest.DummyElasticsearchSink.2
                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    while (DummyElasticsearchSink.this.nextBulkRequest.numberOfActions() > 0) {
                        DummyElasticsearchSink.this.flushLatch.await();
                        BulkRequest bulkRequest = DummyElasticsearchSink.this.nextBulkRequest;
                        DummyElasticsearchSink.this.nextBulkRequest = new BulkRequest();
                        listener.beforeBulk(123L, bulkRequest);
                        if (DummyElasticsearchSink.this.nextBulkFailure == null) {
                            BulkItemResponse[] bulkItemResponseArr = new BulkItemResponse[bulkRequest.requests().size()];
                            for (int i = 0; i < bulkRequest.requests().size(); i++) {
                                Throwable th = (Throwable) DummyElasticsearchSink.this.mockItemFailuresList.get(i);
                                if (th == null) {
                                    bulkItemResponseArr[i] = new BulkItemResponse(i, "opType", (ActionResponse) Mockito.mock(ActionResponse.class));
                                } else {
                                    bulkItemResponseArr[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", th));
                                }
                            }
                            listener.afterBulk(123L, bulkRequest, new BulkResponse(bulkItemResponseArr, 1000L));
                        } else {
                            listener.afterBulk(123L, bulkRequest, DummyElasticsearchSink.this.nextBulkFailure);
                        }
                    }
                    return null;
                }
            }).when(this.mockBulkProcessor)).flush();
            return this.mockBulkProcessor;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest$DummyRetryFailureHandler.class */
    private static class DummyRetryFailureHandler implements ActionRequestFailureHandler {
        private static final long serialVersionUID = 5400023700099200745L;

        private DummyRetryFailureHandler() {
        }

        public void onFailure(ActionRequest actionRequest, Throwable th, int i, RequestIndexer requestIndexer) throws Throwable {
            requestIndexer.add(new ActionRequest[]{actionRequest});
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest$SimpleSinkFunction.class */
    private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> {
        private static final long serialVersionUID = -176739293659135148L;

        private SimpleSinkFunction() {
        }

        public void process(String string, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            HashMap hashMap = new HashMap();
            hashMap.put("data", string);
            requestIndexer.add(new IndexRequest[]{Requests.indexRequest().index("index").type("type").id("id").source(hashMap)});
        }
    }

    /**
     * Constructs the sink with an unmodifiable user configuration containing all bulk-flush
     * settings; the test passes as long as the constructor does not throw.
     */
    @Test
    public void testCollectionArgumentNotModified() {
        final Map<String, String> userConfig = new HashMap<String, String>();
        userConfig.put("bulk.flush.backoff.delay", "1");
        userConfig.put("bulk.flush.backoff.enable", "true");
        userConfig.put("bulk.flush.backoff.retries", "1");
        userConfig.put("bulk.flush.backoff.type", "CONSTANT");
        userConfig.put("bulk.flush.interval.ms", "1");
        userConfig.put("bulk.flush.max.actions", "1");
        userConfig.put("bulk.flush.max.size.mb", "1");

        // Any attempt to mutate the wrapped map would raise UnsupportedOperationException.
        final Map<String, String> readOnlyConfig = Collections.unmodifiableMap(userConfig);
        new DummyElasticsearchSink<>(readOnlyConfig, new SimpleSinkFunction<String>(), new NoOpFailureHandler());
    }

    /**
     * After a bulk round that reported a failed item, the next {@code processElement} call is
     * expected to throw an exception whose cause carries the item failure message.
     */
    @Test
    public void testItemFailureRethrownOnInvoke() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // The single buffered request will be answered with a failed item response.
        sink.setMockItemFailuresListForNextBulkItemResponses(
                Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord<>("msg"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        // Run the mocked flush so the bulk response is delivered to the sink's listener.
        sink.manualBulkRequestWithAllPendingRequests();

        try {
            testHarness.processElement(new StreamRecord<>("next msg"));
            Assert.fail();
        } catch (Exception e) {
            final String causeMessage = e.getCause().getMessage();
            Assert.assertTrue(causeMessage.contains("artificial failure for record"));
        }
    }

    /**
     * After a bulk round that reported a failed item, taking a snapshot is expected to throw an
     * exception whose nested cause carries the item failure message.
     */
    @Test
    public void testItemFailureRethrownOnCheckpoint() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // The single buffered request will be answered with a failed item response.
        sink.setMockItemFailuresListForNextBulkItemResponses(
                Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord<>("msg"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        // Run the mocked flush so the bulk response is delivered to the sink's listener.
        sink.manualBulkRequestWithAllPendingRequests();

        try {
            testHarness.snapshot(1L, 1000L);
            Assert.fail();
        } catch (Exception e) {
            // The failure is wrapped twice on the snapshot path.
            final String rootMessage = e.getCause().getCause().getMessage();
            Assert.assertTrue(rootMessage.contains("artificial failure for record"));
        }
    }

    /**
     * An item failure that only shows up in the bulk round executed during a snapshot is
     * expected to make that snapshot fail with the item failure message as nested cause.
     *
     * <p>The snapshot runs on a separate thread, which is released via the flush latch once it
     * has reached the {@code WAITING} state.
     */
    @Test(timeout = 5000)
    public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // Position 0 succeeds, position 1 fails. The first round has one request and so succeeds.
        final List<Exception> mockResponses = new ArrayList<Exception>(2);
        mockResponses.add(null);
        mockResponses.add(new Exception("artificial failure for record"));
        sink.setMockItemFailuresListForNextBulkItemResponses(mockResponses);

        testHarness.processElement(new StreamRecord<>("msg-1"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();

        // Two more requests are buffered; the second one maps to the failing position.
        testHarness.processElement(new StreamRecord<>("msg-2"));
        testHarness.processElement(new StreamRecord<>("msg-3"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(3)).add(Matchers.any(IndexRequest.class));

        final CheckedThread snapshotThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();

        // Poll until the snapshot thread is parked in WAITING state.
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }

        sink.continueFlush();

        try {
            snapshotThread.sync();
            Assert.fail();
        } catch (Exception e) {
            final String rootMessage = e.getCause().getCause().getMessage();
            Assert.assertTrue(rootMessage.contains("artificial failure for record"));
        }
    }

    /**
     * After a bulk round that failed as a whole, the next {@code processElement} call is
     * expected to throw an exception whose cause carries the bulk failure message.
     */
    @Test
    public void testBulkFailureRethrownOnInvoke() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // The next bulk round is reported to the listener as a complete failure.
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        testHarness.processElement(new StreamRecord<>("msg"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        // Run the mocked flush so the failure is delivered to the sink's listener.
        sink.manualBulkRequestWithAllPendingRequests();

        try {
            testHarness.processElement(new StreamRecord<>("next msg"));
            Assert.fail();
        } catch (Exception e) {
            final String causeMessage = e.getCause().getMessage();
            Assert.assertTrue(causeMessage.contains("artificial failure for bulk request"));
        }
    }

    /**
     * After a bulk round that failed as a whole, taking a snapshot is expected to throw an
     * exception whose nested cause carries the bulk failure message.
     */
    @Test
    public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // The next bulk round is reported to the listener as a complete failure.
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        testHarness.processElement(new StreamRecord<>("msg"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        // Run the mocked flush so the failure is delivered to the sink's listener.
        sink.manualBulkRequestWithAllPendingRequests();

        try {
            testHarness.snapshot(1L, 1000L);
            Assert.fail();
        } catch (Exception e) {
            // The failure is wrapped twice on the snapshot path.
            final String rootMessage = e.getCause().getCause().getMessage();
            Assert.assertTrue(rootMessage.contains("artificial failure for bulk request"));
        }
    }

    /**
     * A complete bulk failure that only occurs in the bulk round executed during a snapshot is
     * expected to make that snapshot fail with the bulk failure message as nested cause.
     *
     * <p>The snapshot runs on a separate thread; the failure is configured while that thread is
     * in the {@code WAITING} state and before the flush latch is triggered.
     */
    @Test(timeout = 5000)
    public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // First round: a single request that is answered successfully.
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
        testHarness.processElement(new StreamRecord<>("msg-1"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();

        // Buffer two more requests that stay pending until the snapshot flushes them.
        testHarness.processElement(new StreamRecord<>("msg-2"));
        testHarness.processElement(new StreamRecord<>("msg-3"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(3)).add(Matchers.any(IndexRequest.class));

        final CheckedThread snapshotThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();

        // Poll until the snapshot thread is parked in WAITING state.
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }

        // Configure the failure first, then release the waiting flush.
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        sink.continueFlush();

        try {
            snapshotThread.sync();
            Assert.fail();
        } catch (Exception e) {
            final String rootMessage = e.getCause().getCause().getMessage();
            Assert.assertTrue(rootMessage.contains("artificial failure for bulk request"));
        }
    }

    /**
     * With a retrying failure handler, a snapshot is expected to keep waiting while a request is
     * still pending and to complete once a later bulk round succeeds.
     *
     * <p>The snapshot runs on a separate thread. The first released round answers with a failed
     * item, after which one request is expected to be pending; the second released round answers
     * with success and the snapshot thread must finish without error.
     */
    @Test(timeout = 5000)
    public void testAtLeastOnceSink() throws Throwable {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // The first bulk round will report a failed item.
        sink.setMockItemFailuresListForNextBulkItemResponses(
                Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord<>("msg"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        final CheckedThread snapshotThread = new CheckedThread() {
            @Override
            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();

        // Poll until the snapshot thread is parked in WAITING state.
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }

        // Release the first (failing) round and poll for the WAITING state again.
        sink.continueFlush();
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }

        // One request is expected to be outstanding (presumably the one re-added by the retry handler).
        Assert.assertEquals(1L, sink.getNumPendingRequests());

        // Let the next round succeed and release it; the snapshot must now complete.
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
        sink.continueFlush();

        snapshotThread.sync();
        testHarness.close();
    }

    /**
     * With flush-on-checkpoint disabled, a snapshot is expected to return within the test timeout
     * even though a request is buffered and the flush latch is never triggered.
     */
    @Test(timeout = 5000)
    public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
        final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
                new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
        sink.disableFlushOnCheckpoint();

        final OneInputStreamOperatorTestHarness<String, Object> testHarness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        testHarness.open();

        // Buffer one request; no flush is ever released in this test.
        sink.setMockItemFailuresListForNextBulkItemResponses(
                Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord<>("msg-1"));
        Mockito.verify(sink.getMockBulkProcessor(), Mockito.times(1)).add(Matchers.any(IndexRequest.class));

        // Must not block on the pending request.
        testHarness.snapshot(1L, 1000L);
        testHarness.close();
    }
}
