package co.cask.cdap.etl.spec;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.plugin.PluginConfigurer;
import co.cask.cdap.etl.api.Engine;
import co.cask.cdap.etl.api.ErrorTransform;
import co.cask.cdap.etl.api.MultiInputPipelineConfigurable;
import co.cask.cdap.etl.api.MultiOutputPipelineConfigurable;
import co.cask.cdap.etl.api.PipelineConfigurable;
import co.cask.cdap.etl.api.SplitterTransform;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.action.Action;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.condition.Condition;
import co.cask.cdap.etl.common.ArtifactSelectorProvider;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultPipelineConfigurer;
import co.cask.cdap.etl.common.DefaultStageConfigurer;
import co.cask.cdap.etl.planner.Dag;
import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.etl.proto.v2.ETLConfig;
import co.cask.cdap.etl.proto.v2.ETLPlugin;
import co.cask.cdap.etl.proto.v2.ETLStage;
import co.cask.cdap.etl.spec.PipelineSpec;
import co.cask.cdap.etl.spec.StageSpec;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/* loaded from: input_file:lib/cdap-etl-core-4.3.1.jar:co/cask/cdap/etl/spec/PipelineSpecGenerator.class */
public abstract class PipelineSpecGenerator<C extends ETLConfig, P extends PipelineSpec> {
    private static final Set<String> VALID_ERROR_INPUTS = ImmutableSet.of(BatchSource.PLUGIN_TYPE, Transform.PLUGIN_TYPE, BatchAggregator.PLUGIN_TYPE, ErrorTransform.PLUGIN_TYPE);
    protected final PluginConfigurer configurer;
    protected final Engine engine;
    private final Class<? extends Dataset> errorDatasetClass;
    private final DatasetProperties errorDatasetProperties;
    private final Set<String> sourcePluginTypes;
    private final Set<String> sinkPluginTypes;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/cdap-etl-core-4.3.1.jar:co/cask/cdap/etl/spec/PipelineSpecGenerator$ConfiguredStage.class */
    /**
     * Pairs the spec produced for a single stage with the pipeline-level properties
     * that were collected while that stage was being configured.
     */
    public static class ConfiguredStage {
        /** Properties obtained from the stage's pipeline configurer; keys are property names. */
        private final Map<String, String> pipelineProperties;
        /** Spec built for the stage. */
        private final StageSpec stageSpec;

