package co.cask.cdap.etl.spark.function;

import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.NoErrorEmitter;
import co.cask.cdap.etl.common.TrackedTransform;
import java.util.Iterator;
import scala.Tuple2;

/* loaded from: input_file:lib/hydrator-spark-core-4.3.4.jar:co/cask/cdap/etl/spark/function/AggregatorGroupByFunction.class */
/**
 * Pair flat-map function that expands one input record into zero or more
 * {@code (group key, record)} pairs, using the {@code groupBy} method of a
 * {@link BatchAggregator} plugin to produce the keys.
 *
 * <p>The aggregator plugin, the tracked transform wrapping it, and the output emitter are all
 * held in {@code transient} fields and are created on the first invocation of
 * {@link #call(Object)} rather than in the constructor. Only the {@link PluginFunctionContext}
 * is kept as regular (non-transient) state.
 *
 * @param <GROUP_KEY> type of the keys emitted by the aggregator's {@code groupBy}
 * @param <GROUP_VAL> type of the input records, which are also the values of the emitted pairs
 */
public class AggregatorGroupByFunction<GROUP_KEY, GROUP_VAL> implements PairFlatMapFunc<GROUP_VAL, GROUP_KEY, GROUP_VAL> {
    private final PluginFunctionContext pluginFunctionContext;
    // Both fields below are transient and therefore rebuilt lazily by call() when null.
    private transient TrackedTransform<GROUP_VAL, Tuple2<GROUP_KEY, GROUP_VAL>> groupByFunction;
    private transient DefaultEmitter<Tuple2<GROUP_KEY, GROUP_VAL>> emitter;

    /**
     * @param pluginFunctionContext context used on first use to create the aggregator plugin,
     *                              its runtime context, stage metrics, data tracer and
     *                              stage statistics collector
     */
    public AggregatorGroupByFunction(PluginFunctionContext pluginFunctionContext) {
        this.pluginFunctionContext = pluginFunctionContext;
    }

    /**
     * Runs the aggregator's {@code groupBy} on a single record and returns the resulting pairs.
     *
     * <p>The same emitter instance is reset and reused on every invocation, and the returned
     * value comes straight from that emitter.
     * NOTE(review): if {@code getEntries()} exposes the emitter's internal collection, the result
     * is only valid until the next invocation; confirm callers consume it before calling again.
     *
     * @param input the record to compute group keys for
     * @return one {@code (key, input)} pair for every key the aggregator emitted for {@code input}
     * @throws Exception if plugin creation, plugin initialization or the transform itself fails
     */
    @Override
    public Iterable<Tuple2<GROUP_KEY, GROUP_VAL>> call(GROUP_VAL input) throws Exception {
        if (this.groupByFunction == null) {
            // The plugin's static type is not known to be a BatchAggregator with these type
            // arguments, so this cast cannot be checked by the compiler; the suppression is
            // scoped to this single declaration.
            @SuppressWarnings("unchecked")
            BatchAggregator<GROUP_KEY, GROUP_VAL, ?> aggregator =
                (BatchAggregator<GROUP_KEY, GROUP_VAL, ?>) this.pluginFunctionContext.createPlugin();
            aggregator.initialize((BatchRuntimeContext) this.pluginFunctionContext.createBatchRuntimeContext());

            Transformation<GROUP_VAL, Tuple2<GROUP_KEY, GROUP_VAL>> groupByTransform =
                new GroupByTransform<GROUP_KEY, GROUP_VAL>(aggregator);
            this.groupByFunction = new TrackedTransform<>(
                groupByTransform,
                this.pluginFunctionContext.createStageMetrics(),
                Constants.Metrics.RECORDS_IN,
                null,
                this.pluginFunctionContext.getDataTracer(),
                this.pluginFunctionContext.getStageStatisticsCollector());
            this.emitter = new DefaultEmitter<>();
        }
        // Clear whatever the previous invocation emitted before collecting new output.
        this.emitter.reset();
        this.groupByFunction.transform(input, this.emitter);
        return this.emitter.getEntries();
    }

    /**
     * Adapts {@link BatchAggregator#groupBy} to the {@link Transformation} interface: every key
     * the aggregator emits for a record is paired with that record and forwarded downstream.
     *
     * @param <GROUP_KEY> type of the keys emitted by the aggregator
     * @param <GROUP_VAL> type of the records being grouped
     */
    private static class GroupByTransform<GROUP_KEY, GROUP_VAL> implements Transformation<GROUP_VAL, Tuple2<GROUP_KEY, GROUP_VAL>> {
        private final BatchAggregator<GROUP_KEY, GROUP_VAL, ?> aggregator;
        // Collects the keys for one record at a time; it is reset at the start of each transform().
        private final NoErrorEmitter<GROUP_KEY> keyEmitter = new NoErrorEmitter<>("Errors and Alerts cannot be emitted from the groupBy method of an aggregator");

        GroupByTransform(BatchAggregator<GROUP_KEY, GROUP_VAL, ?> batchAggregator) {
            this.aggregator = batchAggregator;
        }

        /**
         * Emits one {@code (key, record)} pair per key produced by the aggregator for {@code record}.
         *
         * @param record  the input record passed to {@code groupBy}
         * @param emitter receives the resulting key/record pairs
         * @throws Exception if the aggregator's {@code groupBy} fails
         */
        @Override
        public void transform(GROUP_VAL record, Emitter<Tuple2<GROUP_KEY, GROUP_VAL>> emitter) throws Exception {
            this.keyEmitter.reset();
            this.aggregator.groupBy(record, this.keyEmitter);
            for (GROUP_KEY key : this.keyEmitter.getEntries()) {
                emitter.emit(new Tuple2<>(key, record));
            }
        }
    }
}
