package org.apache.beam.sdk.extensions.sql.jdbc;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.repackaged.sql.org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.codehaus.jackson.util.BufferRecycler;
import org.hamcrest.CoreMatchers;
import org.hamcrest.collection.IsIn;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.class */
public class BeamSqlLineIT implements Serializable {

    @Rule
    public transient TestPubsub eventsTopic = TestPubsub.create();
    private static String project;
    private static String createPubsubTableStatement;
    private static String setProject;
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat(DateTimeUtils.TIMESTAMP_FORMAT_STRING);
    private ExecutorService pool;

    @BeforeClass
    public static void setUpClass() {
        project = ((GcpOptions) TestPipeline.testingPipelineOptions().as(GcpOptions.class)).getProject();
        setProject = String.format("SET project = '%s';", project);
        createPubsubTableStatement = "CREATE EXTERNAL TABLE taxi_rides (\n         event_timestamp TIMESTAMP,\n         attributes MAP<VARCHAR, VARCHAR>,\n         payload ROW<\n           ride_id VARCHAR,\n           point_idx INT,\n           latitude DOUBLE,\n           longitude DOUBLE,\n           meter_reading DOUBLE,\n           meter_increment DOUBLE,\n           ride_status VARCHAR,\n           passenger_count TINYINT>)\n       TYPE pubsub \n       LOCATION '%s'\n       TBLPROPERTIES '{\"timestampAttributeKey\": \"ts\"}';";
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    @Before
    public void setUp() {
        this.pool = Executors.newFixedThreadPool(1);
    }

    @After
    public void tearDown() {
        this.pool.shutdown();
    }

    @Test
    @Ignore("https://jira.apache.org/jira/browse/BEAM-7582")
    public void testSelectFromPubsub() throws Exception {
        Future<List<List<String>>> runQueryInBackground = runQueryInBackground(BeamSqlLineTestingUtils.buildArgs(String.format(createPubsubTableStatement, this.eventsTopic.topicPath()), setProject, "SELECT event_timestamp, taxi_rides.payload.ride_status, taxi_rides.payload.latitude, taxi_rides.payload.longitude from taxi_rides LIMIT 3;"));
        this.eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1L));
        this.eventsTopic.publish(ImmutableList.of(message(convertTimestampToMillis("2018-07-01 21:25:20"), taxiRideJSON("id1", 1, 40.702d, -74.001d, 1000, 10, "enroute", 2)), message(convertTimestampToMillis("2018-07-01 21:26:06"), taxiRideJSON("id2", 2, 40.703d, -74.002d, 1000, 10, "enroute", 4)), message(convertTimestampToMillis("2018-07-02 13:26:06"), taxiRideJSON("id3", 3, 30.0d, -72.32324d, BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN, 20, "enroute", 7))));
        Assert.assertThat(Arrays.asList(Arrays.asList("2018-07-01 21:25:20", "enroute", "40.702", "-74.001"), Arrays.asList("2018-07-01 21:26:06", "enroute", "40.703", "-74.002"), Arrays.asList("2018-07-02 13:26:06", "enroute", "30.0", "-72.32324")), CoreMatchers.everyItem(IsIn.isOneOf(runQueryInBackground.get(30L, TimeUnit.SECONDS).toArray())));
    }

    @Test
    @Ignore("https://jira.apache.org/jira/browse/BEAM-7582")
    public void testFilterForSouthManhattan() throws Exception {
        Future<List<List<String>>> runQueryInBackground = runQueryInBackground(BeamSqlLineTestingUtils.buildArgs(String.format(createPubsubTableStatement, this.eventsTopic.topicPath()), setProject, "SELECT event_timestamp, taxi_rides.payload.ride_status, \ntaxi_rides.payload.latitude, taxi_rides.payload.longitude from taxi_rides\n       WHERE taxi_rides.payload.longitude > -74.747\n         AND taxi_rides.payload.longitude < -73.969\n         AND taxi_rides.payload.latitude > 40.699\n         AND taxi_rides.payload.latitude < 40.720 LIMIT 2;"));
        this.eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1L));
        this.eventsTopic.publish(ImmutableList.of(message(convertTimestampToMillis("2018-07-01 21:25:20"), taxiRideJSON("id1", 1, 40.701d, -74.001d, 1000, 10, "enroute", 2)), message(convertTimestampToMillis("2018-07-01 21:26:06"), taxiRideJSON("id2", 2, 40.702d, -74.002d, 1000, 10, "enroute", 4)), message(convertTimestampToMillis("2018-07-02 13:26:06"), taxiRideJSON("id3", 3, 30.0d, -72.32324d, BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN, 20, "enroute", 7)), message(convertTimestampToMillis("2018-07-02 14:28:22"), taxiRideJSON("id4", 4, 34.0d, -73.32324d, BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN, 20, "enroute", 8))));
        Assert.assertThat(Arrays.asList(Arrays.asList("2018-07-01 21:25:20", "enroute", "40.701", "-74.001"), Arrays.asList("2018-07-01 21:26:06", "enroute", "40.702", "-74.002")), CoreMatchers.everyItem(IsIn.isOneOf(runQueryInBackground.get(30L, TimeUnit.SECONDS).toArray())));
    }

    private String taxiRideJSON(String str, int i, double d, double d2, int i2, int i3, String str2, int i4) {
        ObjectNode createObjectNode = new ObjectMapper().createObjectNode();
        createObjectNode.put("ride_id", str);
        createObjectNode.put("point_idx", i);
        createObjectNode.put("latitude", d);
        createObjectNode.put("longitude", d2);
        createObjectNode.put("meter_reading", i2);
        createObjectNode.put("meter_increment", i3);
        createObjectNode.put("ride_status", str2);
        createObjectNode.put("passenger_count", i4);
        return createObjectNode.toString();
    }

    private Future<List<List<String>>> runQueryInBackground(String[] strArr) {
        return this.pool.submit(() -> {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BeamSqlLine.runSqlLine(strArr, (InputStream) null, byteArrayOutputStream, (OutputStream) null);
            return BeamSqlLineTestingUtils.toLines(byteArrayOutputStream);
        });
    }

    private long convertTimestampToMillis(String str) throws ParseException {
        return dateFormat.parse(str).getTime();
    }

    private PubsubMessage message(long j, String str) {
        return new PubsubMessage(str.getBytes(StandardCharsets.UTF_8), ImmutableMap.of("ts", String.valueOf(j)));
    }
}
