package org.apache.beam.runners.jet.processors;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.DAGBuilder;
import org.apache.beam.runners.jet.JetPipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.runners.jet.metrics.JetMetricsContainer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/* loaded from: input_file:org/apache/beam/runners/jet/processors/AbstractParDoP.class */
abstract class AbstractParDoP<InputT, OutputT> implements Processor {
    private final SerializablePipelineOptions pipelineOptions;
    private final DoFn<InputT, OutputT> doFn;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<TupleTag<?>, int[]> outputCollToOrdinals;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<InputT> inputCoder;
    private final Map<PCollectionView<?>, Coder<?>> sideInputCoders;
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    private final Coder<InputT> inputValueCoder;
    private final Map<TupleTag<?>, Coder<?>> outputValueCoders;
    private final Map<Integer, PCollectionView<?>> ordinalToSideInput;
    private final String ownerId;
    private final String stepId;
    private final boolean cooperative;
    DoFnRunner<InputT, OutputT> doFnRunner;
    JetOutputManager outputManager;
    private DoFnInvoker<InputT, OutputT> doFnInvoker;
    private SideInputHandler sideInputHandler;
    private JetMetricsContainer metricsContainer;
    private SimpleInbox bufferedItems;
    private SideInputReader sideInputReader;
    private Outbox outbox;
    private Map<String, PCollectionView<?>> sideInputMapping;
    private final long metricsFlushPeriod = TimeUnit.SECONDS.toMillis(1) + ThreadLocalRandom.current().nextLong(500);
    private Set<Integer> completedSideInputs = new HashSet();
    private long lastMetricsFlushTime = System.currentTimeMillis();

    /* loaded from: input_file:org/apache/beam/runners/jet/processors/AbstractParDoP$AbstractSupplier.class */
    /**
     * Base supplier of ParDo processors that also listens to DAG wiring callbacks.
     *
     * <p>While the DAG is wired, the callbacks record (a) which outbound edge ordinals belong to
     * each output tag and (b) which inbound edge ordinal delivers each side input. {@link #getEx()}
     * then hands that information, together with the values captured by the constructor, to the
     * subclass factory method.
     */
    static abstract class AbstractSupplier<InputT, OutputT> implements SupplierEx<Processor>, DAGBuilder.WiringListener {
        protected final String ownerId;
        private final String stepId;
        private final SerializablePipelineOptions pipelineOptions;
        private final DoFn<InputT, OutputT> doFn;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final DoFnSchemaInformation doFnSchemaInformation;
        private final TupleTag<OutputT> mainOutputTag;
        /** Output tag -> source ordinals of the outbound edges recorded for that tag. */
        private final Map<TupleTag<?>, List<Integer>> outputCollToOrdinals;
        private final Coder<InputT> inputCoder;
        private final Map<PCollectionView<?>, Coder<?>> sideInputCoders;
        private final Map<TupleTag<?>, Coder<?>> outputCoders;
        private final Coder<InputT> inputValueCoder;
        private final Map<TupleTag<?>, Coder<?>> outputValueCoders;
        private final Collection<PCollectionView<?>> sideInputs;
        private final Map<String, PCollectionView<?>> sideInputMapping;
        /** Destination ordinal of an inbound edge -> side input view delivered over that edge. */
        private final Map<Integer, PCollectionView<?>> ordinalToSideInput = new HashMap<>();

