package org.apache.beam.runners.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.class */
public class FlinkStreamingPipelineTranslatorTest {
    @Test
    public void testAutoBalanceShardKeyResolvesMaxParallelism() {
        MatcherAssert.assertThat(Integer.valueOf(new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(3, -1, StringUtf8Coder.of()).getMaxParallelism()), Matchers.equalTo(Integer.valueOf(KeyGroupRangeAssignment.computeDefaultMaxParallelism(3))));
        MatcherAssert.assertThat(Integer.valueOf(new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(3, 0, StringUtf8Coder.of()).getMaxParallelism()), Matchers.equalTo(Integer.valueOf(KeyGroupRangeAssignment.computeDefaultMaxParallelism(3))));
    }

    @Test
    public void testAutoBalanceShardKeyCacheIsNotSerialized() throws Exception {
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction flinkAutoBalancedShardKeyShardingFunction = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(2, 2, StringUtf8Coder.of());
        Assert.assertNull(flinkAutoBalancedShardKeyShardingFunction.getCache());
        flinkAutoBalancedShardKeyShardingFunction.assignShardKey("target/destination1", "one", 10);
        flinkAutoBalancedShardKeyShardingFunction.assignShardKey("target/destination2", "two", 10);
        MatcherAssert.assertThat(Integer.valueOf(flinkAutoBalancedShardKeyShardingFunction.getCache().size()), Matchers.equalTo(2));
        MatcherAssert.assertThat(SerializableUtils.clone(flinkAutoBalancedShardKeyShardingFunction).getCache(), Matchers.nullValue());
    }

    @Test
    public void testAutoBalanceShardKeyCacheIsStable() throws Exception {
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction flinkAutoBalancedShardKeyShardingFunction = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(50 / 2, 50 * 2, StringUtf8Coder.of());
        ArrayList<KV> newArrayList = Lists.newArrayList();
        for (int i = 0; i < 50 * 100; i++) {
            newArrayList.add(KV.of("target/destination/1", UUID.randomUUID().toString()));
            newArrayList.add(KV.of("target/destination/2", UUID.randomUUID().toString()));
            newArrayList.add(KV.of("target/destination/3", UUID.randomUUID().toString()));
        }
        HashMap hashMap = new HashMap();
        for (KV kv : newArrayList) {
            ShardedKey assignShardKey = flinkAutoBalancedShardKeyShardingFunction.assignShardKey((String) kv.getKey(), (String) kv.getValue(), 50);
            hashMap.put(KV.of((String) kv.getKey(), Integer.valueOf(assignShardKey.getShardNumber())), assignShardKey);
        }
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction flinkAutoBalancedShardKeyShardingFunction2 = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(50 / 2, 50 * 2, StringUtf8Coder.of());
        Collections.shuffle(newArrayList);
        for (KV kv2 : newArrayList) {
            ShardedKey assignShardKey2 = flinkAutoBalancedShardKeyShardingFunction2.assignShardKey((String) kv2.getKey(), (String) kv2.getValue(), 50);
            ShardedKey shardedKey = (ShardedKey) hashMap.get(KV.of((String) kv2.getKey(), Integer.valueOf(assignShardKey2.getShardNumber())));
            if (shardedKey != null) {
                MatcherAssert.assertThat(assignShardKey2, Matchers.equalTo(shardedKey));
            }
        }
    }

    @Test
    public void testAutoBalanceShardKeyCacheMaxSize() throws Exception {
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction flinkAutoBalancedShardKeyShardingFunction = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(2, 2, StringUtf8Coder.of());
        for (int i = 0; i < 200; i++) {
            flinkAutoBalancedShardKeyShardingFunction.assignShardKey(UUID.randomUUID().toString(), "one", 2);
        }
        MatcherAssert.assertThat(Integer.valueOf(flinkAutoBalancedShardKeyShardingFunction.getCache().size()), Matchers.equalTo(100));
    }
}
