package org.apache.flink.cep.operator;

import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cep/operator/CEPMigration11to13Test.class */
public class CEPMigration11to13Test {

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigration11to13Test$EndFilter.class */
    private static class EndFilter extends SimpleCondition<Event> {
        private static final long serialVersionUID = 7056763917392056548L;

        private EndFilter() {
        }

        public boolean filter(Event event) throws Exception {
            return event.getName().equals("end");
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigration11to13Test$MiddleFilter.class */
    private static class MiddleFilter extends SimpleCondition<SubEvent> {
        private static final long serialVersionUID = 6215754202506583964L;

        private MiddleFilter() {
        }

        public boolean filter(SubEvent subEvent) throws Exception {
            return subEvent.getVolume() > 5.0d;
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigration11to13Test$NFAFactory.class */
    private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new StartFilter()).followedBy("middle").subtype(SubEvent.class).where(new MiddleFilter()).followedBy("end").where(new EndFilter()).within(Time.milliseconds(10L)), Event.createTypeSerializer(), this.handleTimeout);
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPMigration11to13Test$StartFilter.class */
    private static class StartFilter extends SimpleCondition<Event> {
        private static final long serialVersionUID = 5726188262756267490L;

        private StartFilter() {
        }

        public boolean filter(Event event) throws Exception {
            return event.getName().equals("start");
        }
    }

    private static String getResourceFilename(String str) {
        URL resource = CEPMigration11to13Test.class.getClassLoader().getResource(str);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    @Test
    public void testKeyedCEPOperatorMigratation() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { // from class: org.apache.flink.cep.operator.CEPMigration11to13Test.1
            private static final long serialVersionUID = -4873366487571254798L;

            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };
        Event event = new Event(42, "start", 1.0d);
        SubEvent subEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
        Event event2 = new Event(42, "end", 1.0d);
        OneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, IntSerializer.INSTANCE, new NFAFactory(), true), keySelector, BasicTypeInfo.INT_TYPE_INFO);
        try {
            keyedOneInputStreamOperatorTestHarness.setup();
            keyedOneInputStreamOperatorTestHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event2, 5L));
            keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(20L));
            ConcurrentLinkedQueue output = keyedOneInputStreamOperatorTestHarness.getOutput();
            Assert.assertEquals(2L, output.size());
            Object poll = output.poll();
            Assert.assertTrue(poll instanceof StreamRecord);
            StreamRecord streamRecord = (StreamRecord) poll;
            Assert.assertTrue(streamRecord.getValue() instanceof Map);
            Map map = (Map) streamRecord.getValue();
            Assert.assertEquals(event, ((List) map.get("start")).get(0));
            Assert.assertEquals(subEvent, ((List) map.get("middle")).get(0));
            Assert.assertEquals(event2, ((List) map.get("end")).get(0));
            Event event3 = new Event(42, "start", 2.0d);
            SubEvent subEvent2 = new SubEvent(42, "foo", 1.0d, 11.0d);
            Event event4 = new Event(42, "end", 2.0d);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event3, 21L));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(subEvent2, 23L));
            OperatorStateHandles snapshot = keyedOneInputStreamOperatorTestHarness.snapshot(1L, 1L);
            keyedOneInputStreamOperatorTestHarness.close();
            keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, IntSerializer.INSTANCE, new NFAFactory(), true), keySelector, BasicTypeInfo.INT_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.setup();
            keyedOneInputStreamOperatorTestHarness.initializeState(snapshot);
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event4, 25L));
            keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(50L));
            ConcurrentLinkedQueue output2 = keyedOneInputStreamOperatorTestHarness.getOutput();
            Assert.assertEquals(2L, output2.size());
            Object poll2 = output2.poll();
            Assert.assertTrue(poll2 instanceof StreamRecord);
            StreamRecord streamRecord2 = (StreamRecord) poll2;
            Assert.assertTrue(streamRecord2.getValue() instanceof Map);
            Map map2 = (Map) streamRecord2.getValue();
            Assert.assertEquals(event3, ((List) map2.get("start")).get(0));
            Assert.assertEquals(subEvent2, ((List) map2.get("middle")).get(0));
            Assert.assertEquals(event4, ((List) map2.get("end")).get(0));
            keyedOneInputStreamOperatorTestHarness.close();
        } catch (Throwable th) {
            keyedOneInputStreamOperatorTestHarness.close();
            throw th;
        }
    }

    @Test
    public void testNonKeyedCEPFunctionMigration() throws Exception {
        Event event = new Event(42, "start", 1.0d);
        SubEvent subEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
        Event event2 = new Event(42, "end", 1.0d);
        NullByteKeySelector nullByteKeySelector = new NullByteKeySelector();
        OneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, ByteSerializer.INSTANCE, new NFAFactory(), false), nullByteKeySelector, BasicTypeInfo.BYTE_TYPE_INFO);
        try {
            keyedOneInputStreamOperatorTestHarness.setup();
            keyedOneInputStreamOperatorTestHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event2, 5L));
            keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(20L));
            ConcurrentLinkedQueue output = keyedOneInputStreamOperatorTestHarness.getOutput();
            Assert.assertEquals(2L, output.size());
            Object poll = output.poll();
            Assert.assertTrue(poll instanceof StreamRecord);
            StreamRecord streamRecord = (StreamRecord) poll;
            Assert.assertTrue(streamRecord.getValue() instanceof Map);
            Map map = (Map) streamRecord.getValue();
            Assert.assertEquals(event, ((List) map.get("start")).get(0));
            Assert.assertEquals(subEvent, ((List) map.get("middle")).get(0));
            Assert.assertEquals(event2, ((List) map.get("end")).get(0));
            Event event3 = new Event(42, "start", 2.0d);
            SubEvent subEvent2 = new SubEvent(42, "foo", 1.0d, 11.0d);
            Event event4 = new Event(42, "end", 2.0d);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event3, 21L));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(subEvent2, 23L));
            OperatorStateHandles snapshot = keyedOneInputStreamOperatorTestHarness.snapshot(1L, 1L);
            keyedOneInputStreamOperatorTestHarness.close();
            keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, ByteSerializer.INSTANCE, new NFAFactory(), false), nullByteKeySelector, BasicTypeInfo.BYTE_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.setup();
            keyedOneInputStreamOperatorTestHarness.initializeState(snapshot);
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(event4, 25L));
            keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(50L));
            ConcurrentLinkedQueue output2 = keyedOneInputStreamOperatorTestHarness.getOutput();
            Assert.assertEquals(2L, output2.size());
            Object poll2 = output2.poll();
            Assert.assertTrue(poll2 instanceof StreamRecord);
            StreamRecord streamRecord2 = (StreamRecord) poll2;
            Assert.assertTrue(streamRecord2.getValue() instanceof Map);
            Map map2 = (Map) streamRecord2.getValue();
            Assert.assertEquals(event3, ((List) map2.get("start")).get(0));
            Assert.assertEquals(subEvent2, ((List) map2.get("middle")).get(0));
            Assert.assertEquals(event4, ((List) map2.get("end")).get(0));
            keyedOneInputStreamOperatorTestHarness.close();
        } catch (Throwable th) {
            keyedOneInputStreamOperatorTestHarness.close();
            throw th;
        }
    }
}
