package co.cask.hydrator.transforms;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Lookup;
import co.cask.cdap.etl.api.LookupProvider;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.StageMetrics;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.hydrator.plugin.transform.JavaTypeConverters;
import co.cask.hydrator.plugin.transform.ScriptContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.python.core.Py;
import org.python.core.PyCode;
import org.python.util.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("PythonEvaluator")
@Description("Executes user-provided Python code that transforms one record into another.")
@Plugin(type = "transform")
/* loaded from: input_file:co/cask/hydrator/transforms/PythonEvaluator.class */
public class PythonEvaluator extends Transform<StructuredRecord, StructuredRecord> {
    // Names under which the per-record input, the emitter wrapper, and the script context are
    // bound in the interpreter; init() splices them into the generated "transform(...)" call.
    // They are deliberately unusual so that a user script is unlikely to shadow them.
    private static final String INPUT_STRUCTURED_RECORD_VARIABLE_NAME = "dont_name_your_variable_this1";
    private static final String EMITTER_VARIABLE_NAME = "dont_name_your_variable_this2";
    private static final String CONTEXT_NAME = "dont_name_your_context_this";
    // Output schema parsed from config.schema in init(); stays null when no schema is configured,
    // in which case transform() falls back to each input record's own schema.
    private Schema schema;
    // Plugin configuration supplied through the constructor.
    private final Config config;
    // Both are set in initialize() from the TransformContext and handed to the ScriptContext.
    private StageMetrics metrics;
    private Logger logger;
    // Created in init(); cleaned up in destroy().
    private PythonInterpreter interpreter;
    // The user script followed by the generated transform(...) call, compiled once in init().
    private PyCode compiledScript;

    /* loaded from: input_file:co/cask/hydrator/transforms/PythonEvaluator$Config.class */
    /**
     * Plugin configuration: the Python script to run for every record and, optionally, the
     * JSON form of the output schema.
     */
    public static class Config extends PluginConfig {

        // The embedded example must be valid Python because users copy it verbatim; the
        // emitter.emitError(...) call is therefore closed with its parenthesis.
        @Description("Python code defining how to transform one record into another. The script must implement a function "
            + "called 'transform', which takes as input a dictionary representing the input record, an emitter object, "
            + "and a context object (which contains CDAP metrics and logger). The emitter object can be used to emit "
            + "one or more key-value pairs to the next stage. It can also be used to emit errors. For example:\n"
            + "'def transform(record, emitter, context):\n"
            + "  if record['count'] == 0:\n"
            + "    emitter.emitError({\"errorCode\":31, \"errorMsg\":\"Count is zero.\", \"invalidRecord\": record})\n"
            + "    return\n"
            + "  record['count'] *= 1024\n"
            + "  if(record['count'] < 0):\n"
            + "    context.getMetrics().count(\"negative.count\", 1)\n"
            + "    context.getLogger().debug(\"Received record with negative count\")\n"
            + "  emitter.emit(record)'\n"
            + "will scale the 'count' field by 1024.")
        private final String script;

        @Description("The schema of the output object. If no schema is given, it is assumed that the output schema is the same as the input schema.")
        @Nullable
        private final String schema;

        /**
         * @param str  the Python script; it is expected to define {@code transform(record, emitter, context)}
         * @param str2 the output schema as JSON, or {@code null} to reuse the input schema
         */
        public Config(String str, String str2) {
            this.script = str;
            this.schema = str2;
        }
    }

    /* loaded from: input_file:co/cask/hydrator/transforms/PythonEvaluator$PythonEmitter.class */
    /**
     * Emitter exposed to the Python script. The script emits plain maps; each one is decoded
     * into a {@link StructuredRecord} using the schema given at construction and forwarded to
     * the wrapped emitter.
     */
    public final class PythonEmitter implements Emitter<Map> {
        private final Emitter<StructuredRecord> emitter;
        private final Schema schema;

        /**
         * @param emitter the downstream emitter that receives decoded records
         * @param schema  the schema used to decode every map emitted by the script
         */
        public PythonEmitter(Emitter<StructuredRecord> emitter, Schema schema) {
            this.emitter = emitter;
            this.schema = schema;
        }

        /** Decodes the map into a record and emits it downstream. */
        public void emit(Map map) {
            this.emitter.emit(decode(map));
        }

