package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.mutable.MutableObject;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.streaming.FlinkStateInternalsTest;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.NoopLock;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.UnmodifiableIterator;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.reflect.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.class */
public class ExecutableStageDoFnOperatorTest {

    /** Mocked runtime context; {@link #setUpMocks()} makes it return {@link #distributedCache}. */
    @Mock
    private RuntimeContext runtimeContext;

    /** Mocked distributed cache returned by {@link #runtimeContext}. */
    @Mock
    private DistributedCache distributedCache;

    /** Mocked stage context; {@link #setUpMocks()} makes it hand out {@link #stageBundleFactory}. */
    @Mock
    private ExecutableStageContext stageContext;

    /** Mocked bundle factory; individual tests stub {@code getBundle} on it. */
    @Mock
    private StageBundleFactory stageBundleFactory;

    /** Mocked state request handler. */
    @Mock
    private StateRequestHandler stateRequestHandler;

    /** Mocked bundle descriptor; by default it reports no timer specs and no bag user state specs. */
    @Mock
    private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;

    /** Rule through which tests declare the exception they expect to be thrown. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Minimal stage payload: a single input PCollection named "input" and nothing else. */
    private final RunnerApi.ExecutableStagePayload stagePayload =
        RunnerApi.ExecutableStagePayload.newBuilder()
            .setInput("input")
            .setComponents(
                RunnerApi.Components.newBuilder()
                    .putPcollections("input", RunnerApi.PCollection.getDefaultInstance())
                    .build())
            .build();

    /** Local name of the user state declared by {@link #stagePayloadWithUserState}. */
    private final String stateId = "userState";

