package org.apache.flink.graph.pregel;

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.Either;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/graph/pregel/VertexCentricIteration.class */
public class VertexCentricIteration<K, VV, EV, Message> implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
    private final ComputeFunction<K, VV, EV, Message> computeFunction;
    private final MessageCombiner<K, Message> combineFunction;
    private final DataSet<Edge<K, EV>> edgesWithValue;
    private final int maximumNumberOfIterations;
    private final TypeInformation<Message> messageType;
    private DataSet<Vertex<K, VV>> initialVertices;
    private VertexCentricConfiguration configuration;

    /**
     * Join function that pairs a vertex with the message part ({@code f1}) of the
     * workset entry it was joined with.
     *
     * <p>The same output tuple instance is reused for every call to
     * {@link #join}; callers must not hold on to a returned tuple across calls.
     */
    @FunctionAnnotation.ForwardedFieldsFirst({"*->f0"})
    @FunctionAnnotation.ForwardedFieldsSecond({"f1->f1"})
    private static final class AppendVertexState<K, VV, Message> implements JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>, Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
        /** Reused output record: f0 = vertex, f1 = message-or-null. */
        private final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> vertexWithMessage = new Tuple2<>();

        /**
         * @param vertex the vertex from the first join input
         * @param tuple2 the (key, message-or-null) entry from the second join input
         * @return the reused tuple holding {@code vertex} and {@code tuple2.f1}
         */
        public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> tuple2) {
            vertexWithMessage.f0 = vertex;
            vertexWithMessage.f1 = tuple2.f1;
            return vertexWithMessage;
        }
    }

    /**
     * Map function that turns each vertex into a workset entry of the form
     * {@code (vertexId, Left(NullValue))}, i.e. an entry carrying no message.
     *
     * <p>The output tuple is created once in {@link #open} and reused for every record.
     */
    private static class InitializeWorkSet<K, VV, Message> extends RichMapFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> {
        /** Reused output record; only f0 changes between calls to {@code map}. */
        private Tuple2<K, Either<NullValue, Message>> workSetEntry;
        /** The shared "no message" marker stored in f1 of every output record. */
        private Either<NullValue, Message> noMessage;

        private InitializeWorkSet() {
        }

        /** Creates the reused output tuple with its message field preset to the null marker. */
        public void open(Configuration configuration) {
            noMessage = Either.Left(NullValue.getInstance());
            workSetEntry = new Tuple2<>(null, noMessage);
        }

        /**
         * @param vertex the input vertex
         * @return the reused tuple with f0 set to the vertex id
         */
        public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
            workSetEntry.f0 = vertex.getId();
            return workSetEntry;
        }
    }

    /**
     * Group-reduce / group-combine UDF that hands all messages of one key group to the
     * user-supplied {@link MessageCombiner}.
     *
     * <p>The key of the group is taken from the first record ({@code f0}); the message of
     * every record is expected to be the right side of its {@code Either} value.
     */
    @FunctionAnnotation.ForwardedFields({"f0"})
    public static class MessageCombinerUdf<K, Message> extends RichGroupReduceFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> implements ResultTypeQueryable<Tuple2<K, Either<NullValue, Message>>>, GroupCombineFunction<Tuple2<K, Either<NullValue, Message>>, Tuple2<K, Either<NullValue, Message>>> {
        final MessageCombiner<K, Message> combinerFunction;
        /** Produced type; transient, so it is not part of the serialized function. */
        private transient TypeInformation<Tuple2<K, Either<NullValue, Message>>> resultType;

        private MessageCombinerUdf(MessageCombiner<K, Message> messageCombiner, TypeInformation<Tuple2<K, Either<NullValue, Message>>> typeInformation) {
            this.combinerFunction = messageCombiner;
            this.resultType = typeInformation;
        }

        /** @return the type information passed to the constructor */
        public TypeInformation<Tuple2<K, Either<NullValue, Message>>> getProducedType() {
            return this.resultType;
        }

        /**
         * Passes the messages of one group to the combiner. Does nothing for an empty group.
         *
         * @param iterable  all (key, message) records of one key group
         * @param collector receives whatever the combiner emits
         * @throws Exception propagated from the user-defined combiner
         */
        public void reduce(Iterable<Tuple2<K, Either<NullValue, Message>>> iterable, Collector<Tuple2<K, Either<NullValue, Message>>> collector) throws Exception {
            final Iterator<Tuple2<K, Either<NullValue, Message>>> records = iterable.iterator();
            if (!records.hasNext()) {
                return;
            }

            // The first record has to be consumed to learn the group key, so its
            // message is handed to the message iterator separately.
            final Tuple2<K, Either<NullValue, Message>> first = records.next();
            final K targetId = first.f0;

            final MessageIterator<Message> messages = new MessageIterator<>();
            messages.setFirst(first.f1.right());

            // NOTE(review): assumes MessageIterator#setSource takes an iterator over tuples
            // with a wildcard key type; the double cast only widens the key type argument.
            @SuppressWarnings("unchecked")
            final Iterator<Tuple2<?, Either<NullValue, Message>>> remaining =
                    (Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) records;
            messages.setSource(remaining);

            this.combinerFunction.set(targetId, collector);
            this.combinerFunction.combineMessages(messages);
        }

        /** Combining uses exactly the same logic as the final reduce. */
        public void combine(Iterable<Tuple2<K, Either<NullValue, Message>>> iterable, Collector<Tuple2<K, Either<NullValue, Message>>> collector) throws Exception {
            reduce(iterable, collector);
        }
    }

    /**
     * Flat-map function that keeps only the message side (right) of the compute output
     * and re-wraps each {@code (targetId, message)} pair as {@code (targetId, Right(message))}.
     * Vertex updates (left side) are dropped.
     *
     * <p>The emitted tuple is a single reused instance.
     */
    private static final class ProjectMessages<K, VV, Message> implements FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> {
        /** Reused output record. */
        private final Tuple2<K, Either<NullValue, Message>> workSetEntry = new Tuple2<>();

        /**
         * @param either    a vertex update (left) or an outgoing message (right)
         * @param collector receives one record per outgoing message
         */
        public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> either, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
            if (!either.isRight()) {
                return;
            }
            final Tuple2<K, Message> outgoing = either.right();
            workSetEntry.f0 = outgoing.f0;
            workSetEntry.f1 = Either.<NullValue, Message>Right(outgoing.f1);
            collector.collect(workSetEntry);
        }
    }

    /**
     * Flat-map function that keeps only the vertex side (left) of the compute output
     * and emits the contained vertex; messages (right side) are dropped.
     */
    private static final class ProjectNewVertexValue<K, VV, Message> implements FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Vertex<K, VV>> {

        /**
         * @param either    a vertex update (left) or an outgoing message (right)
         * @param collector receives the vertex when {@code either} is a left value
         */
        public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> either, Collector<Vertex<K, VV>> collector) {
            if (!either.isLeft()) {
                return;
            }
            final Vertex<K, VV> updatedVertex = either.left();
            collector.collect(updatedVertex);
        }
    }

    /**
     * Co-group UDF that runs the user-defined {@link ComputeFunction} for one vertex,
     * given the vertex (with its first incoming message) on the first input and the
     * vertex's edges on the second input.
     *
     * <p>Output records are {@code Either} values: left for vertex updates, right for
     * {@code (targetId, message)} pairs.
     */
    private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>, Either<Vertex<K, VV>, Tuple2<K, Message>>> implements ResultTypeQueryable<Either<Vertex<K, VV>, Tuple2<K, Message>>> {
        final ComputeFunction<K, VV, EV, Message> computeFunction;
        /** Produced type; transient, so it is not part of the serialized function. */
        private transient TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> resultType;

        private VertexComputeUdf(ComputeFunction<K, VV, EV, Message> computeFunction, TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> typeInformation) {
            this.computeFunction = computeFunction;
            this.resultType = typeInformation;
        }

        /** @return the type information passed to the constructor */
        public TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> getProducedType() {
            return this.resultType;
        }

        /**
         * Initializes the compute function with the iteration runtime context in the
         * first superstep, then calls its pre-superstep hook.
         */
        public void open(Configuration configuration) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.computeFunction.init(getIterationRuntimeContext());
            }
            this.computeFunction.preSuperstep();
        }

        /** Calls the compute function's post-superstep hook. */
        public void close() throws Exception {
            this.computeFunction.postSuperstep();
        }

        /**
         * Invokes the compute function for one vertex. Does nothing if the first input is empty.
         *
         * @param iterable  the vertex paired with each of its incoming messages
         * @param iterable2 the edges of the vertex
         * @param collector receives vertex updates and outgoing messages
         * @throws Exception propagated from the user-defined compute function
         */
        public void coGroup(Iterable<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> iterable, Iterable<Edge<K, EV>> iterable2, Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> collector) throws Exception {
            final Iterator<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> vertexWithMessages = iterable.iterator();
            if (!vertexWithMessages.hasNext()) {
                return;
            }

            final Tuple2<Vertex<K, VV>, Either<NullValue, Message>> first = vertexWithMessages.next();
            final Vertex<K, VV> vertex = first.f0;

            final MessageIterator<Message> messages = new MessageIterator<>();
            // In the first superstep the message iterator is left without first element
            // and source, so the message payload of the record is never read.
            if (getIterationRuntimeContext().getSuperstepNumber() != 1) {
                messages.setFirst(first.f1.right());

                // NOTE(review): assumes MessageIterator#setSource takes an iterator over tuples
                // with a wildcard first field; the double cast only widens that type argument.
                @SuppressWarnings("unchecked")
                final Iterator<Tuple2<?, Either<NullValue, Message>>> remaining =
                        (Iterator<Tuple2<?, Either<NullValue, Message>>>) (Iterator<?>) vertexWithMessages;
                messages.setSource(remaining);
            }

            this.computeFunction.set(vertex.getId(), iterable2.iterator(), collector);
            this.computeFunction.compute(vertex, messages);
        }
    }

    /**
     * Creates the iteration operator. Instances are obtained through the static
     * {@code withEdges} factory methods.
     *
     * @param computeFunction the user-defined compute function; must not be null
     * @param dataSet         the edges of the graph; must not be null
     * @param messageCombiner the message combiner, or {@code null} (not checked here)
     * @param i               the maximum number of iterations; must be greater than zero
     */
    private VertexCentricIteration(ComputeFunction<K, VV, EV, Message> computeFunction, DataSet<Edge<K, EV>> dataSet, MessageCombiner<K, Message> messageCombiner, int i) {
        // Validation order is: compute function, edges, iteration count.
        this.computeFunction = Preconditions.checkNotNull(computeFunction);
        this.edgesWithValue = Preconditions.checkNotNull(dataSet);
        Preconditions.checkArgument(i > 0, "The maximum number of iterations must be at least one.");
        this.maximumNumberOfIterations = i;
        this.combineFunction = messageCombiner;
        this.messageType = getMessageType(computeFunction);
    }

    /**
     * Extracts the type information of the {@code Message} type parameter from the
     * concrete class of the given compute function.
     *
     * @param computeFunction the user-defined compute function
     * @return type information for the message type
     */
    private TypeInformation<Message> getMessageType(ComputeFunction<K, VV, EV, Message> computeFunction) {
        // Message is the fourth type parameter of ComputeFunction<K, VV, EV, Message>.
        final int messageTypeParameterIndex = 3;
        final Class<?> concreteClass = computeFunction.getClass();
        return TypeExtractor.createTypeInfo(computeFunction, ComputeFunction.class, concreteClass, messageTypeParameterIndex);
    }

    /**
     * Sets the vertex data set this iteration operates on. It must be set before
     * {@code createResult()} is called, which rejects a missing input.
     *
     * @param dataSet the initial vertices
     */
    public void setInput(DataSet<Vertex<K, VV>> dataSet) {
        initialVertices = dataSet;
    }

    /**
     * Builds the delta-iteration data flow for the vertex-centric computation and
     * returns its result.
     *
     * <p>The initial vertices form the solution set; the workset holds
     * {@code (vertexId, message-or-null)} entries. Each superstep joins both, co-groups
     * the result with the edges to run the compute function, and splits the output into
     * vertex updates (solution set delta) and messages (next workset). If a message
     * combiner was supplied, the messages are grouped by target id and combined first.
     *
     * @return the vertices after the iteration has finished
     * @throws IllegalStateException if no input has been set via {@code setInput}
     */
    public DataSet<Vertex<K, VV>> createResult() {
        if (this.initialVertices == null) {
            throw new IllegalStateException("The input data set has not been set.");
        }

        // Type information for all intermediate data sets.
        final TypeInformation<Vertex<K, VV>> vertexType = this.initialVertices.getType();
        // getTypeAt is not available on plain TypeInformation; the vertex type has to be
        // viewed as a tuple type to read its key field.
        final TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexType).getTypeAt(0);
        final TypeInformation<Tuple2<K, Message>> messageTypeInfo =
                new TupleTypeInfo<Tuple2<K, Message>>(keyType, this.messageType);
        final TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo =
                new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo);
        final TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo =
                new EitherTypeInfo<NullValue, Message>(TypeExtractor.getForClass(NullValue.class), this.messageType);
        final TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo =
                new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo);

        // Initial workset: one message-less entry per vertex.
        final DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet =
                this.initialVertices.map(new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);

        final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration =
                this.initialVertices.iterateDelta(initialWorkSet, this.maximumNumberOfIterations, 0);
        setUpIteration(iteration);

        // Attach the current vertex state to every workset entry.
        final DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
                iteration.getSolutionSet().join(iteration.getWorkset())
                        .where(0).equalTo(0)
                        .with(new AppendVertexState<K, VV, Message>())
                        .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(vertexType, nullableMsgTypeInfo));

        // Run the compute function per vertex, with the vertex's edges on the second input.
        CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation =
                verticesWithMsgs.coGroup(this.edgesWithValue)
                        .where("f0.f0").equalTo(0)
                        .with(new VertexComputeUdf<K, VV, EV, Message>(this.computeFunction, intermediateTypeInfo));

        // Left values of the compute output become the solution set delta ...
        final DataSet<Vertex<K, VV>> solutionSetDelta =
                superstepComputation.flatMap(new ProjectNewVertexValue<K, VV, Message>()).returns(vertexType);

        // ... and right values become the messages for the next superstep.
        final DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages =
                superstepComputation.flatMap(new ProjectMessages<K, VV, Message>()).returns(workSetTypeInfo);

        DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet = allMessages;
        if (this.combineFunction != null) {
            newWorkSet = allMessages
                    .groupBy(0)
                    .reduceGroup(new MessageCombinerUdf<K, Message>(this.combineFunction, workSetTypeInfo))
                    .setCombinable(true);
        }

        // Name the compute operator and attach the configured broadcast sets to it.
        superstepComputation = superstepComputation.name("Compute Function");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> broadcastVariable : this.configuration.getBcastVars()) {
                superstepComputation = superstepComputation.withBroadcastSet(broadcastVariable.f1, broadcastVariable.f0);
            }
        }

        return iteration.closeWith(solutionSetDelta, newWorkSet);
    }

    /**
     * Creates a vertex-centric iteration without a message combiner.
     *
     * @param dataSet         the edges of the graph
     * @param computeFunction the user-defined compute function
     * @param i               the maximum number of iterations
     * @return a new, unconfigured iteration operator
     */
    public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(DataSet<Edge<K, EV>> dataSet, ComputeFunction<K, VV, EV, Message> computeFunction, int i) {
        // A null combiner means messages are not combined.
        return withEdges(dataSet, computeFunction, null, i);
    }

    /**
     * Creates a vertex-centric iteration that uses the given message combiner.
     *
     * @param dataSet         the edges of the graph
     * @param computeFunction the user-defined compute function
     * @param messageCombiner the combiner applied to messages per target vertex; when
     *                        {@code null}, no combine step is added to the data flow
     * @param i               the maximum number of iterations
     * @return a new, unconfigured iteration operator
     */
    public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(DataSet<Edge<K, EV>> dataSet, ComputeFunction<K, VV, EV, Message> computeFunction, MessageCombiner<K, Message> messageCombiner, int i) {
        final VertexCentricIteration<K, VV, EV, Message> iteration =
                new VertexCentricIteration<>(computeFunction, dataSet, messageCombiner, i);
        return iteration;
    }

    /**
     * Sets the configuration for this iteration. It is read when the data flow is
     * built: name, parallelism, solution-set memory mode, aggregators and broadcast sets.
     *
     * @param vertexCentricConfiguration the configuration to use; may be {@code null}
     */
    public void configure(VertexCentricConfiguration vertexCentricConfiguration) {
        configuration = vertexCentricConfiguration;
    }

    /**
     * @return the configuration set via {@code configure}, or {@code null} if none was set
     */
    public VertexCentricConfiguration getIterationConfiguration() {
        return configuration;
    }

    /**
     * Applies naming and, if a configuration is present, parallelism, solution-set
     * memory mode and aggregators to the given delta iteration.
     *
     * @param deltaIteration the iteration to set up
     */
    private void setUpIteration(DeltaIteration<?, ?> deltaIteration) {
        final String defaultName = "Vertex-centric iteration (" + this.computeFunction + ")";
        final VertexCentricConfiguration config = this.configuration;

        if (config != null) {
            // The configuration decides the final name, given the default one.
            deltaIteration.name(config.getName(defaultName));
            deltaIteration.parallelism(config.getParallelism());
            deltaIteration.setSolutionSetUnManaged(config.isSolutionSetUnmanagedMemory());
            for (Map.Entry<String, Aggregator<?>> aggregator : config.getAggregators().entrySet()) {
                deltaIteration.registerAggregator(aggregator.getKey(), aggregator.getValue());
            }
        } else {
            // No configuration: only the default name is applied.
            deltaIteration.name(defaultName);
        }
    }
}