        /**
         * Captures everything needed to build processors later.
         *
         * @param str step id
         * @param str2 owner id; wiring callbacks are only honoured when their last argument equals it
         * @param set all output tags; each one starts with an empty list of edge ordinals
         * @param collection side input views expected to be wired before {@link #getEx()} is called
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public AbstractSupplier(
                String str,
                String str2,
                DoFn<InputT, OutputT> doFn,
                WindowingStrategy<?, ?> windowingStrategy,
                DoFnSchemaInformation doFnSchemaInformation,
                SerializablePipelineOptions serializablePipelineOptions,
                TupleTag<OutputT> tupleTag,
                Set<TupleTag<OutputT>> set,
                Coder<InputT> coder,
                Map<PCollectionView<?>, Coder<?>> map,
                Map<TupleTag<?>, Coder<?>> map2,
                Coder<InputT> coder2,
                Map<TupleTag<?>, Coder<?>> map3,
                Collection<PCollectionView<?>> collection,
                Map<String, PCollectionView<?>> map4) {
            this.stepId = str;
            this.ownerId = str2;
            this.pipelineOptions = serializablePipelineOptions;
            this.doFn = doFn;
            this.windowingStrategy = windowingStrategy;
            this.doFnSchemaInformation = doFnSchemaInformation;
            Map<TupleTag<?>, List<Integer>> ordinalsByTag = new HashMap<>();
            for (TupleTag<OutputT> outputTag : set) {
                ordinalsByTag.put(outputTag, new ArrayList<>());
            }
            this.outputCollToOrdinals = ordinalsByTag;
            this.mainOutputTag = tupleTag;
            this.inputCoder = coder;
            this.sideInputCoders = map;
            this.outputCoders = map2;
            this.inputValueCoder = coder2;
            this.outputValueCoders = map3;
            this.sideInputs = collection;
            this.sideInputMapping = map4;
        }

        /**
         * Creates a processor from the captured configuration and the recorded wiring.
         *
         * <p>This is the no-argument method required by {@code SupplierEx}; the decompiler had
         * renamed it to {@link #m4getEx()}, which left the interface method unimplemented.
         *
         * @return the processor built by the subclass factory method
         * @throws IllegalStateException if the number of wired side inputs differs from the number
         *     of declared side inputs
         */
        public Processor getEx() {
            if (this.ordinalToSideInput.size() != this.sideInputs.size()) {
                throw new IllegalStateException(
                        "Processor '" + this.ownerId + "' has " + this.ordinalToSideInput.size()
                                + " wired side input(s) but " + this.sideInputs.size() + " declared");
            }
            // The processor works with primitive ordinal arrays rather than boxed lists.
            Map<TupleTag<?>, int[]> ordinalsByTag = new HashMap<>();
            for (Map.Entry<TupleTag<?>, List<Integer>> entry : this.outputCollToOrdinals.entrySet()) {
                ordinalsByTag.put(entry.getKey(), entry.getValue().stream().mapToInt(Integer::intValue).toArray());
            }
            return getEx(
                    this.doFn,
                    this.windowingStrategy,
                    this.doFnSchemaInformation,
                    ordinalsByTag,
                    this.pipelineOptions,
                    this.mainOutputTag,
                    this.inputCoder,
                    Collections.unmodifiableMap(this.sideInputCoders),
                    Collections.unmodifiableMap(this.outputCoders),
                    this.inputValueCoder,
                    Collections.unmodifiableMap(this.outputValueCoders),
                    Collections.unmodifiableMap(this.ordinalToSideInput),
                    this.sideInputMapping,
                    this.ownerId,
                    this.stepId);
        }

        /**
         * Decompiler-generated alias kept for backward compatibility; delegates to {@link #getEx()}.
         */
        /* renamed from: getEx, reason: merged with bridge method [inline-methods] */
        public Processor m4getEx() {
            return getEx();
        }

        /**
         * Factory method implemented by subclasses to instantiate the concrete processor. The
         * arguments are passed in the same order as the parameters of the {@code AbstractParDoP}
         * constructor.
         */
        abstract Processor getEx(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, int[]> map, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, Coder<InputT> coder, Map<PCollectionView<?>, Coder<?>> map2, Map<TupleTag<?>, Coder<?>> map3, Coder<InputT> coder2, Map<TupleTag<?>, Coder<?>> map4, Map<Integer, PCollectionView<?>> map5, Map<String, PCollectionView<?>> map6, String str, String str2);

