package org.apache.beam.runners.jet.processors;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/* loaded from: input_file:org/apache/beam/runners/jet/processors/WindowGroupP.class */
public class WindowGroupP<K, V> extends AbstractProcessor {
    // Minimum gap in milliseconds between two processing-time advances; the TRY_PROCESS_MARKER
    // branch of the flat-mapper function only advances when more than this much time has elapsed.
    private static final int PROCESSING_TIME_MIN_INCREMENT = 100;
    // Sentinel item pushed through the flat mapper by complete(); compared by identity.
    private static final Object COMPLETE_MARKER = new Object();
    // Sentinel item pushed through the flat mapper by the no-argument tryProcess(); compared by identity.
    private static final Object TRY_PROCESS_MARKER = new Object();
    // Pipeline options handed to every per-key ReduceFnRunner.
    private final SerializablePipelineOptions pipelineOptions;
    // Coder of the V part of the incoming KV<K, V> elements; used for the buffering reduce function.
    private final Coder<V> inputValueValueCoder;
    // Coder used to encode the grouped output values before they are appended to the traverser.
    private final Coder outputCoder;
    // Windowing strategy driving triggers and window expiry for every key.
    private final WindowingStrategy<V, BoundedWindow> windowingStrategy;
    // Maps each incoming item (marker, watermark or encoded element) to the shared output traverser.
    private final AbstractProcessor.FlatMapper<Object, Object> flatMapper;
    // Identifier supplied by the caller; stored but not read anywhere in this class.
    private final String ownerId;
    // One KeyManager per distinct key, indexed by the key's encoded bytes.
    private final Map<Utils.ByteArrayKey, WindowGroupP<K, V>.KeyManager> keyManagers = new HashMap();
    // Shared output buffer that the flat-mapper function returns for every item.
    // NOTE(review): 128 is presumably the initial capacity — confirm against AppendableTraverser.
    private final AppendableTraverser<Object> appendableTraverser = new AppendableTraverser<>(128);
    // Most recent watermark seen; newly created KeyManagers are advanced to it on construction.
    private Instant latestWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    // Wall-clock time (epoch millis) of the last processing-time advance.
    private long lastProcessingTime = System.currentTimeMillis();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/jet/processors/WindowGroupP$InMemoryStateInternalsImpl.class */
    /**
     * In-memory state internals that can additionally report the earliest watermark hold
     * currently stored in any of its state cells.
     */
    public static class InMemoryStateInternalsImpl extends InMemoryStateInternals {
        /**
         * @param obj the key this state belongs to; passed straight to the superclass
         */
        InMemoryStateInternalsImpl(Object obj) {
            super(obj);
        }

