package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.class */
/**
 * A {@link BeamKafkaTable} whose Kafka record values are treated as CSV text.
 *
 * <p>On input, each record value is decoded as UTF-8 and parsed into {@link Row}s with the
 * configured {@link CSVFormat}. On output, each {@link Row} is rendered as a CSV line, encoded as
 * UTF-8, and emitted as the value of a key/value pair whose key is an empty byte array.
 */
public class BeamKafkaCSVTable extends BeamKafkaTable {
    /** CSV dialect shared by the decoder and the encoder. */
    private final CSVFormat csvFormat;

    /**
     * Creates a table that uses {@link CSVFormat#DEFAULT}.
     *
     * @param schema passed through to the {@link BeamKafkaTable} constructor
     * @param str passed through to the {@link BeamKafkaTable} constructor
     * @param list passed through to the {@link BeamKafkaTable} constructor
     */
    public BeamKafkaCSVTable(Schema schema, String str, List<String> list) {
        this(schema, str, list, CSVFormat.DEFAULT);
    }

    /**
     * Creates a table that uses the given CSV format.
     *
     * @param schema passed through to the {@link BeamKafkaTable} constructor
     * @param str passed through to the {@link BeamKafkaTable} constructor
     * @param list passed through to the {@link BeamKafkaTable} constructor
     * @param cSVFormat CSV dialect used for both decoding and encoding records
     */
    public BeamKafkaCSVTable(Schema schema, String str, List<String> list, CSVFormat cSVFormat) {
        super(schema, str, list);
        this.csvFormat = cSVFormat;
    }

    /** Returns the transform that turns raw key/value byte records into rows. */
    @Override
    protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
        return new CsvRecorderDecoder(this.schema, this.csvFormat);
    }

    /** Returns the transform that turns rows into raw key/value byte records. */
    @Override
    protected PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
        return new CsvRecorderEncoder(this.csvFormat);
    }

    /** Decodes the value of each key/value record from UTF-8 CSV text into {@link Row}s. */
    private static class CsvRecorderDecoder extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
        private final Schema schema;
        private final CSVFormat format;

        CsvRecorderDecoder(Schema rowSchema, CSVFormat csvDialect) {
            this.schema = rowSchema;
            this.format = csvDialect;
        }

        @Override
        public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> records) {
            PCollection<Row> decoded =
                    records.apply(
                            "decodeCsvRecord",
                            ParDo.of(
                                    new DoFn<KV<byte[], byte[]>, Row>() {
                                        @DoFn.ProcessElement
                                        public void processElement(ProcessContext ctx) {
                                            // Only the record value is used; the key is ignored.
                                            byte[] payload = ctx.element().getValue();
                                            String csvText = new String(payload, StandardCharsets.UTF_8);
                                            // Emit every row the parser returns for this payload.
                                            for (Row parsed :
                                                    BeamTableUtils.csvLines2BeamRows(
                                                            CsvRecorderDecoder.this.format,
                                                            csvText,
                                                            CsvRecorderDecoder.this.schema)) {
                                                ctx.output(parsed);
                                            }
                                        }
                                    }));
            return decoded.setRowSchema(this.schema);
        }
    }

    /** Encodes each {@link Row} as a UTF-8 CSV line stored in the value of a key/value record. */
    private static class CsvRecorderEncoder extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
        private final CSVFormat format;

        CsvRecorderEncoder(CSVFormat csvDialect) {
            this.format = csvDialect;
        }

        @Override
        public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> rows) {
            return rows.apply(
                    "encodeCsvRecord",
                    ParDo.of(
                            new DoFn<Row, KV<byte[], byte[]>>() {
                                @DoFn.ProcessElement
                                public void processElement(ProcessContext ctx) {
                                    String csvLine =
                                            BeamTableUtils.beamRow2CsvLine(
                                                    ctx.element(), CsvRecorderEncoder.this.format);
                                    byte[] value = csvLine.getBytes(StandardCharsets.UTF_8);
                                    // The key is always an empty byte array.
                                    ctx.output(KV.of(new byte[0], value));
                                }
                            }));
        }
    }
}