    /**
     * Variant of {@link #stagePayload} that additionally contains a ParDo transform named
     * "transform" and one user state ("userState") attached to that transform.
     */
    private final RunnerApi.ExecutableStagePayload stagePayloadWithUserState =
        stagePayload.toBuilder()
            .setComponents(
                stagePayload.getComponents().toBuilder()
                    .putTransforms(
                        "transform",
                        RunnerApi.PTransform.newBuilder()
                            .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                    .setUrn("beam:transform:pardo:v1")
                                    .build())
                            .putInputs("input", "input")
                            .build())
                    .build())
            .addUserStates(
                RunnerApi.ExecutableStagePayload.UserStateId.newBuilder()
                    .setLocalName("userState")
                    .setTransformId("transform")
                    .build())
            .build();

    /** Job metadata passed to operators constructed by the tests. */
    private final JobInfo jobInfo =
        JobInfo.create("job-id", "job-name", "retrieval-token", Struct.getDefaultInstance());

    /**
     * Builds one kind of bag user state request (get, append or clear) for a given key and user
     * state id. Implemented in this test via method references to the static request helpers.
     */
    @FunctionalInterface
    private interface RequestGenerator {
        /**
         * @param key encoded key the state request is scoped to
         * @param userStateId id of the user state the request addresses
         * @return the fully built state request
         * @throws Exception if building the request fails
         */
        BeamFnApi.StateRequest makeRequest(ByteString key, String userStateId) throws Exception;
    }

    /**
     * Initializes the {@code @Mock} fields and wires up the default stubbing shared by all tests:
     * the bundle descriptor reports no timers and no bag user states, the bundle factory returns
     * that descriptor, and the stage context returns the bundle factory for any stage.
     */
    @Before
    public void setUpMocks() {
        MockitoAnnotations.initMocks(this);

        // Descriptor defaults: nothing stateful unless a test overrides it.
        Mockito.when(processBundleDescriptor.getTimerSpecs()).thenReturn(Collections.emptyMap());
        Mockito.when(processBundleDescriptor.getBagUserStateSpecs())
            .thenReturn(Collections.emptyMap());

        // Factory and context wiring.
        Mockito.when(stageBundleFactory.getProcessBundleDescriptor())
            .thenReturn(processBundleDescriptor);
        Mockito.when(stageContext.getStageBundleFactory((ExecutableStage) Matchers.any()))
            .thenReturn(stageBundleFactory);
        Mockito.when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
    }

    /**
     * An exception thrown when the remote bundle is closed must surface from closing the harness,
     * with the original exception as its cause.
     */
    @Test
    public void sdkErrorsSurfaceOnClose() throws Exception {
        TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        ExecutableStageDoFnOperator<Integer, Integer> operator =
            getOperator(
                mainOutput,
                Collections.emptyList(),
                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutput, VoidCoder.of()));
        OneInputStreamOperatorTestHarness testHarness =
            new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();

        // Every bundle request yields the same mock bundle with a single "input" receiver.
        RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
        Mockito.when(
                stageBundleFactory.getBundle(
                    (OutputReceiverFactory) Matchers.any(),
                    (StateRequestHandler) Matchers.any(),
                    (BundleProgressHandler) Matchers.any()))
            .thenReturn(bundle);
        FnDataReceiver inputReceiver = Mockito.mock(FnDataReceiver.class);
        Mockito.when(bundle.getInputReceivers())
            .thenReturn(ImmutableMap.of("input", inputReceiver));

        // Closing the bundle fails; that failure is what the test expects to see as the cause.
        RuntimeException expectedCause = new RuntimeException(new Exception());
        Mockito.doThrow(expectedCause).when(bundle).close();
        thrown.expectCause(org.hamcrest.Matchers.is(expectedCause));

        operator.processElement(new StreamRecord(WindowedValue.valueInGlobalWindow(0)));
        testHarness.close();
    }

    /**
     * Every element pushed through the harness must be forwarded to the bundle's "input"
     * receiver, and nothing else may be sent to it.
     */
    @Test
    public void expectedInputsAreSent() throws Exception {
        TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        ExecutableStageDoFnOperator<Integer, Integer> operator =
            getOperator(
                mainOutput,
                Collections.emptyList(),
                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutput, VoidCoder.of()));

        RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
        Mockito.when(
                stageBundleFactory.getBundle(
                    (OutputReceiverFactory) Matchers.any(),
                    (StateRequestHandler) Matchers.any(),
                    (BundleProgressHandler) Matchers.any()))
            .thenReturn(bundle);
        FnDataReceiver inputReceiver = Mockito.mock(FnDataReceiver.class);
        Mockito.when(bundle.getInputReceivers())
            .thenReturn(ImmutableMap.of("input", inputReceiver));

        WindowedValue[] inputs = {
            WindowedValue.valueInGlobalWindow(1),
            WindowedValue.valueInGlobalWindow(2),
            WindowedValue.valueInGlobalWindow(3)
        };

        OneInputStreamOperatorTestHarness testHarness =
            new OneInputStreamOperatorTestHarness(operator);
        testHarness.open();
        for (WindowedValue input : inputs) {
            testHarness.processElement(new StreamRecord(input));
        }

        // Each input was delivered exactly once, and the receiver saw nothing else.
        for (WindowedValue input : inputs) {
            Mockito.verify(inputReceiver).accept(input);
        }
        Mockito.verifyNoMoreInteractions(inputReceiver);

        testHarness.close();
    }

    /**
     * Outputs produced by the SDK harness must be routed by tag: the main output goes to the
     * operator's regular output and each additional output goes to its Flink side output.
     *
     * <p>The fake bundle emits one element per output tag the first time it is closed, so nothing
     * is expected in the harness output until the harness itself is closed.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw factory constructor and raw test harness
    public void outputsAreTaggedCorrectly() throws Exception {
        WindowedValue.ValueOnlyWindowedValueCoder coder =
            WindowedValue.getValueOnlyCoder(VarIntCoder.of());

        final TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        final TupleTag<Integer> additionalOutput1 = new TupleTag<>("output-1");
        final TupleTag<Integer> additionalOutput2 = new TupleTag<>("output-2");

        // Explicit type witnesses: without them a builder chain is inferred as <Object, Object>.
        // The OutputTags are anonymous subclasses, as in the original code.
        Map<TupleTag<?>, OutputTag<?>> tagsToOutputTags =
            ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
                .put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()) {})
                .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()) {})
                .build();
        Map<TupleTag<?>, Coder<?>> tagsToCoders =
            ImmutableMap.<TupleTag<?>, Coder<?>>builder()
                .put(mainOutput, coder)
                .put(additionalOutput1, coder)
                .put(additionalOutput2, coder)
                .build();
        Map<TupleTag<?>, Integer> tagsToIds =
            ImmutableMap.<TupleTag<?>, Integer>builder()
                .put(mainOutput, 0)
                .put(additionalOutput1, 1)
                .put(additionalOutput2, 2)
                .build();

        // Raw constructor, matching how testSerialization builds the same factory.
        DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory =
            new DoFnOperator.MultiOutputOutputManagerFactory(
                mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);

        WindowedValue<Integer> zero = WindowedValue.valueInGlobalWindow(0);
        final WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
        final WindowedValue<Integer> four = WindowedValue.valueInGlobalWindow(4);
        final WindowedValue<Integer> five = WindowedValue.valueInGlobalWindow(5);

        // Replace the default mock factory with a fake whose bundles emit to all three outputs.
        Mockito.when(stageContext.getStageBundleFactory((ExecutableStage) Matchers.any()))
            .thenReturn(
                new StageBundleFactory() {
                    // Guards against emitting more than once across repeated bundle closes.
                    private boolean onceEmitted;

                    @Override
                    public RemoteBundle getBundle(
                            final OutputReceiverFactory outputReceiverFactory,
                            StateRequestHandler stateRequestHandler,
                            BundleProgressHandler bundleProgressHandler) {
                        return new RemoteBundle() {
                            @Override
                            public String getId() {
                                return "bundle-id";
                            }

                            @Override
                            public Map<String, FnDataReceiver> getInputReceivers() {
                                // Inputs are accepted and ignored.
                                return ImmutableMap.of("input", obj -> {
                                });
                            }

                            @Override
                            public void split(double d) {
                                throw new UnsupportedOperationException();
                            }

                            @Override
                            public void close() throws Exception {
                                if (onceEmitted) {
                                    return;
                                }
                                outputReceiverFactory.create(mainOutput.getId()).accept(three);
                                outputReceiverFactory.create(additionalOutput1.getId()).accept(four);
                                outputReceiverFactory.create(additionalOutput2.getId()).accept(five);
                                onceEmitted = true;
                            }
                        };
                    }

                    @Override
                    public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor
                            getProcessBundleDescriptor() {
                        return processBundleDescriptor;
                    }

                    @Override
                    public void close() {
                    }
                });

        OneInputStreamOperatorTestHarness testHarness =
            new OneInputStreamOperatorTestHarness(
                getOperator(
                    mainOutput,
                    ImmutableList.of(additionalOutput1, additionalOutput2),
                    outputManagerFactory));
        long watermark = testHarness.getCurrentWatermark() + 1;
        testHarness.open();

        testHarness.processElement(new StreamRecord(zero));

        testHarness.processWatermark(watermark);
        watermark++;
        testHarness.processWatermark(watermark);

        Assert.assertEquals(watermark, testHarness.getCurrentWatermark());
        // Nothing is emitted until the harness is closed.
        Assert.assertEquals(0L, testHarness.getOutput().size());

        testHarness.close();

        MatcherAssert.assertThat(
            testHarness.getOutput(),
            IsIterableContainingInOrder.contains(
                new Object[] {
                    new StreamRecord(three), new Watermark(watermark), new Watermark(Long.MAX_VALUE)
                }));
        MatcherAssert.assertThat(
            testHarness.getSideOutput((OutputTag) tagsToOutputTags.get(additionalOutput1)),
            IsIterableContainingInOrder.contains(new StreamRecord[] {new StreamRecord(four)}));
        MatcherAssert.assertThat(
            testHarness.getSideOutput((OutputTag) tagsToOutputTags.get(additionalOutput2)),
            IsIterableContainingInOrder.contains(new StreamRecord[] {new StreamRecord(five)}));
    }

    /**
     * Opening and closing the operator without processing any element must close the bundle
     * factory and the stage context, and must never touch a bundle.
     */
    @Test
    public void testStageBundleClosed() throws Exception {
        TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        ExecutableStageDoFnOperator<Integer, Integer> operator =
            getOperator(
                mainOutput,
                Collections.emptyList(),
                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutput, VoidCoder.of()));
        OneInputStreamOperatorTestHarness testHarness =
            new OneInputStreamOperatorTestHarness(operator);

        RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
        FnDataReceiver inputReceiver = Mockito.mock(FnDataReceiver.class);
        // ImmutableMap.of takes its type arguments from the stubbed return type; an untyped
        // builder chain would be inferred as ImmutableMap<Object, Object>.
        Mockito.when(bundle.getInputReceivers())
            .thenReturn(ImmutableMap.of("input", inputReceiver));
        Mockito.when(
                stageBundleFactory.getBundle(
                    (OutputReceiverFactory) Matchers.any(),
                    (StateRequestHandler) Matchers.any(),
                    (BundleProgressHandler) Matchers.any()))
            .thenReturn(bundle);

        testHarness.open();
        testHarness.close();

        Mockito.verify(stageBundleFactory).getProcessBundleDescriptor();
        Mockito.verify(stageBundleFactory).close();
        Mockito.verify(stageContext).close();
        Mockito.verifyNoMoreInteractions(stageBundleFactory);

        // Disposing afterwards must not touch the bundle either.
        operator.dispose();
        Mockito.verifyNoMoreInteractions(bundle);
    }

    /**
     * With a key coder configured, the opened operator must run its DoFn through a
     * {@link StatefulDoFnRunner} wrapped in a {@link DoFnRunnerWithMetricsUpdate}.
     */
    @Test
    public void testEnsureStateCleanupWithKeyedInput() throws Exception {
        TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory =
            new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutput, VarIntCoder.of());
        VarIntCoder keyCoder = VarIntCoder.of();
        ExecutableStageDoFnOperator<Integer, Integer> operator =
            getOperator(
                mainOutput,
                Collections.emptyList(),
                outputManagerFactory,
                WindowingStrategy.globalDefault(),
                keyCoder,
                WindowedValue.getFullCoder(keyCoder, GlobalWindow.Coder.INSTANCE));

        // Identity key selector: the element itself is used as the key.
        KeyedOneInputStreamOperatorTestHarness testHarness =
            new KeyedOneInputStreamOperatorTestHarness(
                operator, element -> element, new CoderTypeInformation(keyCoder));

        RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
        FnDataReceiver inputReceiver = Mockito.mock(FnDataReceiver.class);
        // ImmutableMap.of takes its type arguments from the stubbed return type; an untyped
        // builder chain would be inferred as ImmutableMap<Object, Object>.
        Mockito.when(bundle.getInputReceivers())
            .thenReturn(ImmutableMap.of("input", inputReceiver));
        Mockito.when(
                stageBundleFactory.getBundle(
                    (OutputReceiverFactory) Matchers.any(),
                    (StateRequestHandler) Matchers.any(),
                    (BundleProgressHandler) Matchers.any()))
            .thenReturn(bundle);

        testHarness.open();

        Object doFnRunner = Whitebox.getInternalState(operator, "doFnRunner");
        MatcherAssert.assertThat(
            doFnRunner, org.hamcrest.Matchers.instanceOf(DoFnRunnerWithMetricsUpdate.class));
        Object delegate = Whitebox.getInternalState(doFnRunner, "delegate");
        MatcherAssert.assertThat(
            delegate, org.hamcrest.Matchers.instanceOf(StatefulDoFnRunner.class));
    }

    /**
     * Setting the cleanup timer for a window must take the lock, set the encoded key on the keyed
     * state backend, register an event-time timer one millisecond after the window's max
     * timestamp, and release the lock.
     */
    @Test
    public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
        InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        KeyedStateBackend stateBackend = Mockito.mock(KeyedStateBackend.class);
        Lock stateBackendLock = Mockito.mock(Lock.class);
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        GlobalWindow window = GlobalWindow.INSTANCE;

        ExecutableStageDoFnOperator.CleanupTimer cleanupTimer =
            new ExecutableStageDoFnOperator.CleanupTimer(
                timerInternals,
                stateBackendLock,
                WindowingStrategy.globalDefault(),
                keyCoder,
                GlobalWindow.Coder.INSTANCE,
                stateBackend);
        cleanupTimer.setForWindow(KV.of("key", "string"), window);

        Mockito.verify(stateBackendLock).lock();
        Mockito.verify(stateBackend).setCurrentKey(FlinkKeyUtils.encodeKey("key", keyCoder));
        MatcherAssert.assertThat(
            timerInternals.getNextTimer(TimeDomain.EVENT_TIME),
            org.hamcrest.Matchers.is(window.maxTimestamp().plus(1L)));
        Mockito.verify(stateBackendLock).unlock();
    }

    /**
     * The state cleaner must empty every listed user state for a window once
     * {@code clearForWindow} followed by {@code cleanupState} has been called.
     */
    @Test
    public void testEnsureStateCleanupWithKeyedInputStateCleaner() {
        GlobalWindow.Coder windowCoder = GlobalWindow.Coder.INSTANCE;
        // Parameterized so that state(...) returns BagState<String>; on a raw receiver the
        // method's return type is erased and cannot be assigned to BagState.
        InMemoryStateInternals<String> stateInternals = InMemoryStateInternals.forKey("key");
        ImmutableList<String> userStateNames = ImmutableList.of("state1", "state2");

        // Populate each user state with one entry.
        ImmutableList.Builder<BagState<String>> bagStateBuilder = ImmutableList.builder();
        for (String userStateName : userStateNames) {
            BagState<String> state =
                stateInternals.state(
                    StateNamespaces.window(windowCoder, GlobalWindow.INSTANCE),
                    StateTags.bag(userStateName, StringUtf8Coder.of()));
            bagStateBuilder.add(state);
            state.add("this should be cleaned");
        }
        ImmutableList<BagState<String>> bagStates = bagStateBuilder.build();

        // Holds the "current key" that the cleaner reads and that cleanupState writes back.
        MutableObject<ByteBuffer> currentKey =
            new MutableObject<>(
                ByteBuffer.wrap(stateInternals.getKey().getBytes(StandardCharsets.UTF_8)));
        ExecutableStageDoFnOperator.StateCleaner stateCleaner =
            new ExecutableStageDoFnOperator.StateCleaner(
                userStateNames, windowCoder, () -> currentKey.getValue());

        for (BagState<String> bagState : bagStates) {
            MatcherAssert.assertThat(
                Iterables.size(bagState.read()), org.hamcrest.Matchers.is(1));
        }

        stateCleaner.clearForWindow(GlobalWindow.INSTANCE);
        stateCleaner.cleanupState(stateInternals, key -> currentKey.setValue(key));

        for (BagState<String> bagState : bagStates) {
            MatcherAssert.assertThat(
                Iterables.size(bagState.read()), org.hamcrest.Matchers.is(0));
        }
    }

    /**
     * Runs the deferred state cleanup scenario twice: first finishing the bundle explicitly,
     * then finishing it through a snapshot.
     */
    @Test
    public void testEnsureDeferredStateCleanupTimerFiring() throws Exception {
        for (boolean withCheckpointing : new boolean[] {false, true}) {
            testEnsureDeferredStateCleanupTimerFiring(withCheckpointing);
        }
    }

    /**
     * Checks that a user timer and the state cleanup are deferred until the current bundle is
     * finished, and that the keyed state is empty afterwards.
     *
     * @param withCheckpointing when {@code true} the bundle is finished by taking a snapshot on
     *     the harness; otherwise {@code invokeFinishBundle()} is called on the operator directly
     *     (twice, the second call draining the cleanup queue)
     */
    private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing)
            throws Exception {
        TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
        DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory =
            new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutput, VoidCoder.of());
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        WindowingStrategy windowingStrategy =
            WindowingStrategy.of(FixedWindows.of(Duration.millis(1000L)));
        ExecutableStageDoFnOperator<Integer, Integer> operator =
            getOperator(
                mainOutput,
                Collections.emptyList(),
                outputManagerFactory,
                windowingStrategy,
                keyCoder,
                WindowedValue.getFullCoder(
                    KvCoder.of(keyCoder, VarIntCoder.of()),
                    windowingStrategy.getWindowFn().windowCoder()));

        RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
        Mockito.when(
                stageBundleFactory.getBundle(
                    (OutputReceiverFactory) Matchers.any(),
                    (StateRequestHandler) Matchers.any(),
                    (BundleProgressHandler) Matchers.any()))
            .thenReturn(bundle);

        // Flipped to true as soon as the timer receiver is handed an element.
        AtomicBoolean timerFired = new AtomicBoolean();

        IntervalWindow window = new IntervalWindow(new Instant(0L), new Instant(1000L));
        IntervalWindow.IntervalWindowCoder windowCoder = IntervalWindow.IntervalWindowCoder.of();
        WindowedValue<KV<String, Integer>> element =
            WindowedValue.of(
                KV.of("one", 1),
                window.maxTimestamp(),
                ImmutableList.of(window),
                PaneInfo.NO_FIRING);

        FnDataReceiver mainInputReceiver = Mockito.mock(FnDataReceiver.class);
        FnDataReceiver timerInputReceiver = Mockito.mock(FnDataReceiver.class);
        Mockito.doAnswer(
                invocation -> {
                    timerFired.set(true);
                    return null;
                })
            .when(timerInputReceiver)
            .accept((WindowedValue) Matchers.any());
        Mockito.when(bundle.getInputReceivers())
            .thenReturn(
                ImmutableMap.of("input", mainInputReceiver, "timerInput", timerInputReceiver));

        KeyedOneInputStreamOperatorTestHarness testHarness =
            new KeyedOneInputStreamOperatorTestHarness(
                operator, value -> value, new CoderTypeInformation(keyCoder));
        testHarness.open();

        // Take the operator's state backend lock and select the element's key before touching
        // keyed state directly.
        Lock stateBackendLock = (Lock) Whitebox.getInternalState(operator, "stateBackendLock");
        stateBackendLock.lock();
        operator
            .getKeyedStateBackend()
            .setCurrentKey(FlinkKeyUtils.encodeKey(element.getValue().getKey(), keyCoder));

        DoFnOperator.FlinkTimerInternals timerInternals =
            (DoFnOperator.FlinkTimerInternals) Whitebox.getInternalState(operator, "timerInternals");

        // Walk doFnRunner -> delegate -> stateCleaner -> cleanupQueue via reflection.
        Object doFnRunner = Whitebox.getInternalState(operator, "doFnRunner");
        Object delegateRunner = Whitebox.getInternalState(doFnRunner, "delegate");
        Object stateCleaner = Whitebox.getInternalState(delegateRunner, "stateCleaner");
        Collection cleanupQueue =
            (Collection) Whitebox.getInternalState(stateCleaner, "cleanupQueue");

        // No keyed state yet; adding one user state entry creates exactly one.
        MatcherAssert.assertThat(
            Integer.valueOf(testHarness.numKeyedStateEntries()), org.hamcrest.Matchers.is(0));
        StateNamespace windowNamespace = StateNamespaces.window(windowCoder, window);
        operator
            .keyedStateInternals
            .state(windowNamespace, StateTags.bag("userState", StringUtf8Coder.of()))
            .add("testUserState");
        MatcherAssert.assertThat(
            Integer.valueOf(testHarness.numKeyedStateEntries()), org.hamcrest.Matchers.is(1));

        // User timer due one millisecond past the end of the window.
        timerInternals.setTimer(
            TimerInternals.TimerData.of(
                "timerInput",
                windowNamespace,
                window.maxTimestamp().plus(1L),
                TimeDomain.EVENT_TIME));

        // Processing the element starts a bundle.
        testHarness.processElement(new StreamRecord(element));
        Mockito.verify(mainInputReceiver).accept(element);

        // Advancing the watermark past the timer must not fire it while the bundle is open.
        operator.processWatermark(new Watermark(window.maxTimestamp().plus(2L).getMillis()));
        Assert.assertFalse(
            "Watermark must be held back until bundle is complete.", timerFired.get());
        MatcherAssert.assertThat(cleanupQueue, org.hamcrest.Matchers.hasSize(0));

        if (!withCheckpointing) {
            // First finish fires the timer and queues the cleanup.
            operator.invokeFinishBundle();
            Assert.assertTrue("Timer should have been triggered.", timerFired.get());
            MatcherAssert.assertThat(cleanupQueue, org.hamcrest.Matchers.hasSize(1));
            Mockito.verifyNoMoreInteractions(mainInputReceiver);
            // Second finish drains the cleanup queue.
            operator.invokeFinishBundle();
            MatcherAssert.assertThat(cleanupQueue, org.hamcrest.Matchers.hasSize(0));
        } else {
            // After the snapshot the timer has fired and the cleanup queue is already empty.
            testHarness.snapshot(0L, 0L);
            Assert.assertTrue("Timer should have been triggered.", timerFired.get());
            MatcherAssert.assertThat(cleanupQueue, org.hamcrest.Matchers.hasSize(0));
            Mockito.verifyNoMoreInteractions(mainInputReceiver);
        }

        MatcherAssert.assertThat(
            Integer.valueOf(testHarness.numKeyedStateEntries()), org.hamcrest.Matchers.is(0));
        testHarness.close();
    }

    /**
     * The bag user state handler must expose no cache token before the first request and exactly
     * one user-state cache token afterwards. That token must equal the UTF-8 bytes of the string
     * supplied to the factory and stay the same across get/append/clear requests for different
     * keys and user states.
     */
    @Test
    public void testCacheTokenHandling() throws Exception {
        InMemoryStateInternals forKey = InMemoryStateInternals.forKey("test");
        KeyedStateBackend<ByteBuffer> stateBackend = FlinkStateInternalsTest.createStateBackend();

        // A fresh factory is built for each token value.
        for (String expectedToken : new String[] {"first token", "second token"}) {
            ExecutableStageDoFnOperator.BagUserStateFactory bagUserStateFactory =
                new ExecutableStageDoFnOperator.BagUserStateFactory(
                    () -> expectedToken, forKey, stateBackend, NoopLock.get(), (Coder) null);

            ByteString key1 = ByteString.copyFrom("key1", Charsets.UTF_8);
            ByteString key2 = ByteString.copyFrom("key2", Charsets.UTF_8);

            // Any transform id resolves to the same two user state specs.
            Map userStateMap = (Map) Mockito.mock(Map.class);
            Map transformMap = (Map) Mockito.mock(Map.class);
            Mockito.when((ProcessBundleDescriptors.BagUserStateSpec) transformMap.get("userstate1"))
                .thenReturn(mockBagUserState("userstate1"));
            Mockito.when((ProcessBundleDescriptors.BagUserStateSpec) transformMap.get("userstate2"))
                .thenReturn(mockBagUserState("userstate2"));
            Mockito.when((Map) userStateMap.get(ArgumentMatchers.anyString()))
                .thenReturn(transformMap);
            Mockito.when(processBundleDescriptor.getBagUserStateSpecs()).thenReturn(userStateMap);

            StateRequestHandler stateRequestHandler =
                StateRequestHandlers.forBagUserStateHandlerFactory(
                    processBundleDescriptor, bagUserStateFactory);

            // No token before the first state request.
            MatcherAssert.assertThat(
                stateRequestHandler.getCacheTokens(), org.hamcrest.Matchers.iterableWithSize(0));

            // The first request produces exactly one user-state token.
            stateRequestHandler.handle(getRequest(key1, "userstate1"));
            BeamFnApi.ProcessBundleRequest.CacheToken cacheToken =
                (BeamFnApi.ProcessBundleRequest.CacheToken)
                    Iterables.getOnlyElement(stateRequestHandler.getCacheTokens());
            MatcherAssert.assertThat(
                Boolean.valueOf(cacheToken.hasUserState()), org.hamcrest.Matchers.is(true));
            ByteString expectedTokenBytes =
                ByteString.copyFrom(expectedToken.getBytes(Charsets.UTF_8));
            MatcherAssert.assertThat(
                cacheToken.getToken(), org.hamcrest.Matchers.is(expectedTokenBytes));

            // The explicit type argument gives the method references a functional interface
            // target; without it asList has nothing to infer the element type from.
            List<RequestGenerator> requestGenerators =
                Arrays.<RequestGenerator>asList(
                    ExecutableStageDoFnOperatorTest::getRequest,
                    ExecutableStageDoFnOperatorTest::getAppend,
                    ExecutableStageDoFnOperatorTest::getClear);
            for (RequestGenerator requestGenerator : requestGenerators) {
                stateRequestHandler.handle(requestGenerator.makeRequest(key1, "userstate1"));
                MatcherAssert.assertThat(
                    ((BeamFnApi.ProcessBundleRequest.CacheToken)
                            Iterables.getOnlyElement(stateRequestHandler.getCacheTokens()))
                        .getToken(),
                    org.hamcrest.Matchers.is(expectedTokenBytes));

                stateRequestHandler.handle(requestGenerator.makeRequest(key2, "userstate1"));
                MatcherAssert.assertThat(
                    ((BeamFnApi.ProcessBundleRequest.CacheToken)
                            Iterables.getOnlyElement(stateRequestHandler.getCacheTokens()))
                        .getToken(),
                    org.hamcrest.Matchers.is(expectedTokenBytes));

                stateRequestHandler.handle(requestGenerator.makeRequest(key2, "userstate2"));
                MatcherAssert.assertThat(
                    ((BeamFnApi.ProcessBundleRequest.CacheToken)
                            Iterables.getOnlyElement(stateRequestHandler.getCacheTokens()))
                        .getToken(),
                    org.hamcrest.Matchers.is(expectedTokenBytes));
            }
        }
    }

    /** Builds a bag user state GET request for the given key and user state id. */
    private static BeamFnApi.StateRequest getRequest(ByteString key, String userStateId)
            throws Exception {
        return stateRequest(key, userStateId)
            .setGet(BeamFnApi.StateGetRequest.newBuilder().build())
            .build();
    }

    /** Builds a bag user state APPEND request for the given key and user state id. */
    private static BeamFnApi.StateRequest getAppend(ByteString key, String userStateId)
            throws Exception {
        return stateRequest(key, userStateId)
            .setAppend(BeamFnApi.StateAppendRequest.newBuilder().build())
            .build();
    }

    /** Builds a bag user state CLEAR request for the given key and user state id. */
    private static BeamFnApi.StateRequest getClear(ByteString key, String userStateId)
            throws Exception {
        return stateRequest(key, userStateId)
            .setClear(BeamFnApi.StateClearRequest.newBuilder().build())
            .build();
    }

    /**
     * Creates a state request builder whose state key addresses bag user state
     * {@code userStateId} of transform "transform" for the given key in the global window. The
     * caller still has to set the request type (get/append/clear).
     */
    private static BeamFnApi.StateRequest.Builder stateRequest(ByteString key, String userStateId)
            throws Exception {
        ByteString encodedWindow =
            ByteString.copyFrom(
                CoderUtils.encodeToByteArray(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE));
        BeamFnApi.StateKey.BagUserState bagUserState =
            BeamFnApi.StateKey.BagUserState.newBuilder()
                .setTransformId("transform")
                .setKey(key)
                .setUserStateId(userStateId)
                .setWindow(encodedWindow)
                .build();
        BeamFnApi.StateKey.Builder stateKey =
            BeamFnApi.StateKey.newBuilder().setBagUserState(bagUserState);
        return BeamFnApi.StateRequest.newBuilder().setStateKey(stateKey);
    }

    /**
     * Creates a mocked bag user state spec with the given user state id, transform id
     * "transformId", byte-string key and value coders, and the global window coder.
     */
    private static ProcessBundleDescriptors.BagUserStateSpec mockBagUserState(String userStateId) {
        ProcessBundleDescriptors.BagUserStateSpec spec =
            Mockito.mock(ProcessBundleDescriptors.BagUserStateSpec.class);

        Mockito.when(spec.keyCoder()).thenReturn(ByteStringCoder.of());
        Mockito.when(spec.valueCoder()).thenReturn(ByteStringCoder.of());
        Mockito.when(spec.transformId()).thenReturn("transformId");
        Mockito.when(spec.userStateId()).thenReturn(userStateId);
        Mockito.when(spec.windowCoder()).thenReturn(GlobalWindow.Coder.INSTANCE);

        return spec;
    }

    /**
     * Checks that an {@code ExecutableStageDoFnOperator} with one main and one additional output
     * can be copied with {@code SerializationUtils.clone}: the copy must be non-null and must not
     * be equal to the original operator.
     */
    @Test
    public void testSerialization() {
        WindowedValue.ValueOnlyWindowedValueCoder outputCoder = WindowedValue.getValueOnlyCoder(VarIntCoder.of());
        TupleTag mainOutput = new TupleTag("main-output");
        TupleTag additionalOutput = new TupleTag("additional-output");

        // Arguments, in order: main tag, tag -> Flink OutputTag, tag -> coder, tag -> integer id.
        DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory =
                new DoFnOperator.MultiOutputOutputManagerFactory(
                        mainOutput,
                        ImmutableMap.builder().put(additionalOutput, new OutputTag(additionalOutput.getId(), TypeInformation.of(Integer.class))).build(),
                        ImmutableMap.builder().put(mainOutput, outputCoder).put(additionalOutput, outputCoder).build(),
                        ImmutableMap.builder().put(mainOutput, 0).put(additionalOutput, 1).build());

        ExecutableStageDoFnOperator original = new ExecutableStageDoFnOperator(
                "transform",
                WindowedValue.getValueOnlyCoder(VarIntCoder.of()),
                Collections.emptyMap(),
                mainOutput,
                ImmutableList.of(additionalOutput),
                outputManagerFactory,
                Collections.emptyMap(),
                Collections.emptyList(),
                Collections.emptyMap(),
                PipelineOptionsFactory.as(FlinkPipelineOptions.class),
                this.stagePayload,
                this.jobInfo,
                FlinkExecutableStageContextFactory.getInstance(),
                createOutputMap(mainOutput, ImmutableList.of(additionalOutput)),
                WindowingStrategy.globalDefault(),
                (Coder) null,
                (KeySelector) null);

        ExecutableStageDoFnOperator copy = SerializationUtils.clone(original);
        Assert.assertNotNull(copy);
        Assert.assertNotEquals(original, copy);
    }

    /**
     * Convenience overload that builds an operator without a key coder.
     *
     * <p>Delegates to the six-argument overload with the global default windowing strategy,
     * a {@code null} key coder, and a full windowed-value coder over {@code StringUtf8Coder}
     * and the global window coder.
     *
     * @param tupleTag main output tag
     * @param list additional output tags
     * @param multiOutputOutputManagerFactory output manager factory handed to the operator
     * @return the operator created by the six-argument overload
     */
    private ExecutableStageDoFnOperator<Integer, Integer> getOperator(TupleTag<Integer> tupleTag, List<TupleTag<?>> list, DoFnOperator.MultiOutputOutputManagerFactory<Integer> multiOutputOutputManagerFactory) {
        WindowingStrategy defaultWindowing = WindowingStrategy.globalDefault();
        Coder inputCoder = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        return getOperator(tupleTag, list, multiOutputOutputManagerFactory, defaultWindowing, null, inputCoder);
    }

    /**
     * Builds an {@code ExecutableStageDoFnOperator} wired to this test's mocks.
     *
     * <p>The context factory is a mock that returns {@code this.stageContext} for any job info.
     * When {@code coder} is non-null the operator gets {@code this.stagePayloadWithUserState} and a
     * {@code KvToByteBufferKeySelector} over that coder; otherwise it gets {@code this.stagePayload}
     * and a {@code null} key selector. After construction the operator's
     * {@code stateRequestHandler} field is overwritten with {@code this.stateRequestHandler}.
     *
     * @param tupleTag main output tag
     * @param list additional output tags
     * @param multiOutputOutputManagerFactory output manager factory handed to the operator
     * @param windowingStrategy windowing strategy handed to the operator
     * @param coder key coder, or {@code null} for the non-keyed configuration
     * @param coder2 coder passed as the operator's second constructor argument
     * @return the configured operator
     */
    private ExecutableStageDoFnOperator<Integer, Integer> getOperator(TupleTag<Integer> tupleTag, List<TupleTag<?>> list, DoFnOperator.MultiOutputOutputManagerFactory<Integer> multiOutputOutputManagerFactory, WindowingStrategy windowingStrategy, @Nullable Coder coder, Coder coder2) {
        FlinkExecutableStageContextFactory contextFactory = Mockito.mock(FlinkExecutableStageContextFactory.class);
        Mockito.when(contextFactory.get((JobInfo) Matchers.any())).thenReturn(this.stageContext);

        // A key coder selects both the user-state payload and the key selector.
        boolean keyed = coder != null;

        ExecutableStageDoFnOperator<Integer, Integer> operator = new ExecutableStageDoFnOperator<>(
                "transform",
                coder2,
                Collections.emptyMap(),
                tupleTag,
                list,
                multiOutputOutputManagerFactory,
                Collections.emptyMap(),
                Collections.emptyList(),
                Collections.emptyMap(),
                PipelineOptionsFactory.as(FlinkPipelineOptions.class),
                keyed ? this.stagePayloadWithUserState : this.stagePayload,
                this.jobInfo,
                contextFactory,
                createOutputMap(tupleTag, list),
                windowingStrategy,
                coder,
                keyed ? new KvToByteBufferKeySelector(coder) : null);

        Whitebox.setInternalState(operator, "stateRequestHandler", this.stateRequestHandler);
        return operator;
    }

    /**
     * Indexes the given output tags by their id.
     *
     * @param tupleTag main output tag; skipped when {@code null}
     * @param list additional output tags; must not be {@code null}
     * @return a mutable map from tag id to tag, containing the main tag (if any) and every
     *     additional tag; a later tag with the same id replaces an earlier one
     */
    private static Map<String, TupleTag<?>> createOutputMap(TupleTag tupleTag, List<TupleTag<?>> list) {
        // Typed map instead of a raw HashMap, so the return is not an unchecked conversion.
        Map<String, TupleTag<?>> outputsById = new HashMap<>(list.size() + 1);
        if (tupleTag != null) {
            outputsById.put(tupleTag.getId(), tupleTag);
        }
        for (TupleTag<?> additionalTag : list) {
            outputsById.put(additionalTag.getId(), additionalTag);
        }
        return outputsById;
    }

    /**
     * Recreates one of this class's serializable {@code KeySelector} lambdas from its serialized form.
     *
     * <p>Two implementation method names are recognised; for both, the recreated lambda returns
     * its argument unchanged. Any other name, or a descriptor that does not match the expected
     * {@code KeySelector.getKey} shape, is rejected.
     *
     * <p>NOTE(review): this looks like decompiler output of a method the Java compiler normally
     * generates on its own for serializable lambdas. If this file is compiled from source, check
     * whether a hand-written copy is needed at all - TODO confirm.
     *
     * @param serializedLambda serialized description of the lambda to recreate
     * @return the recreated {@code KeySelector} lambda
     * @throws IllegalArgumentException if the description matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // A string switch replaces the decompiled hashCode()/equals() dispatch, which was not valid
        // Java ("boolean z = -1" and a switch over a boolean).
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$testEnsureStateCleanupWithKeyedInput$aeea360d$1":
                if (describesKeySelectorLambda(serializedLambda)) {
                    // The cast gives the lambda a functional-interface target type.
                    return (KeySelector<Object, Object>) key -> key;
                }
                break;
            case "lambda$testEnsureDeferredStateCleanupTimerFiring$83c7b7c3$1":
                if (describesKeySelectorLambda(serializedLambda)) {
                    return (KeySelector<Object, Object>) key -> key;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /** Returns whether the descriptor is a static-method {@code KeySelector.getKey} lambda implemented in this test class. */
    private static boolean describesKeySelectorLambda(SerializedLambda serializedLambda) {
        // Kind 6 is java.lang.invoke.MethodHandleInfo.REF_invokeStatic.
        return serializedLambda.getImplMethodKind() == 6
                && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest")
                && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;");
    }
}
