package co.cask.cdap.etl.batch;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.etl.api.InvalidEntry;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.TransformContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.etl.batch.config.ETLBatchConfig;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.Destroyables;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.PluginID;
import co.cask.cdap.etl.common.SinkInfo;
import co.cask.cdap.etl.common.StageMetrics;
import co.cask.cdap.etl.common.StructuredRecordStringConverter;
import co.cask.cdap.etl.common.TransformDetail;
import co.cask.cdap.etl.common.TransformExecutor;
import co.cask.cdap.etl.common.TransformInfo;
import co.cask.cdap.etl.common.TransformResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce.class */
public class ETLMapReduce extends AbstractMapReduce {
    /** Hadoop configuration key under which beforeSubmit stores the JSON list of sink outputs read by the mapper. */
    private static final String SINK_OUTPUTS_KEY = "cdap.etl.sink.outputs";

    /** Source plugin instance created in beforeSubmit and reused when the run finishes. */
    private BatchConfigurable<BatchSourceContext> batchSource;

    /** Sink plugin instances, in the order in which beforeSubmit prepared them. */
    private List<BatchConfigurable<BatchSinkContext>> batchSinks;

    // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
    private Metrics mrMetrics;

    /** Pipeline configuration handed to the constructor and registered in configure. */
    private final ETLBatchConfig config;

