package org.apache.beam.runners.flink.metrics;

import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.class */
public class FlinkMetricContainerTest {

    @Mock
    private RuntimeContext runtimeContext;

    @Mock
    private MetricGroup metricGroup;

    @Before
    public void beforeTest() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.runtimeContext.getAccumulator(Matchers.anyString())).thenReturn(new MetricsAccumulator());
        Mockito.when(this.runtimeContext.getMetricGroup()).thenReturn(this.metricGroup);
    }

    @Test
    public void testMetricNameGeneration() {
        MatcherAssert.assertThat(FlinkMetricContainer.getFlinkMetricNameString(MetricKey.create("step", MetricName.named("namespace", "name"))), CoreMatchers.is("namespace.name"));
    }

    @Test
    public void testCounter() {
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("namespace.name")).thenReturn(simpleCounter);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Counter counter = flinkMetricContainer.getMetricsContainer("step").getCounter(MetricName.named("namespace", "name"));
        counter.inc();
        counter.inc();
        MatcherAssert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(0L));
        flinkMetricContainer.updateMetrics("step");
        MatcherAssert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(2L));
    }

    @Test
    public void testGauge() {
        FlinkMetricContainer.FlinkGauge flinkGauge = new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        org.apache.beam.sdk.metrics.Gauge gauge = flinkMetricContainer.getMetricsContainer("step").getGauge(MetricName.named("namespace", "name"));
        MatcherAssert.assertThat(flinkGauge.getValue(), CoreMatchers.is(GaugeResult.empty()));
        flinkMetricContainer.updateMetrics("step");
        gauge.set(1L);
        gauge.set(42L);
        flinkMetricContainer.updateMetrics("step");
        MatcherAssert.assertThat(Long.valueOf(flinkGauge.getValue().getValue()), CoreMatchers.is(42L));
    }

    @Test
    public void testMonitoringInfoUpdate() {
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("ns1.metric1")).thenReturn(simpleCounter);
        SimpleCounter simpleCounter2 = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("pcoll.metric:element_count:v1")).thenReturn(simpleCounter2);
        SimpleCounter simpleCounter3 = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("anyPTransform.myMetric")).thenReturn(simpleCounter3);
        MetricsApi.MonitoringInfo build = new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns1").setLabel(MonitoringInfoConstants.Labels.NAME, "metric1").setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform").setInt64Value(111L).build();
        Assert.assertNotNull(build);
        MetricsApi.MonitoringInfo build2 = new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT).setInt64Value(222L).setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pcoll").setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform").build();
        Assert.assertNotNull(build2);
        MetricsApi.MonitoringInfo build3 = new SimpleMonitoringInfoBuilder().setUrn(MonitoringInfoConstants.Urns.START_BUNDLE_MSECS).setInt64Value(333L).setLabel(MonitoringInfoConstants.Labels.NAME, "myMetric").setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "anyPTransform").build();
        Assert.assertNotNull(build3);
        MatcherAssert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter2.getCount()), CoreMatchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter3.getCount()), CoreMatchers.is(0L));
        flinkMetricContainer.updateMetrics("step", ImmutableList.of(build, build2, build3));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(111L));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter2.getCount()), CoreMatchers.is(222L));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter3.getCount()), CoreMatchers.is(333L));
    }

    @Test
    public void testDropUnexpectedMonitoringInfoTypes() {
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        MetricsContainerImpl metricsContainer = flinkMetricContainer.getMetricsContainer("step");
        MetricsApi.MonitoringInfo build = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns1").putLabels(MonitoringInfoConstants.Labels.NAME, "int_counter").putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step").setMetric(MetricsApi.Metric.newBuilder().setCounterData(MetricsApi.CounterData.newBuilder().setInt64Value(111L))).build();
        MetricsApi.MonitoringInfo build2 = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns2").putLabels(MonitoringInfoConstants.Labels.NAME, "double_counter").putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step").setMetric(MetricsApi.Metric.newBuilder().setCounterData(MetricsApi.CounterData.newBuilder().setDoubleValue(222.0d))).build();
        MetricsApi.MonitoringInfo build3 = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns3").putLabels(MonitoringInfoConstants.Labels.NAME, "int_distribution").putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step").setMetric(MetricsApi.Metric.newBuilder().setDistributionData(MetricsApi.DistributionData.newBuilder().setIntDistributionData(MetricsApi.IntDistributionData.newBuilder().setSum(30L).setCount(10L).setMin(1L).setMax(5L)))).build();
        MetricsApi.MonitoringInfo build4 = MetricsApi.MonitoringInfo.newBuilder().setUrn(MonitoringInfoConstants.Urns.USER_COUNTER).putLabels(MonitoringInfoConstants.Labels.NAMESPACE, "ns4").putLabels(MonitoringInfoConstants.Labels.NAME, "double_distribution").putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, "step").setMetric(MetricsApi.Metric.newBuilder().setDistributionData(MetricsApi.DistributionData.newBuilder().setDoubleDistributionData(MetricsApi.DoubleDistributionData.newBuilder().setSum(30.0d).setCount(10L).setMin(1.0d).setMax(5.0d)))).build();
        SimpleCounter simpleCounter = new SimpleCounter();
        Mockito.when(this.metricGroup.counter("ns1.int_counter")).thenReturn(simpleCounter);
        flinkMetricContainer.updateMetrics("step", ImmutableList.of(build, build2, build3, build4));
        ((MetricGroup) Mockito.verify(this.metricGroup)).counter((String) Matchers.eq("ns1.int_counter"));
        MatcherAssert.assertThat(Long.valueOf(simpleCounter.getCount()), CoreMatchers.is(111L));
        MatcherAssert.assertThat(Long.valueOf(metricsContainer.tryGetCounter(MonitoringInfoMetricName.of(build)).getCumulative().longValue()), CoreMatchers.is(111L));
        ((MetricGroup) Mockito.verify(this.metricGroup)).gauge((String) Matchers.eq("ns3.int_distribution"), (FlinkMetricContainer.FlinkDistributionGauge) Matchers.argThat(new ArgumentMatcher<FlinkMetricContainer.FlinkDistributionGauge>() { // from class: org.apache.beam.runners.flink.metrics.FlinkMetricContainerTest.1
            public boolean matches(FlinkMetricContainer.FlinkDistributionGauge flinkDistributionGauge) {
                return flinkDistributionGauge.getValue().equals(DistributionResult.create(30L, 10L, 1L, 5L));
            }
        }));
        MatcherAssert.assertThat(metricsContainer.getDistribution(MonitoringInfoMetricName.of(build3)).getCumulative(), CoreMatchers.is(DistributionData.create(30L, 10L, 1L, 5L)));
    }

    @Test
    public void testDistribution() {
        FlinkMetricContainer.FlinkDistributionGauge flinkDistributionGauge = new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
        Mockito.when(this.metricGroup.gauge((String) Matchers.eq("namespace.name"), (Gauge) Matchers.anyObject())).thenReturn(flinkDistributionGauge);
        FlinkMetricContainer flinkMetricContainer = new FlinkMetricContainer(this.runtimeContext);
        Distribution distribution = flinkMetricContainer.getMetricsContainer("step").getDistribution(MetricName.named("namespace", "name"));
        MatcherAssert.assertThat(flinkDistributionGauge.getValue(), CoreMatchers.is(DistributionResult.IDENTITY_ELEMENT));
        flinkMetricContainer.updateMetrics("step");
        distribution.update(42L);
        distribution.update(-23L);
        distribution.update(0L);
        distribution.update(1L);
        flinkMetricContainer.updateMetrics("step");
        MatcherAssert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMax()), CoreMatchers.is(42L));
        MatcherAssert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getMin()), CoreMatchers.is(-23L));
        MatcherAssert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getCount()), CoreMatchers.is(4L));
        MatcherAssert.assertThat(Long.valueOf(flinkDistributionGauge.getValue().getSum()), CoreMatchers.is(20L));
        MatcherAssert.assertThat(Double.valueOf(flinkDistributionGauge.getValue().getMean()), CoreMatchers.is(Double.valueOf(5.0d)));
    }
}
