package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutorTest.class */
public class DirectTransformExecutorTest {
    private PCollection<String> created;
    private AppliedPTransform<?, ?, ?> createdProducer;
    private AppliedPTransform<?, ?, ?> downstreamProducer;
    private CountDownLatch evaluatorCompleted;
    private RegisteringCompletionCallback completionCallback;
    private TransformExecutorService transformEvaluationState;
    private BundleFactory bundleFactory;

    @Mock
    private DirectMetrics metrics;

    @Mock
    private EvaluationContext evaluationContext;

    @Mock
    private TransformEvaluatorRegistry registry;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutorTest$RegisteringCompletionCallback.class */
    /**
     * Completion callback used by the tests: it remembers which outcome was reported
     * (result, empty, or exception) and counts down the supplied latch each time one of
     * those three outcomes is delivered.
     */
    private static class RegisteringCompletionCallback implements CompletionCallback {
        private final CountDownLatch onMethod;
        private TransformResult<?> handledResult = null;
        private boolean handledEmpty = false;
        private Exception handledException = null;

        private RegisteringCompletionCallback(CountDownLatch onMethod) {
            this.onMethod = onMethod;
        }

        /**
         * Records the result, releases the latch, and builds a {@link CommittedResult} that
         * carries the unprocessed elements (if any) as a bundle derived from the input bundle.
         */
        public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) {
            handledResult = result;
            onMethod.countDown();

            // Treat a null "unprocessed elements" value the same as an empty iterable.
            @SuppressWarnings("rawtypes")
            Iterable unprocessedElements = result.getUnprocessedElements();
            if (unprocessedElements == null) {
                unprocessedElements = Collections.emptyList();
            }

            Optional<? extends CommittedBundle<?>> unprocessedBundle;
            if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
                unprocessedBundle = Optional.absent();
            } else {
                unprocessedBundle = Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));
            }

            return CommittedResult.create(
                    result,
                    unprocessedBundle,
                    Collections.emptyList(),
                    EnumSet.noneOf(CommittedResult.OutputType.class));
        }

        /** Notes that the executor reported an empty evaluation and releases the latch. */
        public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
            handledEmpty = true;
            onMethod.countDown();
        }

        /** Stores the reported exception and releases the latch. */
        public void handleException(CommittedBundle<?> inputBundle, Exception exception) {
            handledException = exception;
            onMethod.countDown();
        }

        /** Errors are not recorded; they are rethrown to the caller as-is. */
        public void handleError(Error error) {
            throw error;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutorTest$TestEnforcement.class */
    /**
     * Enforcement that simply records every callback argument it receives so a test can
     * inspect them afterwards.
     *
     * @param <T> element type of the bundle being enforced
     */
    public static class TestEnforcement<T> implements ModelEnforcement<T> {
        private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
        private final List<WindowedValue<T>> afterElements = new ArrayList<>();
        private final List<TransformResult<?>> finishedBundles = new ArrayList<>();

        private TestEnforcement() {
        }

        /** Appends the element to the "before" log. */
        public void beforeElement(WindowedValue<T> element) {
            beforeElements.add(element);
        }

        /** Appends the element to the "after" log. */
        public void afterElement(WindowedValue<T> element) {
            afterElements.add(element);
        }

        /** Appends the result to the finished-bundle log; the input and outputs are ignored. */
        public void afterFinish(CommittedBundle<T> input, TransformResult<T> result, Iterable<? extends CommittedBundle<?>> outputs) {
            finishedBundles.add(result);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutorTest$TestEnforcementFactory.class */
    /**
     * Factory that hands out a fresh {@link TestEnforcement} per bundle and keeps a reference
     * to the most recently created one in {@code instance}.
     */
    private static class TestEnforcementFactory implements ModelEnforcementFactory {
        private TestEnforcement<?> instance;

        private TestEnforcementFactory() {
        }

        /** Creates a new recording enforcement, remembers it, and returns it. */
        public <T> TestEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
            TestEnforcement<T> enforcement = new TestEnforcement<>();
            instance = enforcement;
            return enforcement;
        }

        // Decompiler-renamed synthetic bridge for forBundle; it only delegates to the typed method.
        /* renamed from: forBundle, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ ModelEnforcement m187forBundle(CommittedBundle committedBundle, AppliedPTransform appliedPTransform) {
            return forBundle(committedBundle, (AppliedPTransform<?, ?, ?>) appliedPTransform);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/DirectTransformExecutorTest$ThrowingEnforcementFactory.class */
    /**
     * Factory whose enforcements throw a {@link RuntimeException} at one configured stage
     * ({@link When}) and do nothing at every other stage.
     */
    public static class ThrowingEnforcementFactory implements ModelEnforcementFactory {
        /** The stage at which the factory (or the enforcement it creates) throws. */
        public enum When {
            BEFORE_BUNDLE,
            BEFORE_ELEMENT,
            AFTER_ELEMENT,
            AFTER_BUNDLE
        }

        private final When when;

        private ThrowingEnforcementFactory(When when) {
            this.when = when;
        }

        /**
         * Throws immediately when configured for {@link When#BEFORE_BUNDLE}; otherwise returns
         * an enforcement that throws at the configured later stage.
         */
        public <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
            throwIfConfiguredFor(When.BEFORE_BUNDLE, "forBundle");
            return new ThrowingEnforcement<>();
        }

        /** Throws a RuntimeException carrying {@code message} when {@code stage} is the configured one. */
        private void throwIfConfiguredFor(When stage, String message) {
            if (when == stage) {
                throw new RuntimeException(message);
            }
        }

        /** Enforcement bound to the enclosing factory's configured stage. */
        private class ThrowingEnforcement<T> implements ModelEnforcement<T> {
            private ThrowingEnforcement() {
            }

            public void beforeElement(WindowedValue<T> element) {
                throwIfConfiguredFor(When.BEFORE_ELEMENT, "beforeElement");
            }

            public void afterElement(WindowedValue<T> element) {
                throwIfConfiguredFor(When.AFTER_ELEMENT, "afterElement");
            }

            public void afterFinish(CommittedBundle<T> input, TransformResult<T> result, Iterable<? extends CommittedBundle<?>> outputs) {
                throwIfConfiguredFor(When.AFTER_BUNDLE, "afterFinish");
            }
        }
    }

    /**
     * Initialises the mocks, the bundle factory, the executor service, the completion latch
     * and callback, and builds a two-step pipeline (Create -> WithKeys) whose producers are
     * looked up from the direct graph.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);

        bundleFactory = ImmutableListBundleFactory.create();
        transformEvaluationState = TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());

        // The latch has a count of one: any single callback invocation releases it.
        evaluatorCompleted = new CountDownLatch(1);
        completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);

        created = p.apply(Create.of("foo", "spam", "third"));
        PCollection<?> downstream = created.apply(WithKeys.of(3));

        // Overrides are applied before the graph is read, so producers come from the final graph.
        DirectGraphs.performDirectOverrides(p);
        DirectGraph directGraph = DirectGraphs.getGraph(p);
        createdProducer = directGraph.getProducer(created);
        downstreamProducer = directGraph.getProducer(downstream);

        Mockito.when(evaluationContext.getMetrics()).thenReturn(metrics);
    }

    /**
     * With a null input bundle the executor must never call {@code processElement}, must call
     * {@code finishBundle}, and must pass the finished result to the completion callback.
     */
    @Test
    public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
        final StepTransformResult result = StepTransformResult.withoutHold(createdProducer).build();
        final AtomicBoolean finishCalled = new AtomicBoolean(false);

        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> element) throws Exception {
                throw new IllegalArgumentException("Shouldn't be called");
            }

            public TransformResult<Object> finishBundle() throws Exception {
                finishCalled.set(true);
                return result;
            }
        };
        Mockito.when(registry.forApplication(createdProducer, (CommittedBundle) null)).thenReturn(evaluator);

        DirectTransformExecutor executor = new DirectTransformExecutor(
                evaluationContext,
                registry,
                Collections.emptyList(),
                (CommittedBundle) null,
                createdProducer,
                completionCallback,
                transformEvaluationState);
        executor.run();

        Assert.assertThat(finishCalled.get(), Matchers.is(true));
        Assert.assertThat(completionCallback.handledResult, Matchers.equalTo(result));
        Assert.assertThat(completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    /**
     * When the registry yields no evaluator (null), the executor must report "empty" to the
     * completion callback and must report neither a result nor an exception.
     */
    @Test
    public void nullTransformEvaluatorTerminates() throws Exception {
        // Stub with a plain null: an Object-typed null does not match the stubbed
        // TransformEvaluator return type.
        Mockito.when(this.registry.forApplication(this.createdProducer, (CommittedBundle) null)).thenReturn(null);

        new DirectTransformExecutor(
                this.evaluationContext,
                this.registry,
                Collections.emptyList(),
                (CommittedBundle) null,
                this.createdProducer,
                this.completionCallback,
                this.transformEvaluationState).run();

        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(this.completionCallback.handledEmpty, Matchers.equalTo(true));
        Assert.assertThat(this.completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    /**
     * Runs the executor on a background thread over a three-element bundle and checks that
     * every element reaches the evaluator and that the finished result, with no exception,
     * is handed to the completion callback.
     */
    @Test
    public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
        final TransformResult<String> result = StepTransformResult.<String>withoutHold(this.downstreamProducer).build();
        final List<WindowedValue<String>> elementsProcessed = new ArrayList<>();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> element) throws Exception {
                elementsProcessed.add(element);
            }

            public TransformResult<String> finishBundle() throws Exception {
                return result;
            }
        };

        WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
        WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
        WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
        CommittedBundle<String> inputBundle =
                this.bundleFactory.createBundle(this.created).add(foo).add(spam).add(third).commit(Instant.now());
        // The explicit <String> makes the stubbed return type match the String evaluator.
        Mockito.when(this.registry.<String>forApplication(this.downstreamProducer, inputBundle)).thenReturn(evaluator);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executorService.submit(new DirectTransformExecutor(
                    this.evaluationContext,
                    this.registry,
                    Collections.emptyList(),
                    inputBundle,
                    this.downstreamProducer,
                    this.completionCallback,
                    this.transformEvaluationState));
            this.evaluatorCompleted.await();
            future.get();
        } finally {
            // Release the worker thread; shutdown() lets an already-submitted task finish.
            executorService.shutdown();
        }

        Assert.assertThat(elementsProcessed, Matchers.containsInAnyOrder(spam, third, foo));
        Assert.assertThat(this.completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result));
        Assert.assertThat(this.completionCallback.handledException, Matchers.is(Matchers.nullValue()));
    }

    /**
     * When {@code processElement} throws, the completion callback must receive that same
     * exception and must not receive a result.
     */
    @Test
    public void processElementThrowsExceptionCallsback() throws Exception {
        final TransformResult<String> result = StepTransformResult.<String>withoutHold(this.downstreamProducer).build();
        final Exception exception = new Exception();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> element) throws Exception {
                throw exception;
            }

            public TransformResult<String> finishBundle() throws Exception {
                return result;
            }
        };

        WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
        CommittedBundle<String> inputBundle = this.bundleFactory.createBundle(this.created).add(foo).commit(Instant.now());
        // The explicit <String> makes the stubbed return type match the String evaluator.
        Mockito.when(this.registry.<String>forApplication(this.downstreamProducer, inputBundle)).thenReturn(evaluator);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.submit(new DirectTransformExecutor(
                    this.evaluationContext,
                    this.registry,
                    Collections.emptyList(),
                    inputBundle,
                    this.downstreamProducer,
                    this.completionCallback,
                    this.transformEvaluationState));
            this.evaluatorCompleted.await();
        } finally {
            // Release the worker thread; shutdown() lets an already-submitted task finish.
            executorService.shutdown();
        }

        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(this.completionCallback.handledException, Matchers.equalTo(exception));
    }

    /**
     * When {@code finishBundle} throws on an empty input bundle, the completion callback must
     * receive that same exception and must not receive a result.
     */
    @Test
    public void finishBundleThrowsExceptionCallsback() throws Exception {
        final Exception exception = new Exception();
        TransformEvaluator<String> evaluator = new TransformEvaluator<String>() {
            public void processElement(WindowedValue<String> element) throws Exception {
            }

            public TransformResult<String> finishBundle() throws Exception {
                throw exception;
            }
        };

        CommittedBundle<String> inputBundle = this.bundleFactory.createBundle(this.created).commit(Instant.now());
        // The explicit <String> makes the stubbed return type match the String evaluator.
        Mockito.when(this.registry.<String>forApplication(this.downstreamProducer, inputBundle)).thenReturn(evaluator);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.submit(new DirectTransformExecutor(
                    this.evaluationContext,
                    this.registry,
                    Collections.emptyList(),
                    inputBundle,
                    this.downstreamProducer,
                    this.completionCallback,
                    this.transformEvaluationState));
            this.evaluatorCompleted.await();
        } finally {
            // Release the worker thread; shutdown() lets an already-submitted task finish.
            executorService.shutdown();
        }

        Assert.assertThat(this.completionCallback.handledResult, Matchers.is(Matchers.nullValue()));
        Assert.assertThat(this.completionCallback.handledException, Matchers.equalTo(exception));
    }

    /**
     * Runs the executor synchronously with a recording enforcement and checks that the
     * enforcement saw both elements before and after processing, plus the finished result.
     */
    @Test
    public void callWithEnforcementAppliesEnforcement() throws Exception {
        final TransformResult result = StepTransformResult.withoutHold(downstreamProducer).build();
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> element) throws Exception {
            }

            public TransformResult<Object> finishBundle() throws Exception {
                return result;
            }
        };

        WindowedValue fooElem = WindowedValue.valueInGlobalWindow("foo");
        WindowedValue barElem = WindowedValue.valueInGlobalWindow("bar");
        CommittedBundle inputBundle =
                bundleFactory.createBundle(created).add(fooElem).add(barElem).commit(Instant.now());
        Mockito.when(registry.forApplication(downstreamProducer, inputBundle)).thenReturn(evaluator);

        TestEnforcementFactory enforcementFactory = new TestEnforcementFactory();
        DirectTransformExecutor executor = new DirectTransformExecutor(
                evaluationContext,
                registry,
                Collections.singleton(enforcementFactory),
                inputBundle,
                downstreamProducer,
                completionCallback,
                transformEvaluationState);
        executor.run();

        // The factory keeps the enforcement it created for this bundle.
        TestEnforcement recorded = enforcementFactory.instance;
        Assert.assertThat(recorded.beforeElements, Matchers.containsInAnyOrder(barElem, fooElem));
        Assert.assertThat(recorded.afterElements, Matchers.containsInAnyOrder(barElem, fooElem));
        Assert.assertThat(recorded.finishedBundles, Matchers.contains(result));
    }

    /**
     * An enforcement that throws from {@code afterFinish} must make the submitted task fail:
     * {@code Future.get()} is expected to throw with a RuntimeException cause whose message
     * mentions "afterFinish".
     */
    @Test
    public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
        final TransformResult<Object> result = StepTransformResult.<Object>withoutHold(this.createdProducer).build();
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> element) throws Exception {
            }

            public TransformResult<Object> finishBundle() throws Exception {
                return result;
            }
        };

        WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
        CommittedBundle<String> inputBundle = this.bundleFactory.createBundle(this.created).add(foo).commit(Instant.now());
        Mockito.when(this.registry.forApplication(this.downstreamProducer, inputBundle)).thenReturn(evaluator);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = executorService.submit(new DirectTransformExecutor(
                    this.evaluationContext,
                    this.registry,
                    Collections.singleton(new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_BUNDLE)),
                    inputBundle,
                    this.downstreamProducer,
                    this.completionCallback,
                    this.transformEvaluationState));

            this.thrown.expectCause(Matchers.isA(RuntimeException.class));
            this.thrown.expectMessage("afterFinish");
            task.get();
        } finally {
            // Runs even though task.get() is expected to throw, so the worker thread is released.
            executorService.shutdown();
        }
    }

    /**
     * An enforcement that throws from {@code afterElement} must make the submitted task fail:
     * {@code Future.get()} is expected to throw with a RuntimeException cause whose message
     * mentions "afterElement".
     */
    @Test
    public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
        final TransformResult<Object> result = StepTransformResult.<Object>withoutHold(this.createdProducer).build();
        TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() {
            public void processElement(WindowedValue<Object> element) throws Exception {
            }

            public TransformResult<Object> finishBundle() throws Exception {
                return result;
            }
        };

        WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
        CommittedBundle<String> inputBundle = this.bundleFactory.createBundle(this.created).add(foo).commit(Instant.now());
        Mockito.when(this.registry.forApplication(this.downstreamProducer, inputBundle)).thenReturn(evaluator);

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = executorService.submit(new DirectTransformExecutor(
                    this.evaluationContext,
                    this.registry,
                    Collections.singleton(new ThrowingEnforcementFactory(ThrowingEnforcementFactory.When.AFTER_ELEMENT)),
                    inputBundle,
                    this.downstreamProducer,
                    this.completionCallback,
                    this.transformEvaluationState));

            this.thrown.expectCause(Matchers.isA(RuntimeException.class));
            this.thrown.expectMessage("afterElement");
            task.get();
        } finally {
            // Runs even though task.get() is expected to throw, so the worker thread is released.
            executorService.shutdown();
        }
    }
}