    /** Program name, set in configure. */
    public static final String NAME = ETLMapReduce.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);

    // Gson target types for the JSON lists exchanged through program properties and the Hadoop configuration.
    private static final Type SINK_OUTPUTS_TYPE = new TypeToken<List<SinkOutput>>() { // from class: co.cask.cdap.etl.batch.ETLMapReduce.1
    }.getType();
    private static final Type SINK_INFO_TYPE = new TypeToken<List<SinkInfo>>() { // from class: co.cask.cdap.etl.batch.ETLMapReduce.2
    }.getType();
    private static final Type TRANSFORMINFO_LIST_TYPE = new TypeToken<List<TransformInfo>>() { // from class: co.cask.cdap.etl.batch.ETLMapReduce.3
    }.getType();
    private static final Gson GSON = new Gson();

    /**
     * Schema of the records written to error datasets: an int error code, an error message that may be
     * null, and the invalid entry rendered as a string.
     */
    @VisibleForTesting
    static final Schema ERROR_SCHEMA = Schema.recordOf(
        "error",
        new Schema.Field[]{
            Schema.Field.of(Constants.ErrorDataset.ERRCODE, Schema.of(Schema.Type.INT)),
            Schema.Field.of(Constants.ErrorDataset.ERRMSG, Schema.unionOf(new Schema[]{Schema.of(Schema.Type.STRING), Schema.of(Schema.Type.NULL)})),
            Schema.Field.of(Constants.ErrorDataset.INVALIDENTRY, Schema.of(Schema.Type.STRING))
        });

    // Avro view of ERROR_SCHEMA. The parser is fully qualified because the simple name Schema is used
    // for the CDAP schema class throughout this file, and that class has no Parser.
    private static final org.apache.avro.Schema AVRO_ERROR_SCHEMA =
        new org.apache.avro.Schema.Parser().parse(ERROR_SCHEMA.toString());

    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$ETLMapper.class */
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        // Mapper-local logger and Gson instance; both shadow the identically named constants of the enclosing class.
        private static final Logger LOG = LoggerFactory.getLogger(ETLMapper.class);
        private static final Gson GSON = new Gson();
        // Gson target type for the JSON list of transform infos stored in the program properties.
        private static final Type TRANSFORMDETAILS_LIST_TYPE = new TypeToken<List<TransformInfo>>() { // from class: co.cask.cdap.etl.batch.ETLMapReduce.ETLMapper.1
        }.getType();
        // Ids of transforms that emitted errors without an error dataset; used to warn only once per transform.
        private Set<String> transformsWithoutErrorDataset;
        // Executes the source stage followed by the configured transforms for each input key/value.
        private TransformExecutor<KeyValue, Object> transformExecutor;
        // Never assigned in this class; presumably injected by the runtime -- TODO confirm.
        private Metrics mapperMetrics;
        // One wrapper per configured sink; every emitted record is written to each of them.
        private List<WrappedSink<Object, Object, Object>> sinks;
        // Error sinks keyed by transform id, only for transforms that have an error dataset configured.
        private Map<String, ErrorSink<Object, Object, Object>> transformErrorSinkMap;

        /**
         * Instantiates and initializes the source, transform and sink plugins for this mapper task and
         * assembles the transform executor.
         *
         * <p>Plugin ids are read from the program specification properties; the sink outputs are read
         * from the Hadoop configuration entry written by {@code beforeSubmit}.
         *
         * @param mapReduceTaskContext task context used to create plugin instances and write outputs
         * @throws NullPointerException if the source id, the transform ids or the sink outputs are missing
         * @throws IllegalArgumentException if the sink output list is empty
         * @throws Exception if a plugin cannot be instantiated or initialized
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            Map<String, String> properties = mapReduceTaskContext.getSpecification().getProperties();

            String sourcePluginId = properties.get(Constants.Source.PLUGINID);
            // Validate the source id the same way as the transform ids and sink outputs below.
            Preconditions.checkNotNull(sourcePluginId, "Source plugin id not found in program properties.");
            String transformInfosJson = properties.get(Constants.Transform.PLUGINIDS);
            Preconditions.checkNotNull(transformInfosJson, "Transform plugin ids not found in program properties.");
            List<TransformInfo> transformInfos = GSON.fromJson(transformInfosJson, TRANSFORMDETAILS_LIST_TYPE);

            // The source is the first stage of the executor pipeline, followed by the transforms.
            List<TransformDetail> stages = Lists.newArrayListWithCapacity(transformInfos.size() + 2);
            BatchSource batchSource = (BatchSource) mapReduceTaskContext.newPluginInstance(sourcePluginId);
            batchSource.initialize((BatchRuntimeContext) new MapReduceRuntimeContext(mapReduceTaskContext, this.mapperMetrics, sourcePluginId));
            stages.add(new TransformDetail(sourcePluginId, batchSource, new StageMetrics(this.mapperMetrics, PluginID.from(sourcePluginId))));

            this.transformErrorSinkMap = new HashMap<>();
            this.transformsWithoutErrorDataset = new HashSet<>();
            addTransforms(stages, transformInfos, mapReduceTaskContext);

            String sinkOutputsJson = ((Mapper.Context) mapReduceTaskContext.getHadoopContext()).getConfiguration().get(ETLMapReduce.SINK_OUTPUTS_KEY);
            Preconditions.checkNotNull(sinkOutputsJson, "Sink outputs not found in Hadoop conf.");
            List<SinkOutput> sinkOutputs = GSON.fromJson(sinkOutputsJson, ETLMapReduce.SINK_OUTPUTS_TYPE);
            Preconditions.checkArgument(!sinkOutputs.isEmpty(), "Sink outputs not found in Hadoop conf.");

            // With exactly one output and no error datasets, records can be written without naming the output.
            boolean singleOutput = hasOneOutput(transformInfos, sinkOutputs);
            this.sinks = new ArrayList<>(sinkOutputs.size());
            for (SinkOutput sinkOutput : sinkOutputs) {
                String sinkPluginId = sinkOutput.getSinkPluginId();
                BatchSink<Object, Object, Object> batchSink =
                    (BatchSink<Object, Object, Object>) mapReduceTaskContext.newPluginInstance(sinkPluginId);
                batchSink.initialize((BatchRuntimeContext) new MapReduceRuntimeContext(mapReduceTaskContext, this.mapperMetrics, sinkPluginId));
                if (singleOutput) {
                    this.sinks.add(new SingleOutputSink<>(sinkPluginId, batchSink, mapReduceTaskContext, this.mapperMetrics));
                } else {
                    this.sinks.add(new MultiOutputSink<>(sinkPluginId, batchSink, mapReduceTaskContext, this.mapperMetrics,
                                                         sinkOutput.getSinkOutputs(), sinkOutput.getErrorDatasetName()));
                }
            }
            this.transformExecutor = new TransformExecutor<>(stages);
        }

        /**
         * Tells whether everything this mapper writes goes to exactly one output.
         *
         * @param transformInfos configured transforms; any error dataset among them yields {@code false}
         * @param sinkOutputs configured sinks; any error dataset among them yields {@code false}
         * @return {@code true} only if no error dataset is configured and the sinks together name
         *     exactly one distinct output
         */
        private boolean hasOneOutput(List<TransformInfo> transformInfos, List<SinkOutput> sinkOutputs) {
            for (TransformInfo transformInfo : transformInfos) {
                boolean writesErrors = transformInfo.getErrorDatasetName() != null;
                if (writesErrors) {
                    return false;
                }
            }

            Set<String> distinctOutputs = new HashSet<>();
            for (SinkOutput sinkOutput : sinkOutputs) {
                boolean writesErrors = sinkOutput.getErrorDatasetName() != null;
                if (writesErrors) {
                    return false;
                }
                distinctOutputs.addAll(sinkOutput.getSinkOutputs());
            }
            return 1 == distinctOutputs.size();
        }

        /**
         * Creates and initializes one transform plugin per configured transform, appends it to the
         * executor stages, and registers an error sink for each transform that names an error dataset.
         *
         * @param stages stage list to append to, in pipeline order
         * @param transformInfos configured transforms
         * @param taskContext task context used to create plugin instances and error sinks
         * @throws Exception if a transform cannot be instantiated or initialized
         */
        private void addTransforms(List<TransformDetail> stages, List<TransformInfo> transformInfos, MapReduceTaskContext taskContext) throws Exception {
            for (TransformInfo info : transformInfos) {
                String id = info.getTransformId();
                Transform transform = (Transform) taskContext.newPluginInstance(id);
                MapReduceRuntimeContext runtimeContext = new MapReduceRuntimeContext(taskContext, this.mapperMetrics, id);
                LOG.debug("Transform Class : {}", transform.getClass().getName());
                transform.initialize((TransformContext) runtimeContext);

                StageMetrics stageMetrics = new StageMetrics(this.mapperMetrics, PluginID.from(id));
                stages.add(new TransformDetail(id, transform, stageMetrics));

                String errorDataset = info.getErrorDatasetName();
                if (errorDataset == null) {
                    continue;
                }
                this.transformErrorSinkMap.put(id, new ErrorSink<>(taskContext, errorDataset));
            }
        }

        /**
         * Runs one input key/value through the pipeline, writes every emitted record to every sink,
         * and routes transform errors to the error sink configured for that transform.
         *
         * <p>Errors from a transform without an error dataset are dropped; a warning is logged the
         * first time this happens for a given transform.
         *
         * @param obj input key
         * @param obj2 input value
         * @param context Hadoop mapper context (unused here; output goes through the wrapped sinks)
         * @throws IOException if the pipeline or a sink fails with an I/O error
         * @throws InterruptedException if the pipeline or a sink is interrupted
         * @throws RuntimeException wrapping any other checked exception
         */
        public void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            try {
                TransformResponse<Object> response = this.transformExecutor.runOneIteration(new KeyValue(obj, obj2));
                Iterator<Object> emittedRecords = response.getEmittedRecords();
                while (emittedRecords.hasNext()) {
                    Object record = emittedRecords.next();
                    for (WrappedSink<Object, Object, Object> sink : this.sinks) {
                        sink.write(record);
                    }
                }
                for (Map.Entry<String, Collection<Object>> entry : response.getMapTransformIdToErrorEmitter().entrySet()) {
                    String transformId = entry.getKey();
                    ErrorSink<Object, Object, Object> errorSink = this.transformErrorSinkMap.get(transformId);
                    if (errorSink != null) {
                        errorSink.write(entry.getValue());
                    } else if (this.transformsWithoutErrorDataset.add(transformId)) {
                        // Set.add returns true only for the first occurrence, so this warns once per transform.
                        LOG.warn("Transform : {} has error records, but does not have a error dataset configured.", transformId);
                    }
                }
                this.transformExecutor.resetEmitters();
            } catch (Exception e) {
                // No placeholder here: the exception is passed as the Throwable argument so its stack trace is logged.
                LOG.error("Exception thrown in BatchDriver Mapper", e);
                // Rethrow the declared checked exceptions (and unchecked ones) as-is instead of wrapping them.
                Throwables.propagateIfPossible(e, IOException.class, InterruptedException.class);
                throw Throwables.propagate(e);
            }
        }

        /**
         * Destroys the transform executor and every sink plugin, ignoring failures of individual
         * plugins. Safe to call when {@code initialize} failed before the sinks were created.
         */
        public void destroy() {
            Destroyables.destroyQuietly(this.transformExecutor);
            if (this.sinks == null) {
                // initialize() did not get as far as creating the sinks; nothing more to release.
                return;
            }
            LOG.debug("Number of sinks to destroy: {}", this.sinks.size());
            for (WrappedSink<Object, Object, Object> wrappedSink : this.sinks) {
                LOG.trace("Destroying sink: {}", wrappedSink.sink);
                Destroyables.destroyQuietly(wrappedSink.sink);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$ErrorSink.class */
    /**
     * Writes invalid entries to a named error dataset as Avro records.
     *
     * @param <IN> element type of the collections passed to {@link #write}; each element is cast to
     *     {@link InvalidEntry}
     * @param <KEY_OUT> key type of the task context
     * @param <VAL_OUT> value type of the task context
     */
    public static class ErrorSink<IN, KEY_OUT, VAL_OUT> {
        private final MapReduceTaskContext<KEY_OUT, VAL_OUT> context;
        private final String errorDatasetName;

        private ErrorSink(MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, String datasetName) {
            this.context = taskContext;
            this.errorDatasetName = datasetName;
        }

        /**
         * Converts each entry to an error record and writes it to the error dataset with a null value.
         *
         * @param collection invalid entries to record
         * @throws Exception if writing to the task context fails
         */
        public void write(Collection<IN> collection) throws Exception {
            for (IN entry : collection) {
                GenericRecord errorRecord = ETLMapReduce.getGenericRecordForInvalidEntry((InvalidEntry) entry);
                this.context.write(this.errorDatasetName, new AvroKey(errorRecord), NullWritable.get());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$MultiOutputSink.class */
    /**
     * Sink wrapper that writes every emitted key/value to each of its named outputs and, when an
     * error dataset is configured, writes the sink's invalid entries there.
     */
    public static class MultiOutputSink<IN, KEY_OUT, VAL_OUT> extends WrappedSink<IN, KEY_OUT, VAL_OUT> {
        private final Set<String> outputNames;
        private final String errorDatasetName;

        private MultiOutputSink(String pluginId, BatchSink<IN, KEY_OUT, VAL_OUT> batchSink, MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, Metrics metrics, Set<String> outputs, @Nullable String errorDataset) {
            super(pluginId, batchSink, taskContext, metrics);
            this.outputNames = outputs;
            this.errorDatasetName = errorDataset;
        }

        /**
         * Transforms the record with the wrapped sink, fans the result out to all named outputs,
         * records errors if an error dataset is set, and finally resets the emitter.
         *
         * @param in record to write
         * @throws Exception if the sink transform or a context write fails
         */
        @Override // co.cask.cdap.etl.batch.ETLMapReduce.WrappedSink
        public void write(IN in) throws Exception {
            this.sink.transform(in, this.emitter);

            for (Iterator<KeyValue<KEY_OUT, VAL_OUT>> emitted = this.emitter.iterator(); emitted.hasNext(); ) {
                KeyValue<KEY_OUT, VAL_OUT> keyValue = emitted.next();
                for (String outputName : this.outputNames) {
                    this.context.write(outputName, keyValue.getKey(), keyValue.getValue());
                }
            }

            boolean recordErrors = this.errorDatasetName != null && !this.emitter.getErrors().isEmpty();
            if (recordErrors) {
                for (Iterator<InvalidEntry<KeyValue<KEY_OUT, VAL_OUT>>> errors = this.emitter.getErrors().iterator(); errors.hasNext(); ) {
                    GenericRecord errorRecord = ETLMapReduce.getGenericRecordForInvalidEntry(errors.next());
                    this.context.write(this.errorDatasetName, new AvroKey(errorRecord), NullWritable.get());
                }
            }

            this.emitter.reset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$SingleOutputSink.class */
    /**
     * Sink wrapper for the single-output case: emitted key/values are written to the task context
     * without naming an output.
     */
    public static class SingleOutputSink<IN, KEY_OUT, VAL_OUT> extends WrappedSink<IN, KEY_OUT, VAL_OUT> {
        protected SingleOutputSink(String pluginId, BatchSink<IN, KEY_OUT, VAL_OUT> batchSink, MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, Metrics metrics) {
            super(pluginId, batchSink, taskContext, metrics);
        }

        /**
         * Transforms the record with the wrapped sink, writes every emitted key/value, then resets
         * the emitter.
         *
         * @param in record to write
         * @throws Exception if the sink transform or a context write fails
         */
        @Override // co.cask.cdap.etl.batch.ETLMapReduce.WrappedSink
        public void write(IN in) throws Exception {
            this.sink.transform(in, this.emitter);
            for (Iterator<KeyValue<KEY_OUT, VAL_OUT>> emitted = this.emitter.iterator(); emitted.hasNext(); ) {
                KeyValue<KEY_OUT, VAL_OUT> keyValue = emitted.next();
                this.context.write(keyValue.getKey(), keyValue.getValue());
            }
            this.emitter.reset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$SinkOutput.class */
    /**
     * Describes one sink for the mapper: its plugin id, the outputs it writes to, and its optional
     * error dataset. Instances are serialized to JSON in beforeSubmit and read back in the mapper.
     */
    public static class SinkOutput {
        private String sinkPluginId;
        private Set<String> sinkOutputs;
        private String errorDatasetName;

        private SinkOutput(String pluginId, Set<String> outputs, String errorDataset) {
            sinkPluginId = pluginId;
            sinkOutputs = outputs;
            errorDatasetName = errorDataset;
        }

        /** @return id of the sink plugin */
        public String getSinkPluginId() {
            return sinkPluginId;
        }

        /** @return names of the outputs the sink writes to */
        public Set<String> getSinkOutputs() {
            return sinkOutputs;
        }

        /** @return name of the sink's error dataset, or {@code null} if none was given */
        public String getErrorDatasetName() {
            return errorDatasetName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/etl/batch/ETLMapReduce$WrappedSink.class */
    /**
     * Pairs a sink plugin with the emitter that collects its output and the task context the
     * output is written to. Subclasses decide how emitted key/values reach the context.
     */
    public static abstract class WrappedSink<IN, KEY_OUT, VAL_OUT> {
        protected final BatchSink<IN, KEY_OUT, VAL_OUT> sink;
        protected final DefaultEmitter<KeyValue<KEY_OUT, VAL_OUT>> emitter;
        protected final MapReduceTaskContext<KEY_OUT, VAL_OUT> context;

        protected WrappedSink(String pluginId, BatchSink<IN, KEY_OUT, VAL_OUT> batchSink, MapReduceTaskContext<KEY_OUT, VAL_OUT> taskContext, Metrics metrics) {
            StageMetrics stageMetrics = new StageMetrics(metrics, PluginID.from(pluginId));
            this.sink = batchSink;
            this.emitter = new DefaultEmitter<>(stageMetrics);
            this.context = taskContext;
        }

        /**
         * Passes one record through the wrapped sink and writes the result to the task context.
         *
         * @param in record to write
         * @throws Exception if the sink or the context fails
         */
        protected abstract void write(IN in) throws Exception;
    }

    /**
     * Creates the MapReduce driver for the given pipeline configuration.
     *
     * @param config pipeline configuration whose plugins are registered in {@code configure}
     */
    public ETLMapReduce(ETLBatchConfig config) {
        this.config = config;
    }

    /**
     * Names the program, registers the pipeline's plugins, and stores the resulting source, sink
     * and transform ids in the program properties so the later phases can look them up.
     *
     * <p>The file set properties handed to the registerer describe an explorable Avro
     * time-partitioned file set whose table schema is the error schema.
     */
    public void configure() {
        setName(NAME);
        setDescription("MapReduce Driver for ETL Batch Applications");

        PipelineRegisterer registerer = new PipelineRegisterer(getConfigurer(), "batch");
        Pipeline pipeline = registerer.registerPlugins(
            this.config,
            TimePartitionedFileSet.class,
            FileSetProperties.builder()
                .setInputFormat(AvroKeyInputFormat.class)
                .setOutputFormat(AvroKeyOutputFormat.class)
                .setEnableExploreOnCreate(true)
                .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
                .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
                .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
                .setTableProperty("avro.schema.literal", ERROR_SCHEMA.toString())
                .build(),
            true);

        if (this.config.getResources() != null) {
            setMapperResources(this.config.getResources());
        }

        Map<String, String> programProperties = new HashMap<>();
        programProperties.put(Constants.Source.PLUGINID, pipeline.getSource());
        programProperties.put(Constants.Sink.PLUGINIDS, GSON.toJson(pipeline.getSinks()));
        programProperties.put(Constants.Transform.PLUGINIDS, GSON.toJson(pipeline.getTransforms()));
        setProperties(programProperties);
    }

    /**
     * Prepares the run: calls {@code prepareRun} on the source and on every sink, registers the
     * error datasets of transforms and sinks as outputs, publishes the sink outputs to the mapper
     * through the Hadoop configuration, and configures a map-only job.
     *
     * @param mapReduceContext context of the run being submitted
     * @throws NullPointerException if the source id, transform ids or sink ids are missing from the
     *     program properties
     * @throws Exception if a plugin cannot be instantiated or its {@code prepareRun} fails
     */
    @SuppressWarnings("unchecked")
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        Job job = (Job) mapReduceContext.getHadoopJob();
        Map<String, String> properties = mapReduceContext.getSpecification().getProperties();

        String sourcePluginId = properties.get(Constants.Source.PLUGINID);
        // Validate the source id the same way as the transform and sink ids below.
        Preconditions.checkNotNull(sourcePluginId, "Source plugin id could not be found in program properties.");
        this.batchSource = (BatchConfigurable<BatchSourceContext>) mapReduceContext.newPluginInstance(sourcePluginId);
        this.batchSource.prepareRun(new MapReduceSourceContext(mapReduceContext, this.mrMetrics, sourcePluginId));

        String transformInfosJson = properties.get(Constants.Transform.PLUGINIDS);
        Preconditions.checkNotNull(transformInfosJson, "Transform plugin ids not found in program properties.");
        List<TransformInfo> transformInfos = GSON.fromJson(transformInfosJson, TRANSFORMINFO_LIST_TYPE);
        for (TransformInfo transformInfo : transformInfos) {
            if (transformInfo.getErrorDatasetName() != null) {
                addPropertiesToErrorDataset(transformInfo.getErrorDatasetName(), mapReduceContext);
            }
        }

        String sinkInfosJson = properties.get(Constants.Sink.PLUGINIDS);
        Preconditions.checkNotNull(sinkInfosJson, "Sink plugin ids could not be found in program properties.");
        List<SinkInfo> sinkInfos = GSON.fromJson(sinkInfosJson, SINK_INFO_TYPE);

        List<SinkOutput> sinkOutputs = new ArrayList<>();
        this.batchSinks = Lists.newArrayListWithCapacity(sinkInfos.size());
        for (SinkInfo sinkInfo : sinkInfos) {
            String sinkId = sinkInfo.getSinkId();
            BatchConfigurable<BatchSinkContext> batchSink =
                (BatchConfigurable<BatchSinkContext>) mapReduceContext.newPluginInstance(sinkId);
            MapReduceSinkContext sinkContext = new MapReduceSinkContext(mapReduceContext, this.mrMetrics, sinkId);
            batchSink.prepareRun(sinkContext);
            // Only sinks whose prepareRun succeeded are remembered for the onRunFinish callbacks.
            this.batchSinks.add(batchSink);
            sinkOutputs.add(new SinkOutput(sinkId, sinkContext.getOutputNames(), sinkInfo.getErrorDatasetName()));
            if (sinkInfo.getErrorDatasetName() != null) {
                addPropertiesToErrorDataset(sinkInfo.getErrorDatasetName(), mapReduceContext);
            }
        }

        // The mapper reads this entry back in its initialize method.
        job.getConfiguration().set(SINK_OUTPUTS_KEY, GSON.toJson(sinkOutputs));
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);
    }

    /**
     * Registers an error dataset as an output of the run, with the error schema as the Avro output
     * key schema and the run's logical start time as the output partition time.
     *
     * @param errorDatasetName name of the error dataset to add as output
     * @param context context of the run being submitted
     */
    private void addPropertiesToErrorDataset(String errorDatasetName, MapReduceContext context) {
        Map<String, String> outputArguments = new HashMap<>();
        outputArguments.put("output.properties.avro.schema.output.key", ERROR_SCHEMA.toString());
        long logicalStartTime = context.getLogicalStartTime();
        TimePartitionedFileSetArguments.setOutputPartitionTime(outputArguments, logicalStartTime);
        context.addOutput(errorDatasetName, outputArguments);
    }

    /**
     * Notifies the source and then the sinks that the run has finished, and logs the outcome.
     *
     * @param z whether the run succeeded
     * @param mapReduceContext context of the finished run
     * @throws Exception if notifying the plugins fails outside their guarded callbacks
     */
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        // Source first, sinks second.
        onRunFinishSource(mapReduceContext, z);
        onRunFinishSink(mapReduceContext, z);
        LOG.info("Batch Run finished : succeeded = {}", z);
    }

    /**
     * Calls {@code onRunFinish} on the source plugin, logging instead of propagating any failure
     * of the callback.
     *
     * <p>If the source was never created (the run failed before it could be prepared), the
     * callback is skipped so that the remaining finish handling can still run.
     *
     * @param mapReduceContext context of the finished run
     * @param z whether the run succeeded
     */
    private void onRunFinishSource(MapReduceContext mapReduceContext, boolean z) {
        if (this.batchSource == null) {
            LOG.warn("Batch source was not prepared for this run, skipping onRunFinish on the source.");
            return;
        }
        String sourcePluginId = mapReduceContext.getSpecification().getProperty(Constants.Source.PLUGINID);
        MapReduceSourceContext sourceContext = new MapReduceSourceContext(mapReduceContext, this.mrMetrics, sourcePluginId);
        LOG.info("On RunFinish Source : {}", this.batchSource.getClass().getName());
        try {
            this.batchSource.onRunFinish(z, sourceContext);
        } catch (Throwable th) {
            // Best effort: a failing plugin callback must not prevent the other callbacks.
            LOG.warn("Exception when calling onRunFinish on {}", this.batchSource, th);
        }
    }

    /**
     * Calls {@code onRunFinish} on every sink plugin that was prepared for this run, logging
     * instead of propagating failures of individual callbacks.
     *
     * <p>Sinks are stored in the same order as the configured sink infos, but only after their
     * {@code prepareRun} succeeded. The stored list can therefore be missing or shorter than the
     * configured list; only the prepared prefix is notified.
     *
     * @param mapReduceContext context of the finished run
     * @param z whether the run succeeded
     * @throws NullPointerException if the sink ids are missing from the program properties
     */
    private void onRunFinishSink(MapReduceContext mapReduceContext, boolean z) {
        String property = mapReduceContext.getSpecification().getProperty(Constants.Sink.PLUGINIDS);
        Preconditions.checkNotNull(property, "Sink plugin ids could not be found in program properties.");
        List<SinkInfo> sinkInfos = GSON.fromJson(property, SINK_INFO_TYPE);

        if (this.batchSinks == null) {
            LOG.warn("Batch sinks were not prepared for this run, skipping onRunFinish on the sinks.");
            return;
        }
        int preparedSinks = Math.min(sinkInfos.size(), this.batchSinks.size());
        if (preparedSinks < sinkInfos.size()) {
            LOG.warn("Only {} of {} sinks were prepared for this run, calling onRunFinish on those only.",
                     preparedSinks, sinkInfos.size());
        }

        for (int i = 0; i < preparedSinks; i++) {
            BatchConfigurable<BatchSinkContext> batchSink = this.batchSinks.get(i);
            try {
                batchSink.onRunFinish(z, new MapReduceSinkContext(mapReduceContext, this.mrMetrics, sinkInfos.get(i).getSinkId()));
            } catch (Throwable th) {
                // Best effort: keep notifying the remaining sinks.
                LOG.warn("Exception when calling onRunFinish on {}", batchSink, th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the Avro error record for an invalid entry: its error code, its error message, and a
     * string form of the invalid record.
     *
     * <p>A {@link StructuredRecord} is rendered as JSON. For any other record, including
     * {@code null}, the string explains that the type is not supported. If the JSON conversion
     * fails, the string describes the failure instead.
     *
     * @param invalidEntry entry to convert
     * @return record conforming to the error schema
     */
    public static GenericRecord getGenericRecordForInvalidEntry(InvalidEntry invalidEntry) {
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AVRO_ERROR_SCHEMA);
        recordBuilder.set(Constants.ErrorDataset.ERRCODE, invalidEntry.getErrorCode());
        recordBuilder.set(Constants.ErrorDataset.ERRMSG, invalidEntry.getErrorMsg());

        Object invalidRecord = invalidEntry.getInvalidRecord();
        String invalidRecordText;
        if (invalidRecord instanceof StructuredRecord) {
            try {
                invalidRecordText = StructuredRecordStringConverter.toJsonString((StructuredRecord) invalidRecord);
            } catch (IOException e) {
                // Append the exception itself: its cause is frequently null and would hide the reason.
                invalidRecordText = "Exception while converting StructuredRecord to String, " + e;
            }
        } else {
            // The invalid-entry field of the error schema is a non-null string, so a null record
            // still needs a text; report its type as "null" rather than failing on getClass().
            String typeName = invalidRecord == null ? "null" : invalidRecord.getClass().getName();
            invalidRecordText = String.format("Error Entry is of type %s, only a record of type %s is supported currently", typeName, StructuredRecord.class.getName());
        }
        recordBuilder.set(Constants.ErrorDataset.INVALIDENTRY, invalidRecordText);
        return recordBuilder.build();
    }
}
