package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.annotation.TransactionPolicy;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.etl.api.AlertPublisher;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.lineage.field.FieldOperation;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.DefaultAggregatorContext;
import co.cask.cdap.etl.batch.DefaultJoinerContext;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.StageFailureException;
import co.cask.cdap.etl.batch.connector.MultiConnectorFactory;
import co.cask.cdap.etl.batch.conversion.WritableConversion;
import co.cask.cdap.etl.batch.conversion.WritableConversions;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.FieldOperationTypeAdapter;
import co.cask.cdap.etl.common.LocationAwareMDCWrapperLogger;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.PipelineRuntime;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.TypeChecker;
import co.cask.cdap.etl.common.submit.AggregatorContextProvider;
import co.cask.cdap.etl.common.submit.CompositeFinisher;
import co.cask.cdap.etl.common.submit.ContextProvider;
import co.cask.cdap.etl.common.submit.Finisher;
import co.cask.cdap.etl.common.submit.JoinerContextProvider;
import co.cask.cdap.etl.common.submit.SubmitterPlugin;
import co.cask.cdap.etl.log.LogStageInjector;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.SetMultimap;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce program that executes one phase of a batch ETL pipeline.
 *
 * <p>{@link #configure()} validates the phase and stores state as program properties. A valid phase
 * has at least one source, at least one sink, and at most one aggregator/joiner. The stored state
 * is the serialized {@link BatchPhaseSpec} and the connector dataset names.
 *
 * <p>{@link #initialize()} reads that state back. It then prepares every source, sink, alert
 * publisher, transform, aggregator and joiner stage in topological order, and configures the
 * Hadoop job.
 *
 * <p>Record processing happens in {@link ETLMapper} and {@link ETLReducer}, which both delegate
 * to a {@code TransformRunner}.
 */
public class ETLMapReduce extends AbstractMapReduce {
    /** Job configuration key holding the class name of the map output (group/join) key. */
    public static final String MAP_KEY_CLASS = "cdap.etl.map.key.class";
    /** Job configuration key holding the class name of the map output (group/join) value. */
    public static final String MAP_VAL_CLASS = "cdap.etl.map.val.class";
    /** Job configuration key holding the JSON-serialized runtime arguments. */
    static final String RUNTIME_ARGS_KEY = "cdap.etl.runtime.args";
    /** Job configuration key holding the JSON map of input name to source stage name. */
    static final String INPUT_ALIAS_KEY = "cdap.etl.source.alias.key";
    /** Job configuration key holding the JSON map of sink stage name to {@link SinkOutput}. */
    static final String SINK_OUTPUTS_KEY = "cdap.etl.sink.outputs";
    public static final String NAME = ETLMapReduce.class.getSimpleName();

    // Gson types used to (de)serialize the values stored under the keys above.
    static final Type RUNTIME_ARGS_TYPE = new TypeToken<Map<String, String>>() { }.getType();
    static final Type INPUT_ALIAS_TYPE = new TypeToken<Map<String, String>>() { }.getType();
    static final Type SINK_OUTPUTS_TYPE = new TypeToken<Map<String, SinkOutput>>() { }.getType();
    static final Type CONNECTOR_DATASETS_TYPE = new TypeToken<HashSet<String>>() { }.getType();

    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);
    private static final Logger PIPELINE_LOG =
        new LocationAwareMDCWrapperLogger(LOG, Constants.EVENT_TYPE_TAG, Constants.PIPELINE_LIFECYCLE_TAG_VALUE);
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
        .registerTypeAdapter(SetMultimap.class, new SetMultimapCodec())
        .registerTypeAdapter(FieldOperation.class, new FieldOperationTypeAdapter())
        .create();

    // Notified of the run outcome in destroy(); built from the plugins prepared in initialize().
    private Finisher finisher;
    // NOTE(review): never assigned in this file -- presumably injected by the CDAP runtime.
    private Metrics mrMetrics;
    private final BatchPhaseSpec phaseSpec;
    private final Set<String> connectorDatasets;

    /**
     * Mapper that passes every input record through the pipeline stages run in the map phase.
     */
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        private TransformRunner<Object, Object> transformRunner;
        // NOTE(review): never assigned in this file -- presumably injected by the CDAP runtime.
        private Metrics mapperMetrics;

        @Override
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            if (isStageLoggingEnabled(mapReduceTaskContext.getSpecification().getProperties())) {
                LogStageInjector.start();
            }
            this.transformRunner = new TransformRunner<>(mapReduceTaskContext, this.mapperMetrics);
        }

        /**
         * Runs one record through the transform runner.
         *
         * <p>A {@link StageFailureException} is logged and its cause is rethrown unchecked. Any other
         * exception is rethrown unchecked as well.
         */
        @Override
        public void map(Object key, Object value, Mapper.Context context) throws IOException, InterruptedException {
            try {
                transformRunner.transform(key, value);
            } catch (StageFailureException e) {
                throw propagateStageFailure(e);
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }

        @Override
        public void destroy() {
            // transformRunner is assigned last in initialize(), so it is null if initialize() failed.
            if (transformRunner != null) {
                transformRunner.destroy();
            }
        }
    }

    /**
     * Reducer that passes each key and its grouped values through the stages run in the reduce phase.
     */
    public static class ETLReducer extends Reducer implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        // NOTE(review): never assigned in this file -- presumably injected by the CDAP runtime.
        private Metrics reducerMetrics;
        private TransformRunner<Object, Iterator> transformRunner;

        @Override
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            if (isStageLoggingEnabled(mapReduceTaskContext.getSpecification().getProperties())) {
                LogStageInjector.start();
            }
            this.transformRunner = new TransformRunner<>(mapReduceTaskContext, this.reducerMetrics);
        }

        /**
         * Runs one key and an iterator over its values through the transform runner.
         *
         * <p>Failures are handled the same way as in {@link ETLMapper#map}.
         */
        @Override
        protected void reduce(Object key, Iterable values, Reducer.Context context)
            throws IOException, InterruptedException {
            try {
                transformRunner.transform(key, values.iterator());
            } catch (StageFailureException e) {
                throw propagateStageFailure(e);
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }

        @Override
        public void destroy() {
            // transformRunner is assigned last in initialize(), so it is null if initialize() failed.
            if (transformRunner != null) {
                transformRunner.destroy();
            }
        }
    }

    /**
     * Creates a {@link MapReduceBatchContext} for one stage, bound to a given {@link DatasetContext}.
     */
    private static class MapReduceBatchContextProvider implements ContextProvider<MapReduceBatchContext> {
        private final MapReduceContext context;
        private final PipelineRuntime pipelineRuntime;
        private final StageSpec stageSpec;
        private final Set<String> connectorDatasets;

        private MapReduceBatchContextProvider(MapReduceContext mapReduceContext, PipelineRuntime pipelineRuntime,
                                              StageSpec stageSpec, Set<String> connectorDatasets) {
            this.context = mapReduceContext;
            this.pipelineRuntime = pipelineRuntime;
            this.stageSpec = stageSpec;
            this.connectorDatasets = connectorDatasets;
        }

        @Override
        public MapReduceBatchContext getContext(DatasetContext datasetContext) {
            return new MapReduceBatchContext(context, pipelineRuntime, stageSpec, connectorDatasets, datasetContext);
        }
    }

    /** Creates the program for the given phase with no connector datasets. */
    public ETLMapReduce(BatchPhaseSpec batchPhaseSpec) {
        this(batchPhaseSpec, new HashSet<String>());
    }

    /**
     * Creates the program for the given phase.
     *
     * @param batchPhaseSpec the pipeline phase this program executes
     * @param connectorDatasets names of the connector datasets; stored as a program property
     */
    public ETLMapReduce(BatchPhaseSpec batchPhaseSpec, Set<String> connectorDatasets) {
        this.phaseSpec = batchPhaseSpec;
        this.connectorDatasets = connectorDatasets;
    }

    /**
     * Configures the program from the phase spec and records the spec as a program property.
     *
     * <p>Configuration covers the program name, description, plugins and resources. The serialized
     * phase spec and connector dataset names are stored in the program properties.
     *
     * @throws IllegalArgumentException if the phase has no source, has no sink, or has more than
     *     one aggregator/joiner
     */
    @Override
    public void configure() {
        setName(phaseSpec.getPhaseName());
        setDescription("MapReduce phase executor. " + phaseSpec.getDescription());
        phaseSpec.getPhase().registerPlugins(getConfigurer());
        setMapperResources(phaseSpec.getResources());
        setReducerResources(phaseSpec.getResources());
        setDriverResources(phaseSpec.getDriverResources());

        if (phaseSpec.getPhase().getSources().isEmpty()) {
            throw new IllegalArgumentException(String.format(
                "Pipeline phase '%s' must contain at least one source but it has no sources.",
                phaseSpec.getPhaseName()));
        }
        if (phaseSpec.getPhase().getSinks().isEmpty()) {
            throw new IllegalArgumentException(String.format(
                "Pipeline phase '%s' must contain at least one sink but does not have any.",
                phaseSpec.getPhaseName()));
        }
        Set<StageSpec> reducers = phaseSpec.getPhase().getStagesOfType(BatchAggregator.PLUGIN_TYPE,
                                                                       BatchJoiner.PLUGIN_TYPE);
        if (reducers.size() > 1) {
            throw new IllegalArgumentException(String.format(
                "Pipeline phase '%s' cannot contain more than one reducer but it has reducers '%s'.",
                phaseSpec.getPhaseName(), joinStageNames(reducers)));
        }

        Map<String, String> properties = new HashMap<>();
        properties.put(Constants.PIPELINEID, GSON.toJson(phaseSpec));
        properties.put(Constants.CONNECTOR_DATASETS, GSON.toJson(connectorDatasets));
        setProperties(properties);
    }

    /**
     * Prepares every stage of the phase and configures the Hadoop job to run it.
     *
     * <p>The job configuration receives: the pipeline properties, the mapper/reducer classes, the
     * map output key/value classes (when the phase has an aggregator or joiner), the sink outputs,
     * the input aliases and the runtime arguments.
     *
     * <p>When a workflow token is available, the arguments added during preparation and the
     * per-stage field operations are also written to it.
     *
     * @throws IllegalStateException if the stored phase spec contains more than one reducer
     */
    @Override
    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void initialize() throws Exception {
        MapReduceContext context = getContext();
        Map<String, String> properties = context.getSpecification().getProperties();
        if (isStageLoggingEnabled(properties)) {
            LogStageInjector.start();
        }
        PipelineRuntime pipelineRuntime = new PipelineRuntime(context, mrMetrics);
        List<Finisher> finishers = new ArrayList<>();

        final Job job = (Job) context.getHadoopJob();
        final Configuration hConf = job.getConfiguration();
        // Speculative execution is disabled for map and reduce tasks.
        // NOTE(review): presumably because stages/sinks are not safe to run twice -- confirm.
        hConf.setBoolean("mapreduce.map.speculative", false);
        hConf.setBoolean("mapreduce.reduce.speculative", false);

        DefaultMacroEvaluator evaluator = new DefaultMacroEvaluator(pipelineRuntime.getArguments(),
                                                                    context.getLogicalStartTime(),
                                                                    context, context.getNamespace());

        // Read back the state that configure() stored in the program properties.
        BatchPhaseSpec batchPhaseSpec = GSON.fromJson(properties.get(Constants.PIPELINEID), BatchPhaseSpec.class);
        Set<String> connectorDatasetNames =
            GSON.fromJson(properties.get(Constants.CONNECTOR_DATASETS), CONNECTOR_DATASETS_TYPE);

        for (Map.Entry<String, String> property : batchPhaseSpec.getPipelineProperties().entrySet()) {
            hConf.set(property.getKey(), property.getValue());
        }

        PipelinePhase phase = batchPhaseSpec.getPhase();
        PipelinePluginInstantiator pluginInstantiator =
            new PipelinePluginInstantiator(context, mrMetrics, batchPhaseSpec, new MultiConnectorFactory());

        // configure() performs the same check, so failing it here points to a planning bug.
        Set<StageSpec> reducers = phase.getStagesOfType(BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE);
        if (reducers.size() > 1) {
            throw new IllegalStateException("Found multiple reducers ( " + joinStageNames(reducers)
                + " ) in the same pipeline phase. This means there was a bug in planning the pipeline when it was deployed. ");
        }

        job.setMapperClass(ETLMapper.class);
        if (reducers.isEmpty()) {
            // No aggregator or joiner: run a map-only job.
            job.setNumReduceTasks(0);
        } else {
            job.setReducerClass(ETLReducer.class);
        }

        // Filled in by the prepare actions below.
        final Map<String, SinkOutput> sinkOutputs = new HashMap<>();
        final Map<String, String> inputAliases = new HashMap<>();
        final Map<String, List<FieldOperation>> stageFieldOperations = new HashMap<>();

        // NOTE(review): if prepareRun() fails for a later stage, the stages already prepared are never
        // handed to a Finisher, because this.finisher is only assigned after this loop completes.
        for (final String stageName : phase.getDag().getTopologicalOrder()) {
            StageSpec stageSpec = phase.getStage(stageName);
            String pluginType = stageSpec.getPluginType();
            boolean isConnectorSource =
                Constants.Connector.PLUGIN_TYPE.equals(pluginType) && phase.getSources().contains(stageName);
            boolean isConnectorSink =
                Constants.Connector.PLUGIN_TYPE.equals(pluginType) && phase.getSinks().contains(stageName);

            SubmitterPlugin submitterPlugin = null;
            if (BatchSource.PLUGIN_TYPE.equals(pluginType) || isConnectorSource) {
                submitterPlugin = new SubmitterPlugin(
                    stageName, context,
                    (BatchConfigurable) pluginInstantiator.newPluginInstance(stageName, evaluator),
                    new MapReduceBatchContextProvider(context, pipelineRuntime, stageSpec, connectorDatasetNames),
                    new SubmitterPlugin.PrepareAction<MapReduceBatchContext>() {
                        @Override
                        public void act(MapReduceBatchContext sourceContext) {
                            // Map each input registered by the source back to the source stage.
                            for (String inputName : sourceContext.getInputNames()) {
                                inputAliases.put(inputName, stageName);
                            }
                            stageFieldOperations.put(stageName, sourceContext.getFieldOperations());
                        }
                    });
            } else if (BatchSink.PLUGIN_TYPE.equals(pluginType) || AlertPublisher.PLUGIN_TYPE.equals(pluginType)
                || isConnectorSink) {
                submitterPlugin = new SubmitterPlugin(
                    stageName, context,
                    (BatchConfigurable) pluginInstantiator.newPluginInstance(stageName, evaluator),
                    new MapReduceBatchContextProvider(context, pipelineRuntime, stageSpec, connectorDatasetNames),
                    new SubmitterPlugin.PrepareAction<MapReduceBatchContext>() {
                        @Override
                        public void act(MapReduceBatchContext sinkContext) {
                            sinkOutputs.put(stageName, new SinkOutput(sinkContext.getOutputNames()));
                            stageFieldOperations.put(stageName, sinkContext.getFieldOperations());
                        }
                    });
            } else if (Transform.PLUGIN_TYPE.equals(pluginType)) {
                submitterPlugin = new SubmitterPlugin(
                    stageName, context,
                    (Transform) pluginInstantiator.newPluginInstance(stageName, evaluator),
                    new MapReduceBatchContextProvider(context, pipelineRuntime, stageSpec, connectorDatasetNames),
                    new SubmitterPlugin.PrepareAction<MapReduceBatchContext>() {
                        @Override
                        public void act(MapReduceBatchContext transformContext) {
                            stageFieldOperations.put(stageName, transformContext.getFieldOperations());
                        }
                    });
            } else if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
                final BatchAggregator batchAggregator =
                    (BatchAggregator) pluginInstantiator.newPluginInstance(stageName, evaluator);
                submitterPlugin = new SubmitterPlugin(
                    stageName, context, batchAggregator,
                    new AggregatorContextProvider(pipelineRuntime, stageSpec, context.getAdmin()),
                    new SubmitterPlugin.PrepareAction<DefaultAggregatorContext>() {
                        @Override
                        public void act(DefaultAggregatorContext aggregatorContext) {
                            // Use the partition count requested by the stage, if any.
                            if (aggregatorContext.getNumPartitions() != null) {
                                job.setNumReduceTasks(aggregatorContext.getNumPartitions().intValue());
                            }
                            // Fall back to TypeChecker when the context did not set the classes.
                            Class<?> groupKeyClass = aggregatorContext.getGroupKeyClass();
                            Class<?> groupValueClass = aggregatorContext.getGroupValueClass();
                            if (groupKeyClass == null) {
                                groupKeyClass = TypeChecker.getGroupKeyClass(batchAggregator);
                            }
                            if (groupValueClass == null) {
                                groupValueClass = TypeChecker.getGroupValueClass(batchAggregator);
                            }
                            hConf.set(MAP_KEY_CLASS, groupKeyClass.getName());
                            hConf.set(MAP_VAL_CLASS, groupValueClass.getName());
                            job.setMapOutputKeyClass(getOutputKeyClass(stageName, groupKeyClass));
                            job.setMapOutputValueClass(getOutputValClass(stageName, groupValueClass));
                            stageFieldOperations.put(stageName, aggregatorContext.getFieldOperations());
                        }
                    });
            } else if (BatchJoiner.PLUGIN_TYPE.equals(pluginType)) {
                final BatchJoiner batchJoiner = (BatchJoiner) pluginInstantiator.newPluginInstance(stageName, evaluator);
                submitterPlugin = new SubmitterPlugin(
                    stageName, context, batchJoiner,
                    new JoinerContextProvider(pipelineRuntime, stageSpec, context.getAdmin()),
                    new SubmitterPlugin.PrepareAction<DefaultJoinerContext>() {
                        @Override
                        public void act(DefaultJoinerContext joinerContext) {
                            // Use the partition count requested by the stage, if any.
                            if (joinerContext.getNumPartitions() != null) {
                                job.setNumReduceTasks(joinerContext.getNumPartitions().intValue());
                            }
                            // Fall back to TypeChecker when the context did not set the classes.
                            Class<?> joinKeyClass = joinerContext.getJoinKeyClass();
                            Class<?> joinInputRecordClass = joinerContext.getJoinInputRecordClass();
                            if (joinKeyClass == null) {
                                joinKeyClass = TypeChecker.getJoinKeyClass(batchJoiner);
                            }
                            if (joinInputRecordClass == null) {
                                joinInputRecordClass = TypeChecker.getJoinInputRecordClass(batchJoiner);
                            }
                            hConf.set(MAP_KEY_CLASS, joinKeyClass.getName());
                            hConf.set(MAP_VAL_CLASS, joinInputRecordClass.getName());
                            job.setMapOutputKeyClass(getOutputKeyClass(stageName, joinKeyClass));
                            // Called only to validate the record class (it throws if the class is not
                            // Writable); the actual map output value class is always TaggedWritable.
                            getOutputValClass(stageName, joinInputRecordClass);
                            job.setMapOutputValueClass(TaggedWritable.class);
                            stageFieldOperations.put(stageName, joinerContext.getFieldOperations());
                        }
                    });
            }

            if (submitterPlugin != null) {
                submitterPlugin.prepareRun();
                finishers.add(submitterPlugin);
            }
        }

        hConf.set(SINK_OUTPUTS_KEY, GSON.toJson(sinkOutputs));
        hConf.set(INPUT_ALIAS_KEY, GSON.toJson(inputAliases));
        finisher = new CompositeFinisher(finishers);

        // Presumably null when this program does not run inside a workflow.
        WorkflowToken token = context.getWorkflowToken();
        if (token != null) {
            for (Map.Entry<String, String> added : pipelineRuntime.getArguments().getAddedArguments().entrySet()) {
                token.put(added.getKey(), added.getValue());
            }
            token.put(Constants.FIELD_OPERATION_KEY_IN_WORKFLOW_TOKEN, GSON.toJson(stageFieldOperations));
        }
        hConf.set(RUNTIME_ARGS_KEY, GSON.toJson(pipelineRuntime.getArguments().asMap()));
    }

    /**
     * Returns the Hadoop class to use as the map output key for the given record class.
     *
     * @param reducerName name of the aggregator/joiner stage, used in the error message
     * @param keyClass the stage's key class; replaced by its Writable conversion if one exists
     * @return the (possibly converted) class
     * @throws IllegalArgumentException if the resulting class is not a {@link WritableComparable}
     */
    public Class<?> getOutputKeyClass(String reducerName, Class<?> keyClass) {
        WritableConversion conversion = WritableConversions.getConversion(keyClass.getName());
        Class<?> outputClass = conversion == null ? keyClass : conversion.getWritableClass();
        if (WritableComparable.class.isAssignableFrom(outputClass)) {
            return outputClass;
        }
        throw new IllegalArgumentException(String.format(
            "Invalid reducer %s. The key class %s must implement Hadoop's WritableComparable.",
            reducerName, outputClass));
    }

    /**
     * Returns the Hadoop class to use as the map output value for the given record class.
     *
     * @param reducerName name of the aggregator/joiner stage, used in the error message
     * @param valueClass the stage's value class; replaced by its Writable conversion if one exists
     * @return the (possibly converted) class
     * @throws IllegalArgumentException if the resulting class is not a {@link Writable}
     */
    public Class<?> getOutputValClass(String reducerName, Class<?> valueClass) {
        WritableConversion conversion = WritableConversions.getConversion(valueClass.getName());
        Class<?> outputClass = conversion == null ? valueClass : conversion.getWritableClass();
        if (Writable.class.isAssignableFrom(outputClass)) {
            return outputClass;
        }
        throw new IllegalArgumentException(String.format(
            "Invalid reducer %s. The value class %s must implement Hadoop's Writable.",
            reducerName, outputClass));
    }

    /**
     * Tells the finisher whether the run completed successfully, then logs the final state.
     */
    @Override
    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void destroy() {
        boolean succeeded = getContext().getState().getStatus() == ProgramStatus.COMPLETED;
        // finisher is null if initialize() did not get far enough to build it.
        if (finisher != null) {
            finisher.onFinish(succeeded);
        }
        LOG.info("Batch Run finished : status = {}", getContext().getState());
    }

    /** Returns true when the stage-logging property is set to "true" (case-insensitive). */
    private static boolean isStageLoggingEnabled(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.get(Constants.STAGE_LOGGING_ENABLED));
    }

    /**
     * Logs a stage failure to the pipeline log and rethrows its cause unchecked.
     *
     * <p>If the failure has no cause, the failure itself is rethrown. This avoids the
     * NullPointerException that {@code Throwables.propagate(null)} would raise.
     *
     * <p>The method never returns normally. Its return type lets callers write
     * {@code throw propagateStageFailure(e)}.
     */
    private static RuntimeException propagateStageFailure(StageFailureException failure) {
        PIPELINE_LOG.error("{}", failure.getMessage(), failure.getCause());
        Throwable cause = failure.getCause();
        throw Throwables.propagate(cause == null ? failure : cause);
    }

    /** Joins the names of the given stages with ',' in iteration order. */
    private static String joinStageNames(Set<StageSpec> stages) {
        List<String> names = new ArrayList<>(stages.size());
        for (StageSpec stage : stages) {
            names.add(stage.getName());
        }
        return Joiner.on(',').join(names);
    }
}