        /** Re-wraps an error entry carrying a map into one carrying the decoded record. */
        public void emitError(InvalidEntry<Map> invalidEntry) {
            this.emitter.emitError(new InvalidEntry(invalidEntry.getErrorCode(), invalidEntry.getErrorMsg(), decode((Map) invalidEntry.getInvalidRecord())));
        }

        /**
         * Emits an error described by a map with the keys {@code errorCode}, {@code errorMsg}
         * and {@code invalidRecord}.
         *
         * @param map the error description built by the script
         * @throws IllegalArgumentException if {@code errorCode} is missing or not numeric, or if
         *                                  {@code invalidRecord} is missing
         */
        public void emitError(Map map) {
            Object errorCode = map.get("errorCode");
            // Accept any numeric type rather than only Integer, so a long-valued code from the
            // script does not fail with a ClassCastException.
            if (!(errorCode instanceof Number)) {
                throw new IllegalArgumentException("emitError requires a numeric 'errorCode' entry but got: " + errorCode);
            }
            Object invalidRecord = map.get("invalidRecord");
            if (invalidRecord == null) {
                throw new IllegalArgumentException("emitError requires an 'invalidRecord' entry");
            }
            Object errorMsg = map.get("errorMsg");
            String message = errorMsg == null ? null : errorMsg.toString();
            this.emitter.emitError(new InvalidEntry(((Number) errorCode).intValue(), message, decode((Map) invalidRecord)));
        }

        private StructuredRecord decode(Map map) {
            return PythonEvaluator.this.decodeRecord(map, this.schema);
        }
    }

    /**
     * Creates the transform with its plugin configuration. Only the configuration is stored
     * here; the interpreter is created and the script compiled later, in {@code init()}.
     *
     * @param config the script and optional output schema for this stage
     */
    public PythonEvaluator(Config config) {
        this.config = config;
    }

    /**
     * Captures the stage metrics, creates a logger whose name includes the stage name, and then
     * sets up the interpreter via {@code init()}.
     *
     * @param transformContext the runtime context supplying metrics and the stage name
     */
    public void initialize(TransformContext transformContext) {
        this.metrics = transformContext.getMetrics();
        StringBuilder loggerName = new StringBuilder(PythonEvaluator.class.getName());
        loggerName.append(" - Stage:").append(transformContext.getStageName());
        this.logger = LoggerFactory.getLogger(loggerName.toString());
        init();
    }

    /** Cleans up the interpreter, if one was ever created. */
    public void destroy() {
        PythonInterpreter active = this.interpreter;
        if (active == null) {
            // init() never ran, so there is nothing to clean up.
            return;
        }
        active.cleanup();
    }

    /**
     * Declares the stage's output schema: the configured schema when one is given, otherwise
     * the stage's input schema.
     *
     * @param pipelineConfigurer the configurer for the stage being set up
     * @throws IllegalArgumentException if the configured schema cannot be parsed
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        super.configurePipeline(pipelineConfigurer);
        if (this.config.schema != null) {
            try {
                pipelineConfigurer.getStageConfigurer().setOutputSchema(Schema.parseJson(this.config.schema));
            } catch (IOException parseFailure) {
                throw new IllegalArgumentException("Unable to parse schema: " + parseFailure.getMessage(), parseFailure);
            }
        } else {
            // No explicit schema: the output is declared to look exactly like the input.
            pipelineConfigurer.getStageConfigurer().setOutputSchema(pipelineConfigurer.getStageConfigurer().getInputSchema());
        }
    }

    /**
     * Runs the compiled script against one record. The record is encoded into plain Java
     * collections and bound, together with an emitter wrapper, to the interpreter variables
     * that the compiled code passes to the user's {@code transform} function.
     *
     * @param structuredRecord the input record
     * @param emitter          the downstream emitter for results and errors
     * @throws IllegalArgumentException wrapping any exception raised while encoding the record
     *                                  or running the script
     */
    public void transform(StructuredRecord structuredRecord, Emitter<StructuredRecord> emitter) {
        try {
            Schema outputSchema;
            if (this.schema != null) {
                outputSchema = this.schema;
            } else {
                // No configured output schema: emitted maps are decoded with the input's schema.
                outputSchema = structuredRecord.getSchema();
            }
            PythonEmitter scriptEmitter = new PythonEmitter(emitter, outputSchema);
            Object encodedInput = encode(structuredRecord, structuredRecord.getSchema());
            this.interpreter.set(INPUT_STRUCTURED_RECORD_VARIABLE_NAME, encodedInput);
            this.interpreter.set(EMITTER_VARIABLE_NAME, scriptEmitter);
            Py.runCode(this.compiledScript, this.interpreter.getLocals(), this.interpreter.getLocals());
        } catch (Exception failure) {
            throw new IllegalArgumentException("Could not transform input: " + failure.getMessage(), failure);
        }
    }