        /**
         * Scans every state cell and returns the smallest non-null watermark hold.
         *
         * @return the earliest hold, or {@code null} when no {@link WatermarkHoldState} cell
         *     currently holds a value
         */
        Instant earliestWatermarkHold() {
            Instant earliest = null;
            // Iterate over the cells untyped: the table also contains cells that are not
            // watermark holds, and only the instanceof check below may narrow the type.
            for (Object state : this.inMemoryState.values()) {
                if (!(state instanceof WatermarkHoldState)) {
                    continue;
                }
                Instant hold = ((WatermarkHoldState) state).read();
                // A null hold never replaces an already found minimum.
                if (earliest == null || (hold != null && hold.isBefore(earliest))) {
                    earliest = hold;
                }
            }
            return earliest;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/jet/processors/WindowGroupP$KeyManager.class */
    /**
     * Holds the grouping machinery for a single key: its own in-memory timers and state plus the
     * {@link ReduceFnRunner} that buffers values and emits panes into the enclosing processor's
     * output traverser.
     */
    public class KeyManager {
        private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        private final InMemoryStateInternalsImpl stateInternals;
        private final ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner;

        /**
         * Builds the runner for one key and immediately brings it up to the processor's latest
         * watermark and the current wall-clock time.
         *
         * @param k the key whose values this manager groups
         */
        KeyManager(K k) {
            this.stateInternals = new InMemoryStateInternalsImpl(k);
            OutputWindowedValue<KV<K, Iterable<V>>> outputSink = new OutputWindowedValue<KV<K, Iterable<V>>>() {
                @Override
                public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                    // Emitted panes are encoded right away and queued on the shared output traverser.
                    WindowGroupP.this.appendableTraverser.append(Utils.encode(WindowedValue.of(output, timestamp, windows, pane), WindowGroupP.this.outputCoder));
                }

                @Override
                public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                    throw new UnsupportedOperationException("Grouping should not use side outputs");
                }
            };
            this.reduceFnRunner = new ReduceFnRunner<>(
                    k,
                    WindowGroupP.this.windowingStrategy,
                    ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(WindowGroupP.this.windowingStrategy.getTrigger()))),
                    this.stateInternals,
                    this.timerInternals,
                    outputSink,
                    NullSideInputReader.empty(),
                    SystemReduceFn.buffering(WindowGroupP.this.inputValueValueCoder),
                    WindowGroupP.this.pipelineOptions.get());
            advanceWatermark(WindowGroupP.this.latestWatermark, Instant.now());
        }

        /**
         * Advances processing time, then the input watermark (firing due event-time timers), then
         * the output watermark, and finally persists the runner's state.
         *
         * @param instant the new input watermark
         * @param instant2 the new processing time
         */
        void advanceWatermark(Instant instant, Instant instant2) {
            try {
                this.timerInternals.advanceProcessingTime(instant2);
                advanceInputWatermark(instant);
                Instant outputWatermark = this.stateInternals.earliestWatermarkHold();
                if (outputWatermark == null) {
                    WindowTracing.trace("TestInMemoryTimerInternals.advanceInputWatermark: no holds, so output watermark = input watermark", new Object[0]);
                    outputWatermark = this.timerInternals.currentInputWatermarkTime();
                }
                advanceOutputWatermark(outputWatermark);
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Moves the timer internals' processing time forward and persists the runner's state.
         *
         * <p>NOTE(review): unlike {@link #advanceInputWatermark}, nothing here drains or fires
         * processing-time timers — confirm this is intentional.
         *
         * @param instant the new processing time
         */
        void advanceProcessingTime(Instant instant) {
            try {
                this.timerInternals.advanceProcessingTime(instant);
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Advances the input watermark and fires event-time timers in batches until none remain.
         * Firing a batch may make further timers removable, hence the outer loop.
         */
        private void advanceInputWatermark(Instant newInputWatermark) throws Exception {
            this.timerInternals.advanceInputWatermark(newInputWatermark);
            while (true) {
                Collection<TimerInternals.TimerData> dueTimers = new ArrayList<>();
                TimerInternals.TimerData timer;
                while ((timer = this.timerInternals.removeNextEventTimer()) != null) {
                    dueTimers.add(timer);
                }
                if (dueTimers.isEmpty()) {
                    return;
                }
                this.reduceFnRunner.onTimers(dueTimers);
            }
        }

        /** Advances the output watermark; a {@code null} watermark is rejected with an NPE. */
        private void advanceOutputWatermark(Instant newOutputWatermark) {
            Objects.requireNonNull(newOutputWatermark);
            this.timerInternals.advanceOutputWatermark(newOutputWatermark);
        }

        /**
         * Feeds one value into the runner, restricted to the windows that have not expired yet.
         * Values whose windows have all expired are dropped silently.
         *
         * @param windowedValue the value to group, together with its timestamp, windows and pane
         */
        public void processElement(WindowedValue<V> windowedValue) {
            Collection<? extends BoundedWindow> allWindows = windowedValue.getWindows();
            Collection<? extends BoundedWindow> liveWindows = dropLateWindows(allWindows);
            if (liveWindows.isEmpty()) {
                return;
            }
            // dropLateWindows hands back the very same collection when nothing was filtered, so an
            // identity check is enough to decide whether the value has to be rebuilt. When some
            // windows did expire, only the surviving ones may reach the runner.
            WindowedValue<V> toProcess = liveWindows == allWindows
                    ? windowedValue
                    : WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), liveWindows, windowedValue.getPane());
            try {
                this.reduceFnRunner.processElements(Collections.singletonList(toProcess));
                this.reduceFnRunner.persist();
            } catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        }

        /**
         * Returns the given collection itself when none of its windows has expired, otherwise a
         * new list containing only the non-expired windows.
         */
        private Collection<? extends BoundedWindow> dropLateWindows(Collection<? extends BoundedWindow> windows) {
            boolean anyExpired = windows.stream().anyMatch(this::isExpiredWindow);
            if (!anyExpired) {
                return windows;
            }
            return windows.stream().filter(window -> !isExpiredWindow(window)).collect(Collectors.toList());
        }

        /**
         * A window is expired once its garbage-collection time lies strictly before the current
         * input watermark.
         */
        private boolean isExpiredWindow(BoundedWindow window) {
            return LateDataUtils.garbageCollectionTime(window, WindowGroupP.this.windowingStrategy).isBefore(this.timerInternals.currentInputWatermarkTime());
        }
    }

    /**
     * Creates the processor and wires up the flat-mapper function that handles every kind of item
     * the processor receives: the two internal markers, watermarks and encoded input elements.
     *
     * @param serializablePipelineOptions pipeline options handed to each per-key runner
     * @param windowedValueCoder coder of the incoming windowed {@code KV<K, V>} elements; its value
     *     coder must be a {@link KvCoder}
     * @param coder coder used to encode the grouped output
     * @param windowingStrategy windowing strategy applied to every key
     * @param str owner identifier, stored as-is
     */
    private WindowGroupP(SerializablePipelineOptions serializablePipelineOptions, WindowedValue.WindowedValueCoder<KV<K, V>> windowedValueCoder, Coder coder, WindowingStrategy<V, BoundedWindow> windowingStrategy, String str) {
        this.pipelineOptions = serializablePipelineOptions;
        KvCoder<K, V> kvCoder = (KvCoder<K, V>) windowedValueCoder.getValueCoder();
        this.inputValueValueCoder = kvCoder.getValueCoder();
        this.outputCoder = coder;
        this.windowingStrategy = windowingStrategy;
        this.ownerId = str;
        this.flatMapper = flatMapper(item -> {
            if (COMPLETE_MARKER == item) {
                // End of input: push the watermark to the maximum so every pending window fires.
                advanceWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
            } else if (TRY_PROCESS_MARKER == item) {
                // Throttle processing-time advances to at most one per minimum increment.
                Instant now = Instant.now();
                if (now.getMillis() - this.lastProcessingTime > PROCESSING_TIME_MIN_INCREMENT) {
                    this.lastProcessingTime = now.getMillis();
                    advanceProcessingTime(now);
                }
            } else if (item instanceof Watermark) {
                // Anything the watermark triggers is appended first, then the watermark itself.
                advanceWatermark(((Watermark) item).timestamp());
                this.appendableTraverser.append(item);
            } else {
                WindowedValue<KV<K, V>> decoded = Utils.decodeWindowedValue((byte[]) item, windowedValueCoder);
                KV<K, V> kv = decoded.getValue();
                K key = kv.getKey();
                V value = kv.getValue();
                // Managers are looked up by the key's encoded bytes rather than by the key object.
                Utils.ByteArrayKey encodedKey = new Utils.ByteArrayKey(Utils.encode(key, kvCoder.getKeyCoder()));
                WindowedValue<V> valueOnly = WindowedValue.of(value, decoded.getTimestamp(), decoded.getWindows(), decoded.getPane());
                this.keyManagers.computeIfAbsent(encodedKey, unused -> new KeyManager(key)).processElement(valueOnly);
            }
            return this.appendableTraverser;
        });
    }

    /**
     * Returns a supplier that builds a fresh {@code WindowGroupP} from the given arguments each
     * time it is invoked.
     *
     * @param serializablePipelineOptions pipeline options for the processor
     * @param windowedValueCoder coder of the incoming windowed {@code KV<K, V>} elements
     * @param coder coder for the grouped output
     * @param windowingStrategy windowing strategy to apply
     * @param str owner identifier
     * @return a supplier of new processor instances
     */
    public static <K, V> SupplierEx<Processor> supplier(SerializablePipelineOptions serializablePipelineOptions, WindowedValue.WindowedValueCoder<KV<K, V>> windowedValueCoder, Coder coder, WindowingStrategy windowingStrategy, String str) {
        return () -> new WindowGroupP<>(serializablePipelineOptions, windowedValueCoder, coder, windowingStrategy, str);
    }

    /**
     * Pushes {@code TRY_PROCESS_MARKER} through the flat mapper. The mapping function reacts to
     * the marker by advancing processing time for all keys, but only when more than 100 ms have
     * passed since the previous advance.
     *
     * @return the result reported by the flat mapper for this item
     */
    public boolean tryProcess() {
        return this.flatMapper.tryProcess(TRY_PROCESS_MARKER);
    }

    /**
     * Forwards an input item to the flat mapper. Items that are neither a marker nor a
     * {@link Watermark} are handled by the mapping function as {@code byte[]}-encoded windowed
     * values.
     *
     * @param i the input ordinal; not used
     * @param obj the received item
     * @return the result reported by the flat mapper for this item
     */
    protected boolean tryProcess(int i, @Nonnull Object obj) {
        return this.flatMapper.tryProcess(obj);
    }

    /**
     * Forwards a watermark to the flat mapper. The mapping function advances the watermark of
     * every key manager to the watermark's timestamp and then appends the watermark itself to
     * the output.
     *
     * @param watermark the received watermark
     * @return the result reported by the flat mapper for this item
     */
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return this.flatMapper.tryProcess(watermark);
    }

    /**
     * Pushes {@code COMPLETE_MARKER} through the flat mapper. The mapping function reacts to the
     * marker by advancing the watermark of every key manager to
     * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.
     *
     * @return the result reported by the flat mapper for this item
     */
    public boolean complete() {
        return this.flatMapper.tryProcess(COMPLETE_MARKER);
    }

    /**
     * Records the new watermark and propagates it, together with the current wall-clock time, to
     * every key manager.
     *
     * @param j the new watermark as epoch milliseconds
     */
    private void advanceWatermark(long j) {
        this.latestWatermark = new Instant(j);
        Instant wallClock = Instant.now();
        for (KeyManager manager : this.keyManagers.values()) {
            manager.advanceWatermark(this.latestWatermark, wallClock);
        }
    }

    /**
     * Propagates a new processing time to every key manager.
     *
     * @param instant the new processing time
     */
    private void advanceProcessingTime(Instant instant) {
        for (KeyManager manager : this.keyManagers.values()) {
            manager.advanceProcessingTime(instant);
        }
    }

    /**
     * Lambda-deserialization hook as reconstructed by the decompiler. When the serialized lambda
     * matches the one created in {@code supplier} (same implementation method name, functional
     * interface and signatures), it rebuilds that supplier from the five captured arguments.
     *
     * <p>NOTE(review): the method is marked synthetic, so it was presumably emitted by the
     * compiler rather than written by hand; a source-level copy is likely redundant (and may
     * clash) when the class is recompiled — confirm before keeping it.
     *
     * <p>NOTE(review): {@code boolean z = -1} and the {@code switch} over a boolean look like
     * decompiler artifacts and are not valid Java; the selector was presumably an int — verify.
     *
     * @param serializedLambda the serialized form of the lambda to restore
     * @return a supplier equivalent to the one returned by {@code supplier}
     * @throws IllegalArgumentException if the serialized lambda does not match
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name to a selector value.
        switch (implMethodName.hashCode()) {
            case -976957647:
                if (implMethodName.equals("lambda$supplier$3cf706bd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the remaining lambda metadata and rebuild the supplier.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/jet/processors/WindowGroupP") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/runners/core/construction/SerializablePipelineOptions;Lorg/apache/beam/sdk/util/WindowedValue$WindowedValueCoder;Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/values/WindowingStrategy;Ljava/lang/String;)Lcom/hazelcast/jet/core/Processor;")) {
                    // Captured arguments are in the same order as the parameters of supplier(...).
                    SerializablePipelineOptions serializablePipelineOptions = (SerializablePipelineOptions) serializedLambda.getCapturedArg(0);
                    WindowedValue.WindowedValueCoder windowedValueCoder = (WindowedValue.WindowedValueCoder) serializedLambda.getCapturedArg(1);
                    Coder coder = (Coder) serializedLambda.getCapturedArg(2);
                    WindowingStrategy windowingStrategy = (WindowingStrategy) serializedLambda.getCapturedArg(3);
                    String str = (String) serializedLambda.getCapturedArg(4);
                    return () -> {
                        return new WindowGroupP(serializablePipelineOptions, windowedValueCoder, coder, windowingStrategy, str);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
