package org.apache.beam.sdk.fn.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests the observer returned by {@code BeamFnDataBufferingOutboundObserver.forLocation} when the
 * {@code beam_fn_api_data_buffer_time_limit} experiment is configured.
 */
@RunWith(JUnit4.class)
public class BeamFnDataTimeBasedBufferingOutboundObserverTest {
    private static final LogicalEndpoint OUTPUT_LOCATION = LogicalEndpoint.of("777L", "555L");
    private static final Coder<WindowedValue<byte[]>> CODER =
            LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of()));

    /**
     * Experiment string passed to every observer under test.
     * NOTE(review): the unit of the value is presumably milliseconds -- confirm against the observer.
     */
    private static final String TIME_LIMIT_EXPERIMENT = "beam_fn_api_data_buffer_time_limit=1";

    /**
     * Per-test upper bound. The tests block on a latch and poll a future without any deadline of
     * their own, so this turns a flush that never happens into a test failure instead of a hang.
     */
    private static final long TEST_TIMEOUT_MILLIS = 60_000L;

    /**
     * Verifies that a single accepted element reaches the outbound stream without an explicit
     * flush or close, and that the emitted message matches the expected payload.
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void testConfiguredTimeLimit() throws Exception {
        // Element type is Object because the lambda parameter type is inferred, not declared.
        List<Object> received = new ArrayList<>();
        CountDownLatch firstMessageSeen = new CountDownLatch(1);

        BeamFnDataBufferingOutboundObserver.forLocation(
                        newTimeLimitedOptions(),
                        OUTPUT_LOCATION,
                        CODER,
                        TestStreams.withOnNext(elements -> {
                            received.add(elements);
                            firstMessageSeen.countDown();
                        }).build())
                .accept(WindowedValue.valueInGlobalWindow(new byte[1]));

        // Block until the stream callback has recorded the first message.
        firstMessageSeen.await();

        Assert.assertEquals(
                BeamFnDataSizeBasedBufferingOutboundObserverTest.messageWithData(
                        new byte[][] {new byte[1]}),
                Iterables.get(received, 0));
    }

    /**
     * Verifies that, once the flush future has completed for an observer whose outbound stream
     * throws, both a subsequent {@code accept} and a subsequent {@code close} throw.
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void testConfiguredTimeLimitExceptionPropagation() throws Exception {
        PipelineOptions options = newTimeLimitedOptions();

        BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>> acceptObserver =
                newObserverWithFailingStream(options);
        acceptObserver.accept(WindowedValue.valueInGlobalWindow(new byte[1]));
        awaitFlushFutureDone(acceptObserver);
        try {
            acceptObserver.accept(WindowedValue.valueInGlobalWindow(new byte[1]));
            Assert.fail("accept() should have thrown after the flush future completed");
        } catch (Exception expected) {
            // Expected: the next accept() must not succeed silently.
        }

        BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>> closeObserver =
                newObserverWithFailingStream(options);
        closeObserver.accept(WindowedValue.valueInGlobalWindow(new byte[1]));
        awaitFlushFutureDone(closeObserver);
        try {
            closeObserver.close();
            Assert.fail("close() should have thrown after the flush future completed");
        } catch (Exception expected) {
            // Expected: close() must not succeed silently.
        }
    }

    /** Creates pipeline options whose only experiment is {@link #TIME_LIMIT_EXPERIMENT}. */
    private static PipelineOptions newTimeLimitedOptions() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.as(ExperimentalOptions.class).setExperiments(Arrays.asList(TIME_LIMIT_EXPERIMENT));
        return options;
    }

    /**
     * Builds an observer whose outbound stream throws a {@link RuntimeException} on every message.
     * The result is cast to the time-based implementation so the tests can read its
     * {@code flushFuture} field.
     */
    private static BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>>
            newObserverWithFailingStream(PipelineOptions options) {
        return (BeamFnDataTimeBasedBufferingOutboundObserver<WindowedValue<byte[]>>)
                BeamFnDataBufferingOutboundObserver.forLocation(
                        options,
                        OUTPUT_LOCATION,
                        CODER,
                        TestStreams.withOnNext(elements -> {
                            throw new RuntimeException("");
                        }).build());
    }

    /** Polls the observer's {@code flushFuture} once per millisecond until it reports done. */
    private static void awaitFlushFutureDone(
            BeamFnDataTimeBasedBufferingOutboundObserver<?> observer) throws InterruptedException {
        while (!observer.flushFuture.isDone()) {
            Thread.sleep(1L);
        }
    }
}
