package org.apache.flink.streaming.connectors.elasticsearch;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.client.Client;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.class */
/**
 * Base class for integration tests of Elasticsearch sinks that run against an embedded
 * Elasticsearch node.
 *
 * <p>The embedded node is started once per test class in {@link #prepare()} and stopped in
 * {@link #shutdown()}. Concrete subclasses provide the sink under test through
 * {@link #createElasticsearchSink(Map, List, ElasticsearchSinkFunction)} and
 * {@link #createElasticsearchSinkForEmbeddedNode(Map, ElasticsearchSinkFunction)}, and invoke the
 * {@code run*Test} methods from their own test methods.
 */
public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgramsTestBase {
    /** Cluster name the embedded Elasticsearch node is started with. */
    protected static final String CLUSTER_NAME = "test-cluster";

    /** Embedded node environment shared by all tests of the class; {@code null} until {@link #prepare()} succeeds. */
    protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;

    /** Temporary folder from which the embedded node's working directory is created. */
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Instantiates the version-specific embedded node environment by class name and starts it
     * with a fresh temporary folder and {@link #CLUSTER_NAME}.
     *
     * @throws Exception if the implementation class cannot be loaded or instantiated, or the node fails to start
     */
    @BeforeClass
    public static void prepare() throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting embedded Elasticsearch node ");
        LOG.info("-------------------------------------------------------------------------");
        // The implementation is looked up reflectively; presumably each connector version module
        // ships its own class under this name - verify against the version-specific modules.
        embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(
                Class.forName("org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl"));
        embeddedNodeEnv.start(tempFolder.newFolder(), CLUSTER_NAME);
    }

    /**
     * Closes the embedded node environment, if one was created.
     *
     * @throws Exception if closing the embedded node fails
     */
    @AfterClass
    public static void shutdown() throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shutting down embedded Elasticsearch node ");
        LOG.info("-------------------------------------------------------------------------");
        // prepare() may have failed before the field was assigned; do not pile a
        // NullPointerException on top of the original setup failure.
        if (embeddedNodeEnv != null) {
            embeddedNodeEnv.close();
        }
    }

    /**
     * Runs a job that writes the test data source into the embedded node and verifies the
     * documents produced in the target index.
     *
     * @throws Exception if the job fails or the verification does not succeed
     */
    public void runTransportClientTest() throws Exception {
        final String index = "transport-client-test-index";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction())
                .addSink(createElasticsearchSinkForEmbeddedNode(
                        createUserConfig(CLUSTER_NAME),
                        new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));

        env.execute("Elasticsearch TransportClient Test");

        final Client client = embeddedNodeEnv.getClient();
        try {
            SourceSinkDataTestKit.verifyProducedSinkData(client, index);
        } finally {
            // Release the client even when verification fails.
            client.close();
        }
    }

    /**
     * Verifies that creating a sink with a {@code null} transport address list is rejected
     * with an {@link IllegalArgumentException}.
     *
     * @throws Exception if sink creation fails in an unexpected way
     */
    public void runNullTransportClientTest() throws Exception {
        final Map<String, String> userConfig = createUserConfig("my-transport-client-cluster");
        try {
            createElasticsearchSink(
                    userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
            Assert.fail("Expected IllegalArgumentException for a null transport address list");
        } catch (IllegalArgumentException expected) {
            // expected: the test passes
        }
    }

    /**
     * Verifies that creating a sink with an empty transport address list is rejected
     * with an {@link IllegalArgumentException}.
     *
     * @throws Exception if sink creation fails in an unexpected way
     */
    public void runEmptyTransportClientTest() throws Exception {
        final Map<String, String> userConfig = createUserConfig("my-transport-client-cluster");
        try {
            createElasticsearchSink(
                    userConfig,
                    Collections.<InetSocketAddress>emptyList(),
                    new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
            Assert.fail("Expected IllegalArgumentException for an empty transport address list");
        } catch (IllegalArgumentException expected) {
            // expected: the test passes
        }
    }

    /**
     * Verifies that a job whose sink is configured with a cluster name different from
     * {@link #CLUSTER_NAME} fails with a {@link JobExecutionException} whose cause reports that
     * the client is not connected to any Elasticsearch nodes.
     *
     * @throws Exception if the job fails in an unexpected way
     */
    public void runTransportClientFailsTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction())
                .addSink(createElasticsearchSinkForEmbeddedNode(
                        createUserConfig("my-transport-client-cluster"),
                        new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));

        try {
            env.execute("Elasticsearch Transport Client Test");
            Assert.fail("Expected the job to fail because no Elasticsearch node is reachable");
        } catch (JobExecutionException e) {
            // Guard against a missing cause/message so that a mismatch surfaces as a readable
            // assertion failure instead of a NullPointerException.
            final Throwable cause = e.getCause();
            final String causeMessage = cause == null ? null : cause.getMessage();
            Assert.assertTrue(
                    "Unexpected job failure cause: " + cause,
                    causeMessage != null
                            && causeMessage.contains("not connected to any Elasticsearch nodes"));
        }
    }

    /**
     * Builds the sink user configuration shared by all tests.
     *
     * @param clusterName value for the {@code cluster.name} setting
     * @return a new mutable map with the bulk flush size set to one action and the given cluster name
     */
    private static Map<String, String> createUserConfig(String clusterName) {
        final Map<String, String> userConfig = new HashMap<>();
        // Presumably makes the sink flush after every single action rather than buffering - confirm
        // against the sink's handling of this key.
        userConfig.put("bulk.flush.max.actions", "1");
        userConfig.put("cluster.name", clusterName);
        return userConfig;
    }

    /**
     * Creates the version-specific sink under test for explicitly given transport addresses.
     *
     * @param map user configuration for the sink
     * @param list transport addresses to connect to; the tests pass {@code null} and an empty list
     *     and expect an {@link IllegalArgumentException}
     * @param elasticsearchSinkFunction function that turns stream elements into Elasticsearch requests
     * @param <T> type of the stream elements
     * @return the sink under test
     */
    protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> map, List<InetSocketAddress> list, ElasticsearchSinkFunction<T> elasticsearchSinkFunction);

    /**
     * Creates the version-specific sink under test, set up to be used with the embedded node.
     *
     * @param map user configuration for the sink
     * @param elasticsearchSinkFunction function that turns stream elements into Elasticsearch requests
     * @param <T> type of the stream elements
     * @return the sink under test
     * @throws Exception if the sink cannot be created
     */
    protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(Map<String, String> map, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception;
}
