package org.apache.beam.runners.direct;

import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaceForTest;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.class */
public class CopyOnAccessInMemoryStateInternalsTest {

    /** Test pipeline rule; one test obtains its {@link CoderRegistry} from it. */
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    /** Rule used by tests that expect an exception to be thrown. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Key handed to the state internals under test; never reassigned. */
    private final String key = "foo";

    /**
     * With no underlying table, a bag starts out empty, and elements added through one handle
     * are visible when the same namespace/tag is looked up again.
     */
    @Test
    public void testGetWithEmpty() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = internals.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");
        Assert.assertThat(stringBag.read(), Matchers.containsInAnyOrder("baz", "bar"));

        BagState<String> reReadBag = internals.state(namespace, bagTag);
        Assert.assertThat(reReadBag.read(), Matchers.containsInAnyOrder("baz", "bar"));
    }

    /**
     * When the underlying table holds nothing for a tag, writes made through the wrapping
     * internals are visible there but do not appear in the underlying table.
     */
    @Test
    public void testGetWithAbsentInUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = internals.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");
        Assert.assertThat(stringBag.read(), Matchers.containsInAnyOrder("baz", "bar"));

        BagState<String> reReadBag = internals.state(namespace, bagTag);
        Assert.assertThat(reReadBag.read(), Matchers.containsInAnyOrder("baz", "bar"));

        // The underlying table must remain untouched by writes to the wrapper.
        BagState<String> underlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(underlyingBag.read(), Matchers.emptyIterable());
    }

    /**
     * A value already written to the underlying table is readable through wrapping internals;
     * overwriting it through the wrapper leaves the underlying value unchanged.
     */
    @Test
    public void testGetWithPresentInUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());

        ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
        Assert.assertThat(underlyingValue.read(), Matchers.nullValue(String.class));

        underlyingValue.write("bar");
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo("bar"));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
        Assert.assertThat(copyOnAccessState.read(), Matchers.equalTo("bar"));

        copyOnAccessState.write("baz");
        Assert.assertThat(copyOnAccessState.read(), Matchers.equalTo("baz"));
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo("bar"));

        ValueState<String> reReadUnderlyingValue = underlying.state(namespace, valueTag);
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo(reReadUnderlyingValue.read()));
    }

    /**
     * Bag contents present in the underlying table are readable through wrapping internals;
     * elements added through the wrapper do not show up in the underlying bag.
     */
    @Test
    public void testBagStateWithUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<Integer>> bagTag = StateTags.bag("foo", VarIntCoder.of());

        BagState<Integer> underlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(underlyingBag.read(), Matchers.emptyIterable());

        underlyingBag.add(1);
        Assert.assertThat(underlyingBag.read(), Matchers.containsInAnyOrder(1));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        BagState<Integer> copyOnAccessBag = internals.state(namespace, bagTag);
        Assert.assertThat(copyOnAccessBag.read(), Matchers.containsInAnyOrder(1));

        copyOnAccessBag.add(4);
        Assert.assertThat(copyOnAccessBag.read(), Matchers.containsInAnyOrder(4, 1));
        Assert.assertThat(underlyingBag.read(), Matchers.containsInAnyOrder(1));

        // Compare as lists: the two reads are separate Iterable instances.
        BagState<Integer> reReadUnderlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(
                Lists.newArrayList(underlyingBag.read()),
                Matchers.equalTo(Lists.newArrayList(reReadUnderlyingBag.read())));
    }

    /**
     * Set contents present in the underlying table are readable through wrapping internals;
     * elements added through the wrapper do not show up in the underlying set.
     */
    @Test
    public void testSetStateWithUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<SetState<Integer>> setTag = StateTags.set("foo", VarIntCoder.of());

        SetState<Integer> underlyingSet = underlying.state(namespace, setTag);
        Assert.assertThat(underlyingSet.read(), Matchers.emptyIterable());

        underlyingSet.add(1);
        Assert.assertThat(underlyingSet.read(), Matchers.containsInAnyOrder(1));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        SetState<Integer> copyOnAccessSet = internals.state(namespace, setTag);
        Assert.assertThat(copyOnAccessSet.read(), Matchers.containsInAnyOrder(1));

        copyOnAccessSet.add(4);
        Assert.assertThat(copyOnAccessSet.read(), Matchers.containsInAnyOrder(4, 1));
        Assert.assertThat(underlyingSet.read(), Matchers.containsInAnyOrder(1));

        SetState<Integer> reReadUnderlyingSet = underlying.state(namespace, setTag);
        Assert.assertThat(underlyingSet.read(), Matchers.equalTo(reReadUnderlyingSet.read()));
    }

    /**
     * Map entries present in the underlying table are readable through wrapping internals;
     * entries put through the wrapper do not show up in the underlying map.
     */
    @Test
    public void testMapStateWithUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<MapState<String, Integer>> mapTag =
                StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());

        MapState<String, Integer> underlyingMap = underlying.state(namespace, mapTag);
        Assert.assertThat(underlyingMap.entries().read(), Matchers.emptyIterable());

        underlyingMap.put("hello", 1);
        Assert.assertThat(underlyingMap.get("hello").read(), Matchers.equalTo(1));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        MapState<String, Integer> copyOnAccessMap = internals.state(namespace, mapTag);
        Assert.assertThat(copyOnAccessMap.get("hello").read(), Matchers.equalTo(1));

        copyOnAccessMap.put("world", 4);
        Assert.assertThat(copyOnAccessMap.get("hello").read(), Matchers.equalTo(1));
        Assert.assertThat(copyOnAccessMap.get("world").read(), Matchers.equalTo(4));

        // The new key must not have leaked into the underlying map.
        Assert.assertThat(underlyingMap.get("hello").read(), Matchers.equalTo(1));
        Assert.assertNull(underlyingMap.get("world").read());

        MapState<String, Integer> reReadUnderlyingMap = underlying.state(namespace, mapTag);
        Assert.assertThat(
                underlyingMap.entries().read(),
                Matchers.equalTo(reReadUnderlyingMap.entries().read()));
    }

    /**
     * A summing combining state present in the underlying table is readable through wrapping
     * internals; values added through the wrapper do not change the underlying sum.
     *
     * @throws CannotProvideCoderException if the coder registry cannot supply the coders
     *     requested for the accumulator
     */
    @Test
    public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        Combine.BinaryCombineLongFn sumLongFn = Sum.ofLongs();
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        CoderRegistry registry = pipeline.getCoderRegistry();
        // Typed by the GroupingState view the test reads through, which is already imported.
        StateTag<? extends GroupingState<Long, Long>> stateTag =
                StateTags.combiningValue(
                        "summer",
                        sumLongFn.getAccumulatorCoder(registry, registry.getCoder(Long.class)),
                        sumLongFn);

        GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo(0L));

        underlyingValue.add(1L);
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo(1L));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
        Assert.assertThat(copyOnAccessState.read(), Matchers.equalTo(1L));

        copyOnAccessState.add(4L);
        Assert.assertThat(copyOnAccessState.read(), Matchers.equalTo(5L));
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo(1L));

        GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
        Assert.assertThat(underlyingValue.read(), Matchers.equalTo(reReadUnderlyingValue.read()));
    }

    /**
     * A watermark hold present in the underlying table is readable through wrapping internals;
     * holds added through the wrapper do not change the underlying hold. With the EARLIEST
     * combiner the wrapper is expected to report the smallest instant added.
     */
    @Test
    public void testWatermarkHoldStateWithUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<WatermarkHoldState> holdTag =
                StateTags.watermarkStateInternal("wmstate", timestampCombiner);

        WatermarkHoldState underlyingHold = underlying.state(namespace, holdTag);
        Assert.assertThat(underlyingHold.read(), Matchers.nullValue());

        underlyingHold.add(new Instant(250L));
        Assert.assertThat(underlyingHold.read(), Matchers.equalTo(new Instant(250L)));

        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        WatermarkHoldState copyOnAccessHold = internals.state(namespace, holdTag);
        Assert.assertThat(copyOnAccessHold.read(), Matchers.equalTo(new Instant(250L)));

        copyOnAccessHold.add(new Instant(100L));
        Assert.assertThat(copyOnAccessHold.read(), Matchers.equalTo(new Instant(100L)));
        Assert.assertThat(underlyingHold.read(), Matchers.equalTo(new Instant(250L)));

        // A later instant must not displace the earlier hold.
        copyOnAccessHold.add(new Instant(500L));
        Assert.assertThat(copyOnAccessHold.read(), Matchers.equalTo(new Instant(100L)));

        WatermarkHoldState reReadUnderlyingHold = underlying.state(namespace, holdTag);
        Assert.assertThat(underlyingHold.read(), Matchers.equalTo(reReadUnderlyingHold.read()));
    }

    /**
     * Committing internals that have no underlying table keeps the written bag contents
     * readable and leaves the internals non-empty.
     */
    @Test
    public void testCommitWithoutUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = internals.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");
        Assert.assertThat(stringBag.read(), Matchers.containsInAnyOrder("baz", "bar"));

        internals.commit();

        BagState<String> reReadBag = internals.state(namespace, bagTag);
        Assert.assertThat(reReadBag.read(), Matchers.containsInAnyOrder("baz", "bar"));
        Assert.assertThat(internals.isEmpty(), Matchers.is(false));
    }

    /**
     * After committing wrapper internals that never touched a tag, reading that tag through the
     * wrapper is expected to yield the very state instance held by the underlying table, so a
     * write through the wrapper is visible in the underlying table.
     */
    @Test
    public void testCommitWithUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = underlying.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");

        internals.commit();

        BagState<String> reReadBag = internals.state(namespace, bagTag);
        Assert.assertThat(reReadBag.read(), Matchers.containsInAnyOrder("baz", "bar"));

        reReadBag.add("spam");

        BagState<String> underlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(underlyingBag.read(), Matchers.containsInAnyOrder("spam", "bar", "baz"));
        Assert.assertThat(underlyingBag, Matchers.is(Matchers.theInstance(stringBag)));
        Assert.assertThat(internals.isEmpty(), Matchers.is(false));
    }

    /**
     * Builds three layers of internals (bottom, spied middle, top). The bottom bag is filled and
     * cleared before the middle layer commits; an element added to the bottom bag afterwards
     * must not be visible through the top layer, and the top layer must not call
     * {@code state} on the middle layer for this tag.
     */
    @Test
    public void testCommitWithClearedInUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
                Mockito.spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = underlying.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");
        stringBag.clear();
        secondUnderlying.commit();

        // Added after the middle layer committed; asserted below to be invisible from the top.
        stringBag.add("foo");

        internals.commit();

        BagState<String> internalsBag = internals.state(namespace, bagTag);
        Assert.assertThat(internalsBag.read(), Matchers.emptyIterable());
        Mockito.verify(secondUnderlying, Mockito.never()).state(namespace, bagTag);
        Assert.assertThat(internals.isEmpty(), Matchers.is(false));
    }

    /**
     * Elements added through the wrapper before commit are combined with the underlying
     * contents when read through the wrapper, while the underlying bag keeps only its own
     * elements.
     */
    @Test
    public void testCommitWithOverwrittenUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = underlying.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");

        BagState<String> internalsBag = internals.state(namespace, bagTag);
        internalsBag.add("eggs");
        internalsBag.add("ham");
        internalsBag.add("0x00ff00");
        internalsBag.add("&");

        internals.commit();

        BagState<String> reReadInternalsBag = internals.state(namespace, bagTag);
        Assert.assertThat(
                reReadInternalsBag.read(),
                Matchers.containsInAnyOrder("bar", "baz", "0x00ff00", "eggs", "&", "ham"));

        BagState<String> reReadUnderlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(reReadUnderlyingBag.read(), Matchers.containsInAnyOrder("bar", "baz"));
    }

    /**
     * Elements added to the underlying table after the wrapper has committed are asserted to
     * be invisible through the wrapper while remaining readable from the underlying table.
     */
    @Test
    public void testCommitWithAddedUnderlying() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);

        // Commit first: everything below happens against an already-committed wrapper.
        internals.commit();

        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
        BagState<String> stringBag = underlying.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");

        BagState<String> internalsBag = internals.state(namespace, bagTag);
        Assert.assertThat(internalsBag.read(), Matchers.emptyIterable());

        BagState<String> reReadUnderlyingBag = underlying.state(namespace, bagTag);
        Assert.assertThat(reReadUnderlyingBag.read(), Matchers.containsInAnyOrder("bar", "baz"));
    }

    /** Internals that were never written to report themselves as empty after commit. */
    @Test
    public void testCommitWithEmptyTableIsEmpty() {
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);

        internals.commit();

        boolean emptyAfterCommit = internals.isEmpty();
        Assert.assertThat(emptyAfterCommit, Matchers.is(true));
    }

    /**
     * Internals whose only state was written and then cleared report themselves as empty
     * after commit.
     */
    @Test
    public void testCommitWithOnlyClearedValuesIsEmpty() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = internals.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("foo");
        stringBag.clear();

        internals.commit();

        Assert.assertThat(internals.isEmpty(), Matchers.is(true));
    }

    /**
     * A wrapper that wrote nothing itself is still non-empty after commit when its underlying
     * table holds elements.
     */
    @Test
    public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
        StateNamespaceForTest namespace = new StateNamespaceForTest("foo");
        StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());

        BagState<String> stringBag = underlying.state(namespace, bagTag);
        Assert.assertThat(stringBag.read(), Matchers.emptyIterable());

        stringBag.add("bar");
        stringBag.add("baz");

        internals.commit();

        Assert.assertThat(internals.isEmpty(), Matchers.is(false));
    }

    /**
     * Adds one hold in each of two window namespaces of the same internals and asserts that,
     * after commit, the earliest watermark hold is the smaller of the two instants.
     */
    @Test
    public void testGetEarliestWatermarkHoldAfterCommit() {
        BoundedWindow first = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(2048L);
            }
        };
        BoundedWindow second = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(689743L);
            }
        };
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);

        StateTag<WatermarkHoldState> firstHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState firstHold =
                internals.state(StateNamespaces.window(null, first), firstHoldTag);
        firstHold.add(new Instant(22L));

        StateTag<WatermarkHoldState> secondHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState secondHold =
                internals.state(StateNamespaces.window(null, second), secondHoldTag);
        secondHold.add(new Instant(2L));

        internals.commit();

        Assert.assertThat(internals.getEarliestWatermarkHold(), Matchers.equalTo(new Instant(2L)));
    }

    /**
     * The earlier hold (22) lives in the committed underlying table and the later hold (244) in
     * the wrapper; after the wrapper commits, its earliest watermark hold is asserted to be the
     * underlying one.
     */
    @Test
    public void testGetEarliestWatermarkHoldWithEarliestInUnderlyingTable() {
        BoundedWindow first = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(2048L);
            }
        };
        BoundedWindow second = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(689743L);
            }
        };
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
        StateTag<WatermarkHoldState> firstHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState firstHold =
                underlying.state(StateNamespaces.window(null, first), firstHoldTag);
        firstHold.add(new Instant(22L));

        // The wrapper is built on the result of committing the underlying table.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
        StateTag<WatermarkHoldState> secondHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState secondHold =
                internals.state(StateNamespaces.window(null, second), secondHoldTag);
        secondHold.add(new Instant(244L));

        internals.commit();

        Assert.assertThat(internals.getEarliestWatermarkHold(), Matchers.equalTo(new Instant(22L)));
    }

    /**
     * The later hold (224) lives in the committed underlying table and the earlier hold (24) in
     * the wrapper; after the wrapper commits, its earliest watermark hold is asserted to be the
     * wrapper's own.
     */
    @Test
    public void testGetEarliestWatermarkHoldWithEarliestInNewTable() {
        BoundedWindow first = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(2048L);
            }
        };
        BoundedWindow second = new BoundedWindow() {
            @Override
            public Instant maxTimestamp() {
                return new Instant(689743L);
            }
        };
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> underlying =
                CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
        StateTag<WatermarkHoldState> firstHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState firstHold =
                underlying.state(StateNamespaces.window(null, first), firstHoldTag);
        firstHold.add(new Instant(224L));

        // The wrapper is built on the result of committing the underlying table.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
        StateTag<WatermarkHoldState> secondHoldTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState secondHold =
                internals.state(StateNamespaces.window(null, second), secondHoldTag);
        secondHold.add(new Instant(24L));

        internals.commit();

        Assert.assertThat(internals.getEarliestWatermarkHold(), Matchers.equalTo(new Instant(24L)));
    }

    /**
     * Asking for the earliest watermark hold before the internals are committed is expected to
     * fail with an {@link IllegalStateException} whose message names the class and the reason.
     */
    @Test
    public void testGetEarliestHoldBeforeCommit() {
        // Parameterized (not raw) so that state(...) keeps its generic return type.
        CopyOnAccessInMemoryStateInternals<String> internals =
                CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
        StateTag<WatermarkHoldState> holdTag =
                StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
        WatermarkHoldState hold = internals.state(StateNamespaces.global(), holdTag);
        hold.add(new Instant(1234L));

        // Expectations are registered last so that only the final call may satisfy them.
        thrown.expect(IllegalStateException.class);
        thrown.expectMessage(CopyOnAccessInMemoryStateInternals.class.getSimpleName());
        thrown.expectMessage("Can't get the earliest watermark hold");
        thrown.expectMessage("before it is committed");

        internals.getEarliestWatermarkHold();
    }
}
