package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT.class */
public class BigQueryIOStorageReadTableRowIT {
    private static final String DATASET_ID = "big_query_import_export";
    private static final String TABLE_PREFIX = "parallel_read_table_row_";
    private BigQueryIOStorageReadTableRowOptions options;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT$BigQueryIOStorageReadTableRowOptions.class */
    public interface BigQueryIOStorageReadTableRowOptions extends TestPipelineOptions, ExperimentalOptions {
        @Description("The table to be read")
        @Validation.Required
        String getInputTable();

        void setInputTable(String str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTableRowIT$TableRowToKVPairFn.class */
    public static class TableRowToKVPairFn extends SimpleFunction<TableRow, KV<String, String>> {
        private TableRowToKVPairFn() {
        }

        @Override // org.apache.beam.sdk.transforms.SimpleFunction, org.apache.beam.sdk.transforms.InferableFunction, org.apache.beam.sdk.transforms.ProcessFunction
        public KV<String, String> apply(TableRow tableRow) {
            CharSequence charSequence = (CharSequence) tableRow.get("sample_string");
            return KV.of(charSequence != null ? charSequence.toString() : "null", BigQueryHelpers.toJsonString(tableRow));
        }
    }

    private void setUpTestEnvironment(String str) {
        PipelineOptionsFactory.register(BigQueryIOStorageReadTableRowOptions.class);
        this.options = (BigQueryIOStorageReadTableRowOptions) TestPipeline.testingPipelineOptions().as(BigQueryIOStorageReadTableRowOptions.class);
        this.options.setInputTable(((GcpOptions) TestPipeline.testingPipelineOptions().as(GcpOptions.class)).getProject() + ":" + DATASET_ID + "." + TABLE_PREFIX + str);
        this.options.setTempLocation(this.options.getTempRoot() + "/temp-it/");
    }

    private static void runPipeline(BigQueryIOStorageReadTableRowOptions bigQueryIOStorageReadTableRowOptions) {
        Pipeline create = Pipeline.create(bigQueryIOStorageReadTableRowOptions);
        PCollection pCollection = (PCollection) ((PCollection) create.apply("ExportTable", BigQueryIO.readTableRows().from(bigQueryIOStorageReadTableRowOptions.getInputTable()).withMethod(BigQueryIO.TypedRead.Method.EXPORT))).apply("MapExportedRows", MapElements.via((SimpleFunction) new TableRowToKVPairFn()));
        PCollection pCollection2 = (PCollection) ((PCollection) create.apply("DirectReadTable", BigQueryIO.readTableRows().from(bigQueryIOStorageReadTableRowOptions.getInputTable()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ))).apply("MapDirectReadRows", MapElements.via((SimpleFunction) new TableRowToKVPairFn()));
        final TupleTag tupleTag = new TupleTag();
        final TupleTag tupleTag2 = new TupleTag();
        PAssert.that((PCollection) ((PCollection) KeyedPCollectionTuple.of(tupleTag, pCollection).and(tupleTag2, pCollection2).apply(CoGroupByKey.create())).apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, Set<String>>>() { // from class: org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOStorageReadTableRowIT.1
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, CoGbkResult>, KV<String, Set<String>>>.ProcessContext processContext) throws Exception {
                KV<String, CoGbkResult> element = processContext.element();
                HashSet hashSet = new HashSet();
                Iterator it = element.getValue().getAll(TupleTag.this).iterator();
                while (it.hasNext()) {
                    hashSet.add((String) it.next());
                }
                for (String str : element.getValue().getAll(tupleTag2)) {
                    if (hashSet.contains(str)) {
                        hashSet.remove(str);
                    } else {
                        hashSet.add(str);
                    }
                }
                if (hashSet.isEmpty()) {
                    return;
                }
                processContext.output(KV.of(element.getKey(), hashSet));
            }
        }))).empty();
        create.run().waitUntilFinish();
    }

    @Test
    public void testBigQueryStorageReadTableRow1() throws Exception {
        setUpTestEnvironment("1");
        runPipeline(this.options);
    }

    @Test
    public void testBigQueryStorageReadTableRow10k() throws Exception {
        setUpTestEnvironment("10k");
        runPipeline(this.options);
    }

    @Test
    public void testBigQueryStorageReadTableRow100k() throws Exception {
        setUpTestEnvironment("100k");
        runPipeline(this.options);
    }
}
