package co.cask.hydrator.plugin.batch.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.hydrator.common.SchemaValidator;
import co.cask.hydrator.plugin.common.Properties;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

@Name("KVTable")
@Description("Writes records to a KeyValueTable, using configurable fields from input records as the key and value.")
@Plugin(type = "batchsink")
/* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/batch/sink/KVTableSink.class */
public class KVTableSink extends BatchWritableSink<StructuredRecord, byte[], byte[]> {
    private static final String NAME_DESC = "Name of the dataset. If it does not already exist, one will be created.";
    private static final String KEY_FIELD_DESC = "The name of the field to use as the key. Defaults to 'key'.";
    private static final String VALUE_FIELD_DESC = "The name of the field to use as the value. Defaults to 'value'.";
    private final KVTableConfig kvTableConfig;

    /* loaded from: input_file:lib/core-plugins-1.2.0.jar:co/cask/hydrator/plugin/batch/sink/KVTableSink$KVTableConfig.class */
    public static class KVTableConfig extends PluginConfig {

        @Description(KVTableSink.NAME_DESC)
        private String name;

        @Name(Properties.KeyValueTable.KEY_FIELD)
        @Description(KVTableSink.KEY_FIELD_DESC)
        @Nullable
        private String keyField;

        @Name(Properties.KeyValueTable.VALUE_FIELD)
        @Description(KVTableSink.VALUE_FIELD_DESC)
        @Nullable
        private String valueField;

        public KVTableConfig() {
            this(null, "key", "value");
        }

        public KVTableConfig(String str, String str2, String str3) {
            this.name = str;
            this.keyField = str2;
            this.valueField = str3;
        }
    }

    public KVTableSink(KVTableConfig kVTableConfig) {
        this.kvTableConfig = kVTableConfig;
    }

    @Override // co.cask.hydrator.plugin.batch.sink.BatchWritableSink
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
        if (inputSchema != null) {
            SchemaValidator.validateFieldsArePresentInSchema(inputSchema, this.kvTableConfig.keyField);
            SchemaValidator.validateFieldsArePresentInSchema(inputSchema, this.kvTableConfig.valueField);
            validateSchemaTypeIsStringOrBytes(inputSchema, this.kvTableConfig.keyField, false);
            validateSchemaTypeIsStringOrBytes(inputSchema, this.kvTableConfig.valueField, true);
        }
    }

    private void validateSchemaTypeIsStringOrBytes(Schema schema, String str, boolean z) {
        Schema schema2 = schema.getField(str).getSchema();
        boolean isNullable = schema2.isNullable();
        if (!z && isNullable) {
            throw new IllegalArgumentException("Field " + str + " cannot be nullable");
        }
        Schema.Type type = isNullable ? schema2.getNonNullable().getType() : schema2.getType();
        if (type != Schema.Type.STRING && type != Schema.Type.BYTES) {
            throw new IllegalArgumentException(String.format("Field name %s is of type %s, only types String and Bytes are supported for KVTable", str, type));
        }
    }

    @Override // co.cask.hydrator.plugin.batch.sink.BatchWritableSink
    protected Map<String, String> getProperties() {
        HashMap hashMap = this.kvTableConfig.getProperties() == null ? new HashMap() : new HashMap(this.kvTableConfig.getProperties().getProperties());
        hashMap.put("name", this.kvTableConfig.name);
        hashMap.put("type", KeyValueTable.class.getName());
        return hashMap;
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<byte[], byte[]>> emitter) throws Exception {
        byte[] bytes;
        Object obj = structuredRecord.get(this.kvTableConfig.keyField);
        Preconditions.checkArgument(obj != null, "Key cannot be null.");
        Schema schema = structuredRecord.getSchema().getField(this.kvTableConfig.keyField).getSchema();
        if (schema.getType().equals(Schema.Type.STRING)) {
            bytes = Bytes.toBytes((String) obj);
        } else {
            if (!schema.getType().equals(Schema.Type.BYTES)) {
                if (!schema.isNullable()) {
                    throw new Exception(String.format("Key field %s cannot have schema %s. It must of either String or Bytes", this.kvTableConfig.keyField, schema));
                }
                throw new Exception(String.format("Key field %s cannot have nullable schema %s", this.kvTableConfig.keyField, schema));
            }
            bytes = obj instanceof ByteBuffer ? Bytes.toBytes((ByteBuffer) obj) : (byte[]) obj;
        }
        Schema.Field field = structuredRecord.getSchema().getField(this.kvTableConfig.valueField);
        if (field == null) {
            throw new Exception("Value Field " + this.kvTableConfig.valueField + " is missing in the input record");
        }
        byte[] bArr = null;
        Object obj2 = structuredRecord.get(this.kvTableConfig.valueField);
        if (obj2 != null) {
            Schema.Type type = field.getSchema().isNullable() ? field.getSchema().getNonNullable().getType() : field.getSchema().getType();
            if (type.equals(Schema.Type.STRING)) {
                bArr = Bytes.toBytes((String) obj2);
            } else {
                if (!type.equals(Schema.Type.BYTES)) {
                    throw new Exception(String.format("Value field %s cannot have schema %s. It must of either String or Bytes", this.kvTableConfig.valueField, field));
                }
                bArr = obj2 instanceof ByteBuffer ? Bytes.toBytes((ByteBuffer) obj2) : (byte[]) obj2;
            }
        }
        emitter.emit(new KeyValue(bytes, bArr));
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<byte[], byte[]>>) emitter);
    }
}