    /**
     * Converts a value into the form handed to the script, dispatching on the schema type:
     * records, arrays, maps and unions are converted recursively, simple types are passed
     * through unchanged.
     *
     * @param obj    the value to encode
     * @param schema the schema describing {@code obj}
     * @return the encoded value
     * @throws RuntimeException for any other schema type (for example ENUM)
     */
    private Object encode(Object obj, Schema schema) {
        Schema.Type type = schema.getType();
        switch (type) {
            case RECORD:
                return encodeRecord((StructuredRecord) obj, schema);
            case ARRAY:
                return encodeArray((List) obj, schema.getComponentSchema());
            case MAP:
                return encodeMap((Map) obj, schema.getMapSchema().getKey(), schema.getMapSchema().getValue());
            case UNION:
                return encodeUnion(obj, schema.getUnionSchemas());
            case NULL:
            case BOOLEAN:
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
            case BYTES:
            case STRING:
                // Simple values need no conversion.
                return obj;
            default:
                throw new RuntimeException("Unable to encode object with schema " + schema);
        }
    }

    /**
     * Encodes a record as a map from field name to encoded field value, with one entry per
     * field of the schema.
     *
     * @param structuredRecord the record to encode
     * @param schema           the record schema whose fields are read
     * @return a new {@link HashMap} holding the encoded fields
     */
    private Object encodeRecord(StructuredRecord structuredRecord, Schema schema) {
        Map<String, Object> encodedFields = new HashMap<String, Object>();
        for (Schema.Field field : schema.getFields()) {
            String fieldName = field.getName();
            Object rawValue = structuredRecord.get(fieldName);
            encodedFields.put(fieldName, encode(rawValue, field.getSchema()));
        }
        return encodedFields;
    }

    /**
     * Encodes a value against a union by trying each member schema in order and returning the
     * first successful encoding.
     *
     * <p>A NULL member is skipped when the value is non-null. Encoding against NULL returns the
     * value unchanged, so a union that lists {@code null} first would otherwise pass nested
     * records, arrays and maps to the script unencoded.
     *
     * @param obj  the value to encode; may be {@code null}
     * @param list the union's member schemas, in declaration order
     * @return the encoded value
     * @throws RuntimeException if no member schema can encode the value; the last failure, if
     *                          any, is attached as the cause
     */
    private Object encodeUnion(Object obj, List<Schema> list) {
        Exception lastFailure = null;
        for (Schema candidate : list) {
            if (obj != null && candidate.getType() == Schema.Type.NULL) {
                // A null schema cannot describe a non-null value.
                continue;
            }
            try {
                return encode(obj, candidate);
            } catch (Exception e) {
                // Expected while probing union members; remember it for the final error.
                lastFailure = e;
            }
        }
        throw new RuntimeException("Unable to encode union with schema " + list, lastFailure);
    }

