package co.cask.hydrator.plugin.batch.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.hydrator.common.HiveSchemaConverter;
import co.cask.hydrator.plugin.common.StructuredToAvroTransformer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroParquetOutputFormat;

@Name("TPFSParquet")
@Description("Sink for a TimePartitionedFileSet that writes data in Parquet format.")
@Plugin(type = "batchsink")
/* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/batch/sink/TimePartitionedFileSetDatasetParquetSink.class */
public class TimePartitionedFileSetDatasetParquetSink extends TimePartitionedFileSetSink<Void, GenericRecord> {
    private static final String SCHEMA_DESC = "The Parquet schema of the record being written to the Sink as a JSON Object.";
    private StructuredToAvroTransformer recordTransformer;
    private final TPFSParquetSinkConfig config;

    /* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/batch/sink/TimePartitionedFileSetDatasetParquetSink$TPFSParquetSinkConfig.class */
    public static class TPFSParquetSinkConfig extends TPFSSinkConfig {

        @Description(TimePartitionedFileSetDatasetParquetSink.SCHEMA_DESC)
        private String schema;

        public TPFSParquetSinkConfig(String str, String str2, @Nullable String str3, @Nullable String str4, @Nullable String str5) {
            super(str, str3, str4, str5);
            this.schema = str2;
        }
    }

    public TimePartitionedFileSetDatasetParquetSink(TPFSParquetSinkConfig tPFSParquetSinkConfig) {
        super(tPFSParquetSinkConfig);
        this.config = tPFSParquetSinkConfig;
    }

    @Override // co.cask.hydrator.plugin.batch.sink.TimePartitionedFileSetSink
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        String str = this.tpfsSinkConfig.name;
        String str2 = this.tpfsSinkConfig.basePath == null ? str : this.tpfsSinkConfig.basePath;
        new Schema.Parser().parse(this.config.schema.toLowerCase());
        try {
            String hiveSchema = HiveSchemaConverter.toHiveSchema(co.cask.cdap.api.data.schema.Schema.parseJson(this.config.schema.toLowerCase()));
            pipelineConfigurer.createDataset(str, TimePartitionedFileSet.class.getName(), FileSetProperties.builder().setBasePath(str2).setInputFormat(AvroParquetInputFormat.class).setOutputFormat(AvroParquetOutputFormat.class).setEnableExploreOnCreate(true).setExploreFormat("parquet").setExploreSchema(hiveSchema.substring(1, hiveSchema.length() - 1)).build());
        } catch (UnsupportedTypeException | IOException e) {
            throw new RuntimeException("Error: Schema is not valid ", e);
        }
    }

    @Override // co.cask.hydrator.plugin.batch.sink.TimePartitionedFileSetSink
    protected Map<String, String> getAdditionalTPFSArguments() {
        HashMap hashMap = new HashMap();
        hashMap.put("output.properties.parquet.avro.schema", this.config.schema.toLowerCase());
        return hashMap;
    }

    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.recordTransformer = new StructuredToAvroTransformer(this.config.schema);
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<Void, GenericRecord>> emitter) throws Exception {
        emitter.emit(new KeyValue((Object) null, this.recordTransformer.transform(structuredRecord)));
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<Void, GenericRecord>>) emitter);
    }
}