        /**
         * Records the source ordinal of an outbound edge under the output tag whose id is
         * {@code str2}. Callbacks whose {@code str3} differs from this supplier's owner id are
         * ignored.
         *
         * @throws IllegalStateException if no output tag with id {@code str2} was registered
         */
        @Override // org.apache.beam.runners.jet.DAGBuilder.WiringListener
        public void isOutboundEdgeOfVertex(Edge edge, String str, String str2, String str3) {
            if (!this.ownerId.equals(str3)) {
                return;
            }
            List<Integer> ordinals = this.outputCollToOrdinals.get(new TupleTag<>(str2));
            if (ordinals == null) {
                throw new IllegalStateException(
                        "Processor '" + this.ownerId + "' has no output registered for tag '" + str2 + "'");
            }
            ordinals.add(edge.getSourceOrdinal());
        }

        /**
         * Records the destination ordinal of an inbound edge for the first declared side input
         * whose tuple tag id equals {@code str}. Callbacks whose {@code str3} differs from this
         * supplier's owner id, and edges matching no side input, are ignored.
         */
        @Override // org.apache.beam.runners.jet.DAGBuilder.WiringListener
        public void isInboundEdgeOfVertex(Edge edge, String str, String str2, String str3) {
            if (!this.ownerId.equals(str3)) {
                return;
            }
            for (PCollectionView<?> sideInput : this.sideInputs) {
                if (str.equals(Utils.getTupleTagId(sideInput))) {
                    this.ordinalToSideInput.put(edge.getDestOrdinal(), sideInput);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/jet/processors/AbstractParDoP$JetOutputManager.class */
    /**
     * Output manager that encodes DoFn outputs and buffers them per outbound edge ordinal until
     * the {@link Outbox} accepts them.
     *
     * <p>{@link #tryFlush()} remembers how far it got ({@code currentBucket}/{@code currentItem})
     * when the outbox rejects an item, so a later call resumes from the same position.
     */
    public static class JetOutputManager implements DoFnRunners.OutputManager {
        private final Outbox outbox;
        private final Map<TupleTag<?>, Coder<?>> outputCoders;
        private final Map<TupleTag<?>, int[]> outputCollToOrdinals;
        /** One buffer per outbound ordinal; the array index is the ordinal. */
        private final List<Object>[] outputBuckets;
        /** Ordinal currently being flushed; 0 when no flush is in progress. */
        private int currentBucket;
        /** Index of the next item to offer within the current bucket. */
        private int currentItem;

        /**
         * @param outbox destination of the buffered items
         * @param map coder per output tag, used to encode values in {@link #output}
         * @param map2 outbound ordinals per output tag; must not be empty
         */
        @SuppressWarnings({"unchecked", "rawtypes"}) // generic array creation is unavoidable here
        JetOutputManager(Outbox outbox, Map<TupleTag<?>, Coder<?>> map, Map<TupleTag<?>, int[]> map2) {
            this.outbox = outbox;
            this.outputCoders = map;
            this.outputCollToOrdinals = map2;
            assert !map2.isEmpty();
            // Size the bucket array so that the highest ordinal in use is a valid index.
            int highestOrdinal = map2.values().stream().flatMapToInt(IntStream::of).max().orElse(-1);
            this.outputBuckets = new List[highestOrdinal + 1];
            Arrays.setAll(this.outputBuckets, ordinal -> new ArrayList<>());
        }

        /**
         * Encodes the value with the coder registered for the tag and appends the encoded bytes to
         * the bucket of every ordinal mapped to that tag.
         */
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            assert this.currentBucket == 0 && this.currentItem == 0 : "adding output while flushing";
            byte[] encoded = Utils.encode(windowedValue, this.outputCoders.get(tupleTag));
            for (int ordinal : this.outputCollToOrdinals.get(tupleTag)) {
                this.outputBuckets[ordinal].add(encoded);
            }
        }

        /**
         * Offers buffered items to the outbox, bucket by bucket.
         *
         * @return {@code false} as soon as the outbox rejects an item (the position is kept for the
         *     next call); otherwise {@code true} only if every bucket is empty after the pass
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @CheckReturnValue
        public boolean tryFlush() {
            while (this.currentBucket < this.outputBuckets.length) {
                List<Object> bucket = this.outputBuckets[this.currentBucket];
                while (this.currentItem < bucket.size()) {
                    if (!this.outbox.offer(this.currentBucket, bucket.get(this.currentItem))) {
                        return false;
                    }
                    this.currentItem++;
                }
                bucket.clear();
                this.currentItem = 0;
                this.currentBucket++;
            }
            this.currentBucket = 0;
            // Items may have been appended to already-visited buckets; report them as pending.
            int pending = 0;
            for (List<Object> bucket : this.outputBuckets) {
                pending += bucket.size();
            }
            return pending == 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/jet/processors/AbstractParDoP$SimpleInbox.class */
    /**
     * Minimal {@link Inbox} backed by an in-memory FIFO queue. Items are appended with
     * {@link #add(Object)} and consumed from the head.
     */
    public static class SimpleInbox implements Inbox {
        private Deque<Object> items = new ArrayDeque<>();

        private SimpleInbox() {
        }

        /** Appends an item to the tail of the queue. */
        void add(Object obj) {
            items.addLast(obj);
        }

        /** Returns {@code true} when no items are queued. */
        public boolean isEmpty() {
            return items.isEmpty();
        }

        /** Returns the head item without removing it, or {@code null} if the queue is empty. */
        public Object peek() {
            return items.peekFirst();
        }

        /** Removes and returns the head item, or {@code null} if the queue is empty. */
        public Object poll() {
            return items.pollFirst();
        }

        /** Removes the head item; throws if the queue is empty. */
        public void remove() {
            items.removeFirst();
        }

        /** Returns the number of queued items. */
        public int size() {
            return items.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractParDoP(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<TupleTag<?>, int[]> map, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, Coder<InputT> coder, Map<PCollectionView<?>, Coder<?>> map2, Map<TupleTag<?>, Coder<?>> map3, Coder<InputT> coder2, Map<TupleTag<?>, Coder<?>> map4, Map<Integer, PCollectionView<?>> map5, Map<String, PCollectionView<?>> map6, String str, String str2) {
        this.pipelineOptions = serializablePipelineOptions;
        this.doFn = (DoFn) Utils.serde(doFn);
        this.windowingStrategy = windowingStrategy;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.outputCollToOrdinals = map;
        this.mainOutputTag = tupleTag;
        this.inputCoder = coder;
        this.sideInputCoders = (Map) map2.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Utils.deriveIterableValueCoder((WindowedValue.FullWindowedValueCoder) entry.getValue());
        }));
        this.outputCoders = map3;
        this.inputValueCoder = coder2;
        this.outputValueCoders = map4;
        this.ordinalToSideInput = map5;
        this.sideInputMapping = map6;
        this.ownerId = str;
        this.stepId = str2;
        this.cooperative = isCooperativenessAllowed(serializablePipelineOptions).booleanValue() && hasOutput();
    }

    /**
     * Prepares the processor for execution: creates the metrics container, obtains the DoFn
     * invoker and runs its setup, chooses the side input reader, and finally builds the output
     * manager and the DoFn runner.
     *
     * <p>When side inputs are wired, a buffer for regular items and a {@link SideInputHandler} are
     * created; otherwise a {@link NullSideInputReader} is used and no buffer exists.
     */
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.outbox = outbox;
        this.metricsContainer = new JetMetricsContainer(this.stepId, this.ownerId, context);

        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();

        boolean hasSideInputs = !this.ordinalToSideInput.isEmpty();
        if (hasSideInputs) {
            this.bufferedItems = new SimpleInbox();
            this.sideInputHandler = new SideInputHandler(this.ordinalToSideInput.values(), InMemoryStateInternals.forKey((Object) null));
            this.sideInputReader = this.sideInputHandler;
        } else {
            this.sideInputReader = NullSideInputReader.of(Collections.emptyList());
        }

        this.outputManager = new JetOutputManager(outbox, this.outputCoders, this.outputCollToOrdinals);

        PipelineOptions options = this.pipelineOptions.get();
        List<TupleTag<?>> outputTags = Lists.newArrayList(this.outputCollToOrdinals.keySet());
        this.doFnRunner = getDoFnRunner(
                options,
                this.doFn,
                this.sideInputReader,
                this.outputManager,
                this.mainOutputTag,
                outputTags,
                this.inputValueCoder,
                this.outputValueCoders,
                this.windowingStrategy,
                this.doFnSchemaInformation,
                this.sideInputMapping);
    }

    /**
     * Creates the {@link DoFnRunner} used by this processor. Called once from {@code init} with
     * the pipeline options, the DoFn, the chosen side input reader, the output manager, the main
     * output tag, the list of all output tags, the input value coder, the output value coders, the
     * windowing strategy, the schema information and the side input mapping, in that order.
     */
    protected abstract DoFnRunner<InputT, OutputT> getDoFnRunner(PipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, JetOutputManager jetOutputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, Coder<InputT> coder, Map<TupleTag<?>, Coder<?>> map, WindowingStrategy<?, ?> windowingStrategy, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map2);

    /**
     * Returns the cooperativeness flag computed in the constructor: {@code true} only when the
     * pipeline options allow cooperative processors and at least one output has an outbound
     * ordinal.
     */
    public boolean isCooperative() {
        return this.cooperative;
    }

    /**
     * Runs the DoFn's teardown through its invoker.
     *
     * <p>The invoker is only assigned in {@code init}; if {@code close()} runs without a completed
     * {@code init} there is nothing to tear down, so the call is a no-op instead of failing with a
     * {@link NullPointerException}.
     */
    public void close() {
        if (this.doFnInvoker != null) {
            this.doFnInvoker.invokeTeardown();
        }
    }

    /**
     * Handles the items available on the inbound edge with ordinal {@code i}.
     *
     * <p>Nothing is consumed while previously produced output is still pending in the output
     * manager. Otherwise the items are treated as side input values when the ordinal is mapped to
     * a side input view, buffered when a buffer for regular items exists, or run through the DoFn
     * directly.
     *
     * <p>The metrics container is installed for the duration of the call and always cleared
     * afterwards, including on the early-return path and when processing throws, so it cannot leak
     * to whatever runs next on the same thread.
     *
     * @param i ordinal of the inbound edge the inbox belongs to
     * @param inbox items received on that edge
     */
    public void process(int i, @Nonnull Inbox inbox) {
        MetricsEnvironment.setCurrentContainer(this.metricsContainer);
        try {
            if (!this.outputManager.tryFlush()) {
                // Pending output must be drained before more items are processed.
                return;
            }
            PCollectionView<?> sideInputView = this.ordinalToSideInput.get(i);
            if (sideInputView != null) {
                processSideInput(sideInputView, inbox);
            } else if (this.bufferedItems != null) {
                processBufferedRegularItems(inbox);
            } else {
                processNonBufferedRegularItems(inbox);
            }
        } finally {
            MetricsEnvironment.setCurrentContainer((MetricsContainer) null);
        }
    }

    /**
     * Drains the inbox, decoding each item with the coder registered for the given view and
     * adding the decoded value to the side input handler.
     */
    private void processSideInput(PCollectionView<?> pCollectionView, Inbox inbox) {
        for (byte[] payload = (byte[]) inbox.poll(); payload != null; payload = (byte[]) inbox.poll()) {
            this.sideInputHandler.addSideInputValue(
                    pCollectionView,
                    Utils.decodeWindowedValue(payload, this.sideInputCoders.get(pCollectionView)));
        }
    }

    /**
     * Runs items from the inbox through the DoFn runner inside a single bundle.
     *
     * <p>Items are taken one at a time; after each element the output manager is flushed, and the
     * loop stops as soon as the inbox is empty or the flush does not complete. The bundle is
     * finished in either case.
     */
    private void processNonBufferedRegularItems(Inbox inbox) {
        startRunnerBundle(this.doFnRunner);
        byte[] payload;
        while ((payload = (byte[]) inbox.poll()) != null) {
            processElementWithRunner(this.doFnRunner, Utils.decodeWindowedValue(payload, this.inputCoder));
            if (!this.outputManager.tryFlush()) {
                // Output is backed up; leave the remaining items in the inbox.
                break;
            }
        }
        finishRunnerBundle(this.doFnRunner);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts a bundle on the given runner. Invoked before items from an inbox are processed.
     */
    public void startRunnerBundle(DoFnRunner<InputT, OutputT> doFnRunner) {
        doFnRunner.startBundle();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Passes a single decoded element to the given runner.
     */
    public void processElementWithRunner(DoFnRunner<InputT, OutputT> doFnRunner, WindowedValue<InputT> windowedValue) {
        doFnRunner.processElement(windowedValue);
    }

    /**
     * Finishes the current bundle on the given runner. Invoked after the processing loop ends.
     */
    protected void finishRunnerBundle(DoFnRunner<InputT, OutputT> doFnRunner) {
        doFnRunner.finishBundle();
    }

    /**
     * Moves every item from the inbox into {@code bufferedItems} without decoding or processing
     * it.
     */
    private void processBufferedRegularItems(Inbox inbox) {
        for (byte[] payload = (byte[]) inbox.poll(); payload != null; payload = (byte[]) inbox.poll()) {
            this.bufferedItems.add(payload);
        }
    }

    /**
     * Flushes pending output and, once the flush has completed, flushes the metrics container if
     * more than {@code metricsFlushPeriod} milliseconds have passed since the last metrics flush.
     *
     * @return whether all pending output was flushed
     */
    public boolean tryProcess() {
        if (!this.outputManager.tryFlush()) {
            return false;
        }
        long nextMetricsFlush = this.lastMetricsFlushTime + this.metricsFlushPeriod;
        if (System.currentTimeMillis() > nextMetricsFlush) {
            this.metricsContainer.flush(true);
            this.lastMetricsFlushTime = System.currentTimeMillis();
        }
        return true;
    }

    /**
     * Forwards the watermark to the outbox unchanged.
     *
     * @return the result of offering the watermark to the outbox
     */
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return this.outbox.offer(watermark);
    }

    /**
     * Reacts to the exhaustion of the inbound edge with ordinal {@code i}.
     *
     * <p>Edges that do not carry a side input are ignored. Once every side input edge has been
     * completed, the regular items buffered so far are run through the DoFn. Because this method
     * may be entered again after returning {@code false}, pending output is flushed first so that
     * no new output is produced while an earlier flush is still in progress — the same rule
     * {@code process} applies.
     *
     * @param i ordinal of the completed inbound edge
     * @return {@code false} while output is still pending or buffered items remain; {@code true}
     *     otherwise
     */
    public boolean completeEdge(int i) {
        if (this.ordinalToSideInput.get(i) == null) {
            // Not a side input edge.
            return true;
        }
        this.completedSideInputs.add(i);
        if (this.completedSideInputs.size() != this.ordinalToSideInput.size()) {
            // Other side inputs are still open.
            return true;
        }
        if (!this.outputManager.tryFlush()) {
            // Output from an earlier attempt is still pending; do not produce more yet.
            return false;
        }
        // NOTE(review): unlike process(), no metrics container is installed around this call —
        // confirm whether metrics reported by the DoFn for buffered items are meant to be dropped.
        processNonBufferedRegularItems(this.bufferedItems);
        if (!this.bufferedItems.isEmpty()) {
            return false;
        }
        // All buffered items were consumed; from now on regular items are processed directly.
        this.bufferedItems = null;
        return true;
    }

    /**
     * Flushes pending output and, once that has completed, flushes the metrics container with the
     * {@code false} argument.
     *
     * @return whether all pending output was flushed
     */
    public boolean complete() {
        if (!this.outputManager.tryFlush()) {
            return false;
        }
        this.metricsContainer.flush(false);
        return true;
    }

    /**
     * Tells whether at least one output tag is mapped to one or more outbound ordinals.
     */
    private boolean hasOutput() {
        return this.outputCollToOrdinals.values().stream().anyMatch(ordinals -> ordinals.length > 0);
    }

    /**
     * Reads the {@code jetProcessorsCooperative} setting from the pipeline options viewed as
     * {@link JetPipelineOptions}.
     */
    private static Boolean isCooperativenessAllowed(SerializablePipelineOptions serializablePipelineOptions) {
        return ((JetPipelineOptions) serializablePipelineOptions.get().as(JetPipelineOptions.class)).getJetProcessorsCooperative();
    }
}