        private ConfiguredStage(StageSpec stageSpec, Map<String, String> pipelineProperties) {
            this.pipelineProperties = pipelineProperties;
            this.stageSpec = stageSpec;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the collaborators used while generating a pipeline spec.
     *
     * @param pluginConfigurer  configurer used to register plugins (and error datasets) for stages
     * @param set               plugin types that are treated as sources
     * @param set2              plugin types that are treated as sinks
     * @param cls               dataset class used when a stage declares an error dataset
     * @param datasetProperties properties used when creating an error dataset
     * @param engine            engine the pipeline is configured for
     */
    public PipelineSpecGenerator(PluginConfigurer pluginConfigurer, Set<String> set, Set<String> set2, Class<? extends Dataset> cls, DatasetProperties datasetProperties, Engine engine) {
        // Plugin resolution.
        this.configurer = pluginConfigurer;
        this.engine = engine;
        // Stage classification.
        this.sourcePluginTypes = set;
        this.sinkPluginTypes = set2;
        // Error dataset creation.
        this.errorDatasetClass = cls;
        this.errorDatasetProperties = datasetProperties;
    }

    /**
     * Produces a pipeline spec of type {@code P} from the given pipeline config.
     * The concrete behaviour is defined by subclasses.
     *
     * @param c the pipeline config to generate a spec for
     * @return the generated spec
     */
    public abstract P generateSpec(C c);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the config, configures every stage in traversal order and records the result
     * on the given builder.
     *
     * <p>For each stage, the schema it emits is propagated as an input schema to each of its
     * output stages before those stages are configured. Pipeline properties come from two places:
     * config properties prefixed with {@code system.<engine>.} (prefix stripped), and properties
     * set by stages while they were configured. Stage-provided values overwrite config values
     * that share the same key.
     *
     * @param eTLConfig the pipeline config to validate and configure
     * @param builder   the spec builder that receives stages, connections, resources and properties
     * @throws IllegalArgumentException if the config is invalid, if a splitter connection has no
     *     port, if a joiner would receive a null input schema, if a stage would receive two
     *     different input schemas, or if two stages set the same pipeline property to different
     *     values
     */
    public void configureStages(ETLConfig eTLConfig, PipelineSpec.Builder builder) {
        ValidatedPipeline validatedPipeline = validateConfig(eTLConfig);
        List<ETLStage> traversalOrder = validatedPipeline.getTraversalOrder();

        // Create one configurer per stage up front so that input schemas can be pushed to a
        // downstream stage before that stage itself is configured.
        Map<String, DefaultPipelineConfigurer> pluginConfigurers = new HashMap<>(traversalOrder.size());
        Map<String, String> pluginTypes = new HashMap<>(traversalOrder.size());
        for (ETLStage stage : traversalOrder) {
            String stageName = stage.getName();
            pluginTypes.put(stageName, stage.getPlugin().getType());
            pluginConfigurers.put(stageName, new DefaultPipelineConfigurer(this.configurer, stageName, this.engine));
        }

        // Keep only the config properties addressed to this engine, with the prefix removed.
        Map<String, String> pipelineProperties = new HashMap<>();
        String enginePrefix = String.format("system.%s.", this.engine.name().toLowerCase());
        int enginePrefixLength = enginePrefix.length();
        for (Map.Entry<String, String> property : eTLConfig.getProperties().entrySet()) {
            if (property.getKey().startsWith(enginePrefix)) {
                pipelineProperties.put(property.getKey().substring(enginePrefixLength), property.getValue());
            }
        }

        // row = property name, column = property value, cell = name of a stage that set that value.
        HashBasedTable<String, String, String> propertiesFromStages = HashBasedTable.create();
        for (ETLStage stage : traversalOrder) {
            String stageName = stage.getName();
            ConfiguredStage configuredStage = configureStage(stage, validatedPipeline, pluginConfigurers.get(stageName));

            for (String outputStageName : validatedPipeline.getOutputs(stageName)) {
                String outputStageType = pluginTypes.get(outputStageName);
                DefaultStageConfigurer outputStageConfigurer = pluginConfigurers.get(outputStageName).getStageConfigurer();

                // Work out which schema this stage sends to the output stage.
                Schema outputSchema;
                if (ErrorTransform.PLUGIN_TYPE.equals(outputStageType)) {
                    // Error transforms consume the error schema rather than the regular output.
                    outputSchema = configuredStage.stageSpec.getErrorSchema();
                } else if (SplitterTransform.PLUGIN_TYPE.equals(configuredStage.stageSpec.getPlugin().getType())) {
                    StageSpec.Port port = configuredStage.stageSpec.getOutputPorts().get(outputStageName);
                    if (port == null) {
                        // No port information recorded for this output; schema is unknown.
                        outputSchema = null;
                    } else {
                        if (port.getPort() == null) {
                            throw new IllegalArgumentException(String.format("Must specify a port when connecting Splitter '%s' to '%s'", stageName, outputStageName));
                        }
                        outputSchema = port.getSchema();
                    }
                } else {
                    outputSchema = configuredStage.stageSpec.getOutputSchema();
                }

                if (BatchJoiner.PLUGIN_TYPE.equals(outputStageType) && outputSchema == null) {
                    throw new IllegalArgumentException(String.format("Joiner cannot have any null input schemas, but stage %s outputs a null schema.", stageName));
                }
                // Joiners, actions and conditions may receive differing input schemas; every other
                // stage type must see the same schema on all of its inputs.
                if (!BatchJoiner.PLUGIN_TYPE.equals(outputStageType) && !Action.PLUGIN_TYPE.equals(outputStageType) && !Condition.PLUGIN_TYPE.equals(outputStageType) && !hasSameSchema(outputStageConfigurer.getInputSchemas(), outputSchema)) {
                    throw new IllegalArgumentException("Two different input schema were set for the stage " + outputStageName);
                }
                outputStageConfigurer.addInputSchema(stageName, outputSchema);
            }

            builder.addStage(configuredStage.stageSpec);
            for (Map.Entry<String, String> property : configuredStage.pipelineProperties.entrySet()) {
                propertiesFromStages.put(property.getKey(), property.getValue(), stageName);
            }
        }

        // A property may only be given one distinct value across all stages.
        for (String propertyName : propertiesFromStages.rowKeySet()) {
            Map<String, String> stagesByValue = propertiesFromStages.row(propertyName);
            if (stagesByValue.size() > 1) {
                StringBuilder message = new StringBuilder("Pipeline property '").append(propertyName).append("' is being set to different values by stages.");
                for (Map.Entry<String, String> valueAndStage : stagesByValue.entrySet()) {
                    message.append(" stage '").append(valueAndStage.getValue()).append("' = '").append(valueAndStage.getKey()).append("',");
                }
                // Drop the trailing comma.
                message.deleteCharAt(message.length() - 1);
                throw new IllegalArgumentException(message.toString());
            }
            pipelineProperties.put(propertyName, stagesByValue.keySet().iterator().next());
        }

        builder.addConnections(eTLConfig.getConnections())
            .setResources(eTLConfig.getResources())
            .setDriverResources(eTLConfig.getDriverResources())
            .setClientResources(eTLConfig.getClientResources())
            .setStageLoggingEnabled(eTLConfig.isStageLoggingEnabled())
            .setNumOfRecordsPreview(eTLConfig.getNumOfRecordsPreview())
            .setProperties(pipelineProperties)
            .build();
    }

    /**
     * Checks whether a new input schema is compatible with the input schemas already recorded.
     *
     * @param map    input schemas recorded so far
     * @param schema the schema about to be added; may be null
     * @return true if nothing has been recorded yet, or if the first recorded schema equals
     *     {@code schema} (two nulls count as equal)
     */
    private boolean hasSameSchema(Map<String, Schema> map, Schema schema) {
        if (map.isEmpty()) {
            return true;
        }
        // Only the first recorded schema is compared.
        Schema firstRecorded = map.values().iterator().next();
        return Objects.equals(firstRecorded, schema);
    }

    /**
     * Configures a single stage and builds its spec.
     *
     * <p>Creates the stage's error dataset when one is named, configures the stage's plugin, and
     * then derives the output ports: splitters get one port per connection that declares a port
     * name, while all other stages get a port without a name for each output. A condition stage
     * forwards the single non-null schema found among its inputs.
     *
     * @param eTLStage                  the stage to configure
     * @param validatedPipeline         the validated pipeline, used to look up outputs and ports
     * @param defaultPipelineConfigurer the configurer dedicated to this stage
     * @return the stage spec together with the pipeline properties set during configuration
     * @throws IllegalArgumentException if a splitter connection has no port, or a condition stage
     *     has differing non-null input schemas
     */
    private ConfiguredStage configureStage(ETLStage eTLStage, ValidatedPipeline validatedPipeline, DefaultPipelineConfigurer defaultPipelineConfigurer) {
        String stageName = eTLStage.getName();
        ETLPlugin stagePlugin = eTLStage.getPlugin();

        boolean hasErrorDataset = !Strings.isNullOrEmpty(eTLStage.getErrorDatasetName());
        if (hasErrorDataset) {
            this.configurer.createDataset(eTLStage.getErrorDatasetName(), this.errorDatasetClass, this.errorDatasetProperties);
        }

        PluginSpec pluginSpec = configurePlugin(stageName, stagePlugin, defaultPipelineConfigurer);
        DefaultStageConfigurer stageConfigurer = defaultPipelineConfigurer.getStageConfigurer();
        Map<String, StageSpec.Port> outputPorts = new HashMap<>();
        Map<String, Schema> inputSchemas = stageConfigurer.getInputSchemas();

        if (!pluginSpec.getType().equals(SplitterTransform.PLUGIN_TYPE)) {
            Schema outputSchema = stageConfigurer.getOutputSchema();
            if (Condition.PLUGIN_TYPE.equals(pluginSpec.getType())) {
                // A condition emits whatever it receives: take the one non-null input schema.
                outputSchema = null;
                for (Schema inputSchema : inputSchemas.values()) {
                    if (inputSchema == null) {
                        continue;
                    }
                    if (outputSchema != null && !outputSchema.equals(inputSchema)) {
                        throw new IllegalArgumentException("Cannot have different input schemas going into stage " + stageName);
                    }
                    outputSchema = inputSchema;
                }
            }
            // Every output shares the same schema and has no port name.
            for (String outputStageName : validatedPipeline.getOutputs(stageName)) {
                outputPorts.put(outputStageName, new StageSpec.Port(null, outputSchema));
            }
        } else {
            Map<String, Schema> portSchemas = stageConfigurer.getOutputPortSchemas();
            for (Map.Entry<String, String> connection : validatedPipeline.getOutputPorts(stageName).entrySet()) {
                String outputStageName = connection.getKey();
                String portName = connection.getValue();
                if (portName == null) {
                    throw new IllegalArgumentException(String.format("Connection from Splitter '%s' to '%s' must specify a port.", stageName, outputStageName));
                }
                outputPorts.put(outputStageName, new StageSpec.Port(portName, portSchemas.get(portName)));
            }
        }

        StageSpec stageSpec = StageSpec.builder(stageName, pluginSpec)
            .setErrorDatasetName(eTLStage.getErrorDatasetName())
            .addInputSchemas(inputSchemas)
            .addOutputPortSchemas(outputPorts)
            .setErrorSchema(stageConfigurer.getErrorSchema())
            .setProcessTimingEnabled(validatedPipeline.isProcessTimingEnabled())
            .setStageLoggingEnabled(validatedPipeline.isStageLoggingEnabled())
            .build();
        return new ConfiguredStage(stageSpec, defaultPipelineConfigurer.getPipelineProperties());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers the plugin for a stage and lets it configure the pipeline.
     *
     * <p>Joiners are configured as multi-input plugins and splitters as multi-output plugins.
     * Spark program plugins are registered but not configured. Every other type is configured
     * as a regular pipeline-configurable plugin.
     *
     * @param str                       the stage name, also used as the plugin id
     * @param eTLPlugin                 the plugin declared for the stage
     * @param defaultPipelineConfigurer the configurer handed to the plugin
     * @return a spec recording the plugin type, name, properties and the selected artifact
     * @throws IllegalArgumentException if no matching plugin could be found
     * @throws RuntimeException         wrapping any exception raised while configuring the plugin
     *     or building its spec
     */
    public PluginSpec configurePlugin(String str, ETLPlugin eTLPlugin, DefaultPipelineConfigurer defaultPipelineConfigurer) {
        ArtifactSelectorProvider selectorProvider = new ArtifactSelectorProvider(eTLPlugin.getType(), eTLPlugin.getName());
        // Wrap the selector so the chosen artifact can be read back after usePlugin.
        TrackedPluginSelector pluginSelector = new TrackedPluginSelector(selectorProvider.getPluginSelector(eTLPlugin.getArtifactConfig()));
        String pluginType = eTLPlugin.getType();

        Object plugin = this.configurer.usePlugin(eTLPlugin.getType(), eTLPlugin.getName(), str, eTLPlugin.getPluginProperties(), pluginSelector);
        if (plugin == null) {
            throw new IllegalArgumentException(String.format("No plugin of type %s and name %s could be found for stage %s.", eTLPlugin.getType(), eTLPlugin.getName(), str));
        }

        try {
            if (pluginType.equals(BatchJoiner.PLUGIN_TYPE)) {
                MultiInputPipelineConfigurable joiner = (MultiInputPipelineConfigurable) plugin;
                joiner.configurePipeline(defaultPipelineConfigurer);
            } else if (pluginType.equals(SplitterTransform.PLUGIN_TYPE)) {
                MultiOutputPipelineConfigurable splitter = (MultiOutputPipelineConfigurable) plugin;
                splitter.configurePipeline(defaultPipelineConfigurer);
            } else if (pluginType.equals(Constants.SPARK_PROGRAM_PLUGIN_TYPE)) {
                // Spark programs are not given a chance to configure the pipeline.
            } else {
                PipelineConfigurable configurable = (PipelineConfigurable) plugin;
                configurable.configurePipeline(defaultPipelineConfigurer);
            }
            return new PluginSpec(eTLPlugin.getType(), eTLPlugin.getName(), eTLPlugin.getProperties(), pluginSelector.getSelectedArtifact());
        } catch (Exception e) {
            String message = String.format("Exception while configuring plugin of type %s and name %s for stage %s: %s", eTLPlugin.getType(), eTLPlugin.getName(), str, e.getMessage());
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Validates the structure of the pipeline and computes the order in which stages are configured.
     *
     * <p>Checks performed, in order:
     * <ul>
     *   <li>the config's own {@code validate()} passes and at least one stage exists;</li>
     *   <li>stage names are unique;</li>
     *   <li>every connection refers to existing stages;</li>
     *   <li>every connection leaving a condition stage carries a condition value, and consecutive
     *       connections from the same condition do not repeat the same value;</li>
     *   <li>a pipeline without connections consists of exactly one action stage;</li>
     *   <li>sources only have incoming connections from action/condition stages, sinks only have
     *       outgoing connections to action/condition stages;</li>
     *   <li>error transforms only take input from stage types listed in {@code VALID_ERROR_INPUTS};</li>
     *   <li>no stage is unreachable or a dead end;</li>
     *   <li>condition branches satisfy {@link #validateConditionBranches}.</li>
     * </ul>
     *
     * @param eTLConfig the pipeline config to validate
     * @return the validated pipeline, with stages in the DAG's topological order (or the single
     *     stage when there are no connections)
     * @throws IllegalArgumentException if any of the checks above fails
     */
    private ValidatedPipeline validateConfig(ETLConfig eTLConfig) {
        eTLConfig.validate();
        if (eTLConfig.getStages().isEmpty()) {
            throw new IllegalArgumentException("A pipeline must contain at least one stage.");
        }

        Set<String> actionStages = new HashSet<>();
        Set<String> conditionStages = new HashSet<>();
        Map<String, String> stageTypes = new HashMap<>();
        Set<String> stageNames = new HashSet<>();
        for (ETLStage stage : eTLConfig.getStages()) {
            if (!stageNames.add(stage.getName())) {
                throw new IllegalArgumentException(String.format("Invalid pipeline. Multiple stages are named %s. Please ensure all stage names are unique", stage.getName()));
            }
            if (isAction(stage.getPlugin().getType())) {
                actionStages.add(stage.getName());
            }
            if (stage.getPlugin().getType().equals(Condition.PLUGIN_TYPE)) {
                conditionStages.add(stage.getName());
            }
            stageTypes.put(stage.getName(), stage.getPlugin().getType());
        }

        // Last condition value seen on an outgoing connection of each condition stage.
        Map<String, Boolean> lastConditionValue = new HashMap<>();
        for (Connection connection : eTLConfig.getConnections()) {
            if (!stageNames.contains(connection.getFrom())) {
                throw new IllegalArgumentException(String.format("Invalid connection %s. %s is not a stage.", connection, connection.getFrom()));
            }
            if (!stageNames.contains(connection.getTo())) {
                throw new IllegalArgumentException(String.format("Invalid connection %s. %s is not a stage.", connection, connection.getTo()));
            }
            if (conditionStages.contains(connection.getFrom())) {
                if (connection.getCondition() == null) {
                    throw new IllegalArgumentException(String.format("For condition stage %s, the connection %s is not marked with either 'true' or 'false'.", connection.getFrom(), connection));
                }
                if (lastConditionValue.containsKey(connection.getFrom()) && connection.getCondition().equals(lastConditionValue.get(connection.getFrom()))) {
                    throw new IllegalArgumentException(String.format("For condition stage '%s', more than one outgoing connections are marked as %s.", connection.getFrom(), connection.getCondition()));
                }
                lastConditionValue.put(connection.getFrom(), connection.getCondition());
            }
        }

        List<ETLStage> traversalOrder = new ArrayList<>(stageNames.size());
        if (eTLConfig.getConnections().isEmpty()) {
            if (actionStages.size() != 1 || stageNames.size() != 1) {
                throw new IllegalArgumentException("Invalid pipeline. There are no connections between stages. This is only allowed if the pipeline consists of a single action plugin.");
            }
            traversalOrder.add(eTLConfig.getStages().iterator().next());
            return new ValidatedPipeline(traversalOrder, eTLConfig);
        }

        Dag dag = new Dag(eTLConfig.getConnections());
        // Action and condition stages are allowed to sit before sources and after sinks.
        Set<String> controlStages = Sets.union(actionStages, conditionStages);
        Map<String, ETLStage> stagesByName = new HashMap<>();
        for (ETLStage stage : eTLConfig.getStages()) {
            String stageName = stage.getName();
            Set<String> stageInputs = dag.getNodeInputs(stageName);
            Set<String> stageOutputs = dag.getNodeOutputs(stageName);
            String stageType = stage.getPlugin().getType();
            boolean isSource = isSource(stageType);
            boolean isSink = isSink(stageType);

            if (isSource) {
                if (!stageInputs.isEmpty() && !controlStages.containsAll(stageInputs)) {
                    // Argument order matters: the third placeholder lists the offending inputs,
                    // the fourth repeats the stage type.
                    throw new IllegalArgumentException(String.format("%s %s has incoming connections from %s. %s stages cannot have any incoming connections.", stageType, stageName, Joiner.on(',').join(stageInputs), stageType));
                }
            } else if (isSink) {
                if (!stageOutputs.isEmpty() && !controlStages.containsAll(stageOutputs)) {
                    // Same ordering as above: outputs first, then the stage type.
                    throw new IllegalArgumentException(String.format("%s %s has outgoing connections to %s. %s stages cannot have any outgoing connections.", stageType, stageName, Joiner.on(',').join(stageOutputs), stageType));
                }
            } else if (ErrorTransform.PLUGIN_TYPE.equals(stageType)) {
                for (String inputStage : stageInputs) {
                    String inputType = stageTypes.get(inputStage);
                    if (!VALID_ERROR_INPUTS.contains(inputType)) {
                        throw new IllegalArgumentException(String.format("ErrorTransform %s cannot have stage %s of type %s as input. Only %s stages can emit errors.", stageName, inputStage, inputType, Joiner.on(',').join(VALID_ERROR_INPUTS)));
                    }
                }
            }

            boolean isAction = isAction(stageType);
            if (!isAction && !stageType.equals(Condition.PLUGIN_TYPE) && !isSource && stageInputs.isEmpty()) {
                throw new IllegalArgumentException(String.format("Stage %s is unreachable, it has no incoming connections.", stageName));
            }
            if (!isAction && !isSink && stageOutputs.isEmpty()) {
                throw new IllegalArgumentException(String.format("Stage %s is a dead end, it has no outgoing connections.", stageName));
            }
            stagesByName.put(stageName, stage);
        }

        validateConditionBranches(conditionStages, dag);

        for (String stageName : dag.getTopologicalOrder()) {
            traversalOrder.add(stagesByName.get(stageName));
        }
        return new ValidatedPipeline(traversalOrder, eTLConfig);
    }

    /**
     * Tells whether a plugin type is treated as an action.
     *
     * @param str the plugin type; may be null
     * @return true for the action plugin type and for the Spark program plugin type
     */
    private boolean isAction(String str) {
        if (Action.PLUGIN_TYPE.equals(str)) {
            return true;
        }
        return Constants.SPARK_PROGRAM_PLUGIN_TYPE.equals(str);
    }

    /**
     * Tells whether a plugin type is one of the source types given at construction.
     *
     * @param str the plugin type to look up
     * @return true if the type is a configured source plugin type
     */
    private boolean isSource(String str) {
        Set<String> sourceTypes = this.sourcePluginTypes;
        return sourceTypes.contains(str);
    }

    /**
     * Tells whether a plugin type is one of the sink types given at construction.
     *
     * @param str the plugin type to look up
     * @return true if the type is a configured sink plugin type
     */
    private boolean isSink(String str) {
        Set<String> sinkTypes = this.sinkPluginTypes;
        return sinkTypes.contains(str);
    }

    /**
     * Validates the branches of every condition stage.
     *
     * <p>A condition stage is rejected when its outputs are null or number more than two. Each
     * branch is then checked with {@code validateSingleInput}.
     *
     * @param set names of the condition stages in the pipeline
     * @param dag the pipeline DAG
     * @throws IllegalArgumentException if a condition stage has an invalid number of outputs or a
     *     branch fails validation
     */
    private void validateConditionBranches(Set<String> set, Dag dag) {
        for (String conditionStage : set) {
            Set<String> branches = dag.getNodeOutputs(conditionStage);
            boolean missing = branches == null;
            if (missing || branches.size() > 2) {
                int branchCount = missing ? 0 : branches.size();
                throw new IllegalArgumentException(String.format("Condition stage in the pipeline '%s' should have at least 1 and at max 2 outgoing connections corresponding to 'true' and 'false' branches but found '%s'.", conditionStage, branchCount));
            }
            for (String branchStart : branches) {
                validateSingleInput(conditionStage, branchStart, dag);
            }
        }
    }

    /**
     * Recursively checks that a stage on a condition branch, and everything downstream of it,
     * is not also fed by a pipeline source through a path that bypasses the condition.
     *
     * @param str  name of the condition stage whose branch is being checked
     * @param str2 name of the stage currently being checked
     * @param dag  the pipeline DAG
     * @throws IllegalArgumentException if the stage has more than one input and one of the DAG's
     *     sources is among its parents when the search stops at sources and at the condition
     */
    private void validateSingleInput(String str, String str2, Dag dag) {
        if (dag.getNodeInputs(str2).size() > 1) {
            // Search upwards from the stage, stopping at DAG sources and at the condition itself.
            Set<String> stopNodes = new HashSet<>(dag.getSources());
            stopNodes.add(str);
            Set<String> sourceParents = dag.parentsOf(str2, stopNodes);
            // Keep only the parents that are sources of the DAG.
            sourceParents.retainAll(dag.getSources());
            if (!sourceParents.isEmpty()) {
                StringBuilder paths = new StringBuilder();
                for (String source : sourceParents) {
                    if (paths.length() > 0) {
                        paths.append(", ");
                    }
                    paths.append(source).append("->").append(str2);
                }
                throw new IllegalArgumentException(String.format("Stage in the pipeline '%s' is on the branch of condition '%s'. However it also has following incoming paths: '%s', which is not supported.", str2, str, paths.toString()));
            }
        }

        // Continue down the branch; a stage without outputs ends the recursion.
        for (String downstream : dag.getNodeOutputs(str2)) {
            validateSingleInput(str, downstream, dag);
        }
    }
}
