package co.cask.cdap.examples.datacleansing;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.BatchPartitionConsumer;
import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSetArguments;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import java.io.IOException;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

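/**
 * MapReduce program that reads the {@code "rawRecords"} dataset through a partition consumer,
 * keeps the records accepted by {@link SchemaMatchingFilter}, and writes them to a partition of
 * the {@code "cleanRecords"} dataset whose {@code "time"} field comes from the
 * {@code output.partition.key} runtime argument.
 */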
public class DataCleansingMapReduce extends AbstractMapReduce {
    /** Program name registered through {@code setName} in {@link #configure()}. */
    protected static final String NAME = "DataCleansingMapReduce";
    /**
     * Runtime-argument key whose value is parsed as a long and used as the {@code "time"} field
     * of the output partition key.
     */
    protected static final String OUTPUT_PARTITION_KEY = "output.partition.key";
    /**
     * Key of the optional JSON schema: read from the runtime arguments before submission and
     * copied into the Hadoop job configuration under the same key, where the mapper picks it up.
     */
    protected static final String SCHEMA_KEY = "schema.key";
    /**
     * Partition consumer used to configure the {@code "rawRecords"} input and persisted after the
     * run finishes. Its state bytes are read from and written to the {@code "consumingState"}
     * dataset under {@code STATE_KEY}.
     */
    private final BatchPartitionConsumer partitionConsumer = new BatchPartitionConsumer() { // from class: co.cask.cdap.examples.datacleansing.DataCleansingMapReduce.1
        /** Key under which the consumer state bytes are stored in the {@code "consumingState"} dataset. */
        private static final String STATE_KEY = "state.key";

        /**
         * Returns the bytes stored under {@code STATE_KEY} in the {@code "consumingState"} dataset.
         *
         * @param datasetContext context used to look up the dataset
         * @return the stored bytes; annotated {@code @Nullable}, so callers must expect {@code null}
         */
        // NOTE(review): read(String) is invoked directly on the result of getDataset(..); this is
        // decompiled output and may have lost a cast to the concrete dataset type — confirm that it
        // compiles against DatasetContext#getDataset.
        @Nullable
        protected byte[] readBytes(DatasetContext datasetContext) {
            return datasetContext.getDataset("consumingState").read(STATE_KEY);
        }

        /**
         * Stores {@code bArr} under {@code STATE_KEY} in the {@code "consumingState"} dataset.
         *
         * @param datasetContext context used to look up the dataset
         * @param bArr           state bytes to store
         */
        // NOTE(review): same possible missing cast as in readBytes — confirm.
        protected void writeBytes(DatasetContext datasetContext, byte[] bArr) {
            datasetContext.getDataset("consumingState").write(STATE_KEY, bArr);
        }
    };

    /**
     * Mapper that forwards every input line accepted by a {@link SimpleSchemaMatcher} and counts
     * the rejected ones under the {@code "data.invalid"} metric.
     *
     * <p>The schema is taken from the job configuration entry {@code schema.key} (as JSON) when
     * present; otherwise {@link #DEFAULT_SCHEMA} is used.
     *
     * <p>No {@code map(Object, Object, Mapper.Context)} overload is declared here on purpose: the
     * compiler generates that bridge method itself, and a hand-written copy has the same erasure as
     * {@code Mapper#map} without overriding it, which is a name clash.
     */
    public static class SchemaMatchingFilter extends Mapper<LongWritable, Text, NullWritable, Text> {
        /**
         * Fallback schema: a {@code "person"} record with fields {@code pid} (LONG),
         * {@code name} (STRING), {@code dob} (STRING) and {@code zip} (INT).
         */
        public static final Schema DEFAULT_SCHEMA = Schema.recordOf(
            "person",
            new Schema.Field[]{
                Schema.Field.of("pid", Schema.of(Schema.Type.LONG)),
                Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
                Schema.Field.of("dob", Schema.of(Schema.Type.STRING)),
                Schema.Field.of("zip", Schema.of(Schema.Type.INT))
            });

        /** Matcher built in {@link #setup}; decides which records are written out. */
        private SimpleSchemaMatcher schemaMatcher;

        // Never assigned in this class.
        // NOTE(review): presumably injected by the CDAP runtime before map() is called — confirm,
        // otherwise the first invalid record triggers a NullPointerException.
        private Metrics mapMetrics;

        /**
         * Builds the schema matcher from the configured schema, or from {@link #DEFAULT_SCHEMA}
         * when the configuration has no {@code schema.key} entry.
         *
         * @param context task context giving access to the job configuration
         * @throws IOException          declared by the overridden method and by schema parsing
         * @throws InterruptedException declared by the overridden method
         */
        protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            String schemaJson = context.getConfiguration().get(DataCleansingMapReduce.SCHEMA_KEY);
            Schema schema = (schemaJson == null) ? DEFAULT_SCHEMA : Schema.parseJson(schemaJson);
            this.schemaMatcher = new SimpleSchemaMatcher(schema);
        }

        /**
         * Writes {@code text} unchanged (with a {@link NullWritable} key) when it matches the
         * schema; otherwise increments the {@code "data.invalid"} metric by one.
         *
         * @param longWritable input key; not used
         * @param text         input record
         * @param context      task context used to emit matching records
         * @throws IOException          if writing the record fails
         * @throws InterruptedException if writing the record is interrupted
         */
        public void map(LongWritable longWritable, Text text, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            if (this.schemaMatcher.matches(text.toString())) {
                context.write(NullWritable.get(), text);
            } else {
                this.mapMetrics.count("data.invalid", 1);
            }
        }
    }

    /**
     * Registers the program under {@link #NAME} and requests the same resource amount for
     * mapper and reducer tasks.
     */
    public void configure() {
        // NOTE(review): presumably memory in megabytes — confirm against the Resources constructor.
        final int resourceAmount = 1024;

        this.setName(NAME);
        this.setMapperResources(new Resources(resourceAmount));
        this.setReducerResources(new Resources(resourceAmount));
    }

    /**
     * Configures the job before submission: the {@code "rawRecords"} input obtained from the
     * partition consumer, the {@code "cleanRecords"} output partition, the mapper class, and the
     * optional schema passed through the Hadoop configuration.
     *
     * @param mapReduceContext context of the run being prepared
     * @throws NumberFormatException if the {@code output.partition.key} runtime argument is missing
     *                               or is not a valid long
     * @throws Exception             propagated from the dataset and job configuration calls
     */
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        // Validate the partition time first so a bad argument fails before input/output are configured.
        long partitionTime = parseOutputPartitionTime(
            (String) mapReduceContext.getRuntimeArguments().get(OUTPUT_PARTITION_KEY));

        mapReduceContext.setInput("rawRecords", this.partitionConsumer.getConfiguredDataset(mapReduceContext, "rawRecords"));

        // Each run writes into the partition identified by the supplied time value.
        PartitionKey outputKey = PartitionKey.builder().addLongField("time", partitionTime).build();
        HashMap<String, String> outputArgs = new HashMap<String, String>();
        PartitionedFileSetArguments.setOutputPartitionKey(outputArgs, outputKey);
        mapReduceContext.setOutput("cleanRecords", mapReduceContext.getDataset("cleanRecords", outputArgs));

        Job job = (Job) mapReduceContext.getHadoopJob();
        job.setMapperClass(SchemaMatchingFilter.class);

        // Forward the schema only when supplied; the mapper falls back to its default otherwise.
        String schemaJson = (String) mapReduceContext.getRuntimeArguments().get(SCHEMA_KEY);
        if (schemaJson != null) {
            job.getConfiguration().set(SCHEMA_KEY, schemaJson);
        }
    }

    /**
     * Parses the value of the {@code output.partition.key} runtime argument.
     *
     * @param rawValue the argument value, or {@code null} when the argument was not supplied
     * @return the parsed partition time
     * @throws NumberFormatException with a descriptive message when the value is missing or not a long
     */
    private static long parseOutputPartitionTime(String rawValue) {
        if (rawValue == null) {
            throw new NumberFormatException(
                "Runtime argument '" + OUTPUT_PARTITION_KEY + "' is required but was not provided");
        }
        try {
            return Long.parseLong(rawValue);
        } catch (NumberFormatException e) {
            // Same exception type as before, but naming the offending argument and value.
            NumberFormatException detailed = new NumberFormatException(
                "Runtime argument '" + OUTPUT_PARTITION_KEY + "' must be a long but was '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Persists the partition consumer's state when {@code z} is {@code true}; does nothing otherwise.
     *
     * @param z                flag controlling whether the consumer state is persisted
     *                         (NOTE(review): presumably the job-success flag — confirm)
     * @param mapReduceContext context handed to the partition consumer for persisting
     * @throws Exception propagated from the partition consumer
     */
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        if (!z) {
            // Leave the stored consumer state untouched.
            return;
        }
        this.partitionConsumer.persist(mapReduceContext);
    }
}