    /**
     * Encodes every key and value of a map.
     *
     * @param map     the map to encode
     * @param schema  the schema of the keys
     * @param schema2 the schema of the values
     * @return a new {@link HashMap} holding the encoded entries
     */
    private Object encodeMap(Map<Object, Object> map, Schema schema, Schema schema2) {
        Map<Object, Object> encodedEntries = new HashMap<Object, Object>();
        Iterator<Map.Entry<Object, Object>> entries = map.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Object, Object> entry = entries.next();
            Object encodedKey = encode(entry.getKey(), schema);
            Object encodedValue = encode(entry.getValue(), schema2);
            encodedEntries.put(encodedKey, encodedValue);
        }
        return encodedEntries;
    }

    /**
     * Encodes every element of a list, preserving order.
     *
     * @param list   the elements to encode
     * @param schema the schema shared by all elements
     * @return a new {@link ArrayList} holding the encoded elements
     */
    private Object encodeArray(List list, Schema schema) {
        List<Object> encodedElements = new ArrayList<Object>();
        for (Object element : list) {
            encodedElements.add(encode(element, schema));
        }
        return encodedElements;
    }

    /**
     * Converts a value produced by the script back into the form expected for the given
     * schema, dispatching on the schema type.
     *
     * @param obj    the value returned by the script
     * @param schema the schema the decoded value must conform to
     * @return the decoded value
     * @throws RuntimeException for any schema type not handled below (for example ENUM)
     */
    private Object decode(Object obj, Schema schema) {
        Schema.Type type = schema.getType();
        switch (type) {
            case RECORD:
                return decodeRecord((Map) obj, schema);
            case ARRAY:
                return decodeArray((List) obj, schema.getComponentSchema());
            case MAP:
                return decodeMap((Map) obj, schema.getMapSchema().getKey(), schema.getMapSchema().getValue());
            case UNION:
                return decodeUnion(obj, schema.getUnionSchemas());
            case NULL:
            case BOOLEAN:
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
            case BYTES:
            case STRING:
                return decodeSimpleType(obj, schema);
            default:
                throw new RuntimeException("Unable to decode object with schema " + schema);
        }
    }

    /**
     * Builds a record of the given schema from a map emitted by the script. Each schema field
     * is looked up in the map by name and decoded with that field's schema.
     *
     * <p>Decompiler note (JADX): this method was originally {@code private}; it appears as
     * {@code public} here because the inner {@code PythonEmitter} class calls it.
     *
     * @param map    the field values keyed by field name
     * @param schema the record schema to build
     * @return the decoded record
     */
    public StructuredRecord decodeRecord(Map map, Schema schema) {
        StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema);
        Iterator<Schema.Field> fields = schema.getFields().iterator();
        while (fields.hasNext()) {
            Schema.Field field = fields.next();
            String fieldName = field.getName();
            Object scriptValue = map.get(fieldName);
            recordBuilder.set(fieldName, decode(scriptValue, field.getSchema()));
        }
        return recordBuilder.build();
    }

    /**
     * Decodes a value against a simple (non-container) schema by casting it to the matching
     * Java type. A mismatching value raises a {@link ClassCastException}, which
     * {@code decodeUnion} relies on to move on to the next union member.
     *
     * @param obj    the value returned by the script; may be {@code null}
     * @param schema a schema of a simple type
     * @return the value as the Java type for the schema, or {@code null} for a NULL schema
     * @throws RuntimeException if the schema is not a simple type
     */
    private Object decodeSimpleType(Object obj, Schema schema) {
        switch (schema.getType()) {
            case NULL:
                return null;
            case BOOLEAN:
                return (Boolean) obj;
            case INT:
                return (Integer) obj;
            case LONG:
                return decodeLong(obj);
            case FLOAT:
                return (Float) obj;
            case DOUBLE:
                return (Double) obj;
            case BYTES:
                return (byte[]) obj;
            case STRING:
                return (String) obj;
            default:
                throw new RuntimeException("Unable to decode object with schema " + schema);
        }
    }

    /**
     * Converts an integral value to a {@code Long}. Narrower integral boxes are widened
     * losslessly, so a script assigning a small integer to a long field no longer fails with
     * a ClassCastException. BigInteger values must fit into 64 bits.
     *
     * @throws IllegalArgumentException if a BigInteger value does not fit in a long
     * @throws ClassCastException       if the value is not an integral type
     */
    private static Long decodeLong(Object obj) {
        if (obj instanceof BigInteger) {
            BigInteger big = (BigInteger) obj;
            // bitLength() excludes the sign bit, so a value fits in a long iff it is <= 63.
            if (big.bitLength() > 63) {
                throw new IllegalArgumentException("Value " + big + " does not fit in a long");
            }
            return Long.valueOf(big.longValue());
        }
        if (obj instanceof Integer || obj instanceof Short || obj instanceof Byte) {
            return Long.valueOf(((Number) obj).longValue());
        }
        return (Long) obj;
    }

    /**
     * Decodes every key and value of a map returned by the script.
     *
     * @param map     the map to decode
     * @param schema  the schema of the keys
     * @param schema2 the schema of the values
     * @return a new {@link HashMap} holding the decoded entries
     */
    private Map<Object, Object> decodeMap(Map<Object, Object> map, Schema schema, Schema schema2) {
        Map<Object, Object> decodedEntries = new HashMap<Object, Object>();
        Iterator<Map.Entry<Object, Object>> entries = map.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Object, Object> entry = entries.next();
            Object decodedKey = decode(entry.getKey(), schema);
            Object decodedValue = decode(entry.getValue(), schema2);
            decodedEntries.put(decodedKey, decodedValue);
        }
        return decodedEntries;
    }

    /**
     * Decodes every element of a list returned by the script, preserving order.
     *
     * @param list   the elements to decode
     * @param schema the schema shared by all elements
     * @return a new list, pre-sized to the input's length, holding the decoded elements
     */
    private List<Object> decodeArray(List list, Schema schema) {
        List<Object> decodedElements = new ArrayList<Object>(list.size());
        for (Object element : list) {
            decodedElements.add(decode(element, schema));
        }
        return decodedElements;
    }

    /**
     * Decodes a value against a union by trying each member schema in order and returning the
     * first successful decoding.
     *
     * <p>A NULL member is skipped when the value is non-null. Decoding against NULL always
     * yields {@code null}, so a union that lists {@code null} first would otherwise silently
     * drop every non-null value.
     *
     * @param obj  the value returned by the script; may be {@code null}
     * @param list the union's member schemas, in declaration order
     * @return the decoded value
     * @throws RuntimeException if no member schema can decode the value; the last failure, if
     *                          any, is attached as the cause
     */
    private Object decodeUnion(Object obj, List<Schema> list) {
        Exception lastFailure = null;
        for (Schema candidate : list) {
            if (obj != null && candidate.getType() == Schema.Type.NULL) {
                // A null schema cannot describe a non-null value.
                continue;
            }
            try {
                return decode(obj, candidate);
            } catch (Exception e) {
                // Expected while probing union members; remember it for the final error.
                lastFailure = e;
            }
        }
        throw new RuntimeException("Unable to decode union with schema " + list, lastFailure);
    }

    /**
     * Creates the interpreter, binds the script context, compiles the user script followed by
     * the generated {@code transform(...)} call, and parses the configured output schema if
     * one is present.
     *
     * @throws IllegalArgumentException if the configured schema cannot be parsed
     */
    private void init() {
        this.interpreter = new PythonInterpreter();

        // Lookups are not available to Python scripts; any attempt fails loudly.
        LookupProvider unsupportedLookup = new LookupProvider() {
            public <T> Lookup<T> provide(String str, Map<String, String> map) {
                throw new UnsupportedOperationException("lookup is currently not supported.");
            }
        };
        // This converter's mapToJSObject always returns null.
        JavaTypeConverters nullConverters = new JavaTypeConverters() {
            @Override // co.cask.hydrator.plugin.transform.JavaTypeConverters
            public Object mapToJSObject(Map<?, ?> map) {
                return null;
            }
        };
        ScriptContext scriptContext = new ScriptContext(this.logger, this.metrics, unsupportedLookup, null, nullConverters);
        this.interpreter.set(CONTEXT_NAME, scriptContext);

        String program = String.format("%s\ntransform(%s, %s, %s)", this.config.script, INPUT_STRUCTURED_RECORD_VARIABLE_NAME, EMITTER_VARIABLE_NAME, CONTEXT_NAME);
        this.compiledScript = this.interpreter.compile(program);

        String schemaJson = this.config.schema;
        if (schemaJson == null) {
            return;
        }
        try {
            this.schema = Schema.parseJson(schemaJson);
        } catch (IOException parseFailure) {
            throw new IllegalArgumentException("Unable to parse schema: " + parseFailure.getMessage(), parseFailure);
        }
    }

    /**
     * Untyped overload, marked by the decompiler as a compiler-generated bridge; it casts its
     * arguments and delegates to the typed {@code transform}.
     */
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        StructuredRecord typedRecord = (StructuredRecord) obj;
        Emitter<StructuredRecord> typedEmitter = (Emitter<StructuredRecord>) emitter;
        transform(typedRecord, typedEmitter);
    }
}
