package org.apache.flink.runtime.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.dispatch.OnComplete;
import com.typesafe.config.ConfigFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest.class */
public class QuarantineMonitorTest extends TestLogger {
    /** Logger for this test class; also passed to the monitor actor in {@link #getQuarantineMonitorProps}. */
    private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);
    /** Zero-length duration passed as the first duration argument when the test actors schedule their periodic messages. */
    private static final FiniteDuration zeroDelay = new FiniteDuration(0, TimeUnit.SECONDS);
    /** Actor system shared by all tests; created in {@link #setup()} and shut down in {@link #tearDown()}. */
    private static ActorSystem actorSystem1;
    /** Per-test actor system; created in {@link #setupTest()} and shut down in {@link #tearDownTest()}. */
    private ActorSystem actorSystem2;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$ErrorHandler.class */
    /**
     * Callback through which the test actors report a failure back to the test.
     */
    public interface ErrorHandler {
        /**
         * Called with the failure that should be surfaced to the test.
         *
         * @param error the failure being reported
         */
        void handleError(Throwable error);
    }

    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$Ping.class */
    /**
     * Message carrying the target that a {@link Watchee} should resolve and start messaging.
     */
    static class Ping {
        private final String target;

        /**
         * @param target the target to carry; stored as given, without validation
         */
        Ping(String target) {
            this.target = target;
        }

        /** Returns the target exactly as it was passed to the constructor. */
        public String getTarget() {
            return target;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$TestingQuarantineHandler.class */
    /**
     * Handler that exposes each quarantine callback as a future, so a test can block until the
     * callback fires. Also acts as the {@link ErrorHandler} of the test actors: a reported error
     * is used to complete both futures exceptionally.
     */
    private static class TestingQuarantineHandler implements QuarantineHandler, ErrorHandler {
        // Parameterized instantiation; the raw-type form caused an unchecked conversion.
        private final CompletableFuture<String> wasQuarantinedByFuture = new FlinkCompletableFuture<>();
        private final CompletableFuture<String> hasQuarantinedFuture = new FlinkCompletableFuture<>();

        /**
         * Completes the "was quarantined by" future with the given string.
         *
         * @param str value to complete the future with
         * @param actorSystem not used by this handler
         */
        public void wasQuarantinedBy(String str, ActorSystem actorSystem) {
            wasQuarantinedByFuture.complete(str);
        }

        /**
         * Completes the "has quarantined" future with the given string.
         *
         * @param str value to complete the future with
         * @param actorSystem not used by this handler
         */
        public void hasQuarantined(String str, ActorSystem actorSystem) {
            hasQuarantinedFuture.complete(str);
        }

        /** Returns the future completed by {@link #wasQuarantinedBy(String, ActorSystem)}. */
        public Future<String> getWasQuarantinedByFuture() {
            return wasQuarantinedByFuture;
        }

        /** Returns the future completed by {@link #hasQuarantined(String, ActorSystem)}. */
        public Future<String> getHasQuarantinedFuture() {
            return hasQuarantinedFuture;
        }

        /**
         * Completes both futures exceptionally with the given error.
         *
         * @param th the error reported by a test actor
         */
        @Override // org.apache.flink.runtime.akka.QuarantineMonitorTest.ErrorHandler
        public void handleError(Throwable th) {
            wasQuarantinedByFuture.completeExceptionally(th);
            hasQuarantinedFuture.completeExceptionally(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$Watch.class */
    /**
     * Message carrying the target that a {@link Watcher} should resolve, watch and start messaging.
     */
    static class Watch {
        private final String target;

        /**
         * @param target the target to carry; stored as given, without validation
         */
        Watch(String target) {
            this.target = target;
        }

        /** Returns the target exactly as it was passed to the constructor. */
        public String getTarget() {
            return target;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$Watchee.class */
    /**
     * Test actor that reacts to a {@link Ping}: it resolves the ping's target and, once resolved,
     * schedules the string "Watchee message" to be sent to it repeatedly. A failed resolution is
     * reported to the {@link ErrorHandler}. Any other message is ignored.
     */
    public static class Watchee extends UntypedActor {
        private final FiniteDuration timeout;
        private final FiniteDuration interval;
        private final ErrorHandler errorHandler;

        /**
         * @param timeout timeout passed to {@code resolveOne} when resolving the ping target
         * @param interval interval passed to the scheduler for the repeated message
         * @param errorHandler receives the failure when the target cannot be resolved
         */
        Watchee(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
            this.timeout = Preconditions.checkNotNull(timeout);
            this.interval = Preconditions.checkNotNull(interval);
            this.errorHandler = Preconditions.checkNotNull(errorHandler);
        }

        public void onReceive(Object message) throws Exception {
            if (!(message instanceof Ping)) {
                // Only Ping is handled; everything else is dropped without a reply.
                return;
            }

            final String targetPath = ((Ping) message).getTarget();

            getContext().actorSelection(targetPath).resolveOne(timeout).onComplete(new OnComplete<ActorRef>() {
                public void onComplete(Throwable failure, ActorRef resolved) throws Throwable {
                    if (resolved == null) {
                        Watchee.this.errorHandler.handleError(failure);
                        return;
                    }
                    // NOTE(review): the return value of schedule(...) is discarded, so nothing in
                    // this actor cancels the periodic send -- confirm that is intended.
                    Watchee.this.getContext().system().scheduler().schedule(
                            QuarantineMonitorTest.zeroDelay,
                            Watchee.this.interval,
                            resolved,
                            "Watchee message",
                            Watchee.this.getContext().dispatcher(),
                            Watchee.this.getSelf());
                }
            }, getContext().dispatcher());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/akka/QuarantineMonitorTest$Watcher.class */
    /**
     * Test actor that reacts to a {@link Watch}: it resolves the target, watches the resolved
     * actor and then schedules the string "Watcher message" to be sent to it repeatedly. A failed
     * resolution is reported to the {@link ErrorHandler}. Any other message is ignored.
     */
    public static class Watcher extends UntypedActor {
        private final FiniteDuration timeout;
        private final FiniteDuration interval;
        private final ErrorHandler errorHandler;

        /**
         * @param timeout timeout passed to {@code resolveOne} when resolving the watch target
         * @param interval interval passed to the scheduler for the repeated message
         * @param errorHandler receives the failure when the target cannot be resolved
         */
        Watcher(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
            this.timeout = Preconditions.checkNotNull(timeout);
            this.interval = Preconditions.checkNotNull(interval);
            this.errorHandler = Preconditions.checkNotNull(errorHandler);
        }

        public void onReceive(Object message) throws Exception {
            if (!(message instanceof Watch)) {
                // Only Watch is handled; everything else is dropped without a reply.
                return;
            }

            final String targetPath = ((Watch) message).getTarget();

            getContext().actorSelection(targetPath).resolveOne(timeout).onComplete(new OnComplete<ActorRef>() {
                public void onComplete(Throwable failure, ActorRef resolved) throws Throwable {
                    if (resolved != null) {
                        // Watch first, then start the periodic traffic towards the watched actor.
                        Watcher.this.getContext().watch(resolved);
                        // NOTE(review): the return value of schedule(...) is discarded, so nothing
                        // in this actor cancels the periodic send -- confirm that is intended.
                        Watcher.this.getContext().system().scheduler().schedule(
                                QuarantineMonitorTest.zeroDelay,
                                Watcher.this.interval,
                                resolved,
                                "Watcher message",
                                Watcher.this.getContext().dispatcher(),
                                Watcher.this.getSelf());
                    } else {
                        Watcher.this.errorHandler.handleError(failure);
                    }
                }
            }, getContext().dispatcher());
        }
    }

    /**
     * Creates the shared actor system. Three watch-failure-detector settings are overridden
     * (threshold, heartbeat interval, acceptable heartbeat pause); everything else falls back to
     * the default Akka config obtained from {@code AkkaUtils}.
     */
    @BeforeClass
    public static void setup() {
        Properties detectorOverrides = new Properties();
        detectorOverrides.setProperty("akka.remote.watch-failure-detector.threshold", "0.00001");
        detectorOverrides.setProperty("akka.remote.watch-failure-detector.heartbeat-interval", "1 ms");
        detectorOverrides.setProperty("akka.remote.watch-failure-detector.acceptable-heartbeat-pause", "1 ms");

        actorSystem1 = AkkaUtils.createActorSystem(
                ConfigFactory.parseProperties(detectorOverrides)
                        .withFallback(AkkaUtils.getDefaultAkkaConfig()));
    }

    /**
     * Shuts down the shared actor system and waits for its termination. Does nothing when the
     * system was never created.
     */
    @AfterClass
    public static void tearDown() {
        if (actorSystem1 == null) {
            return;
        }
        actorSystem1.shutdown();
        actorSystem1.awaitTermination();
    }

    /** Creates a fresh default actor system for the test about to run. */
    @Before
    public void setupTest() {
        actorSystem2 = AkkaUtils.createDefaultActorSystem();
    }

    /**
     * Shuts down the per-test actor system and waits for its termination. Does nothing when the
     * system was never created.
     */
    @After
    public void tearDownTest() {
        if (actorSystem2 == null) {
            return;
        }
        actorSystem2.shutdown();
        actorSystem2.awaitTermination();
    }

    /**
     * Places the quarantine monitor and the watchee in {@code actorSystem2} and the watcher in
     * {@code actorSystem1}, starts the ping/watch traffic between them, and asserts that the
     * handler's "was quarantined by" future completes with the address of {@code actorSystem1}.
     *
     * @throws ExecutionException if the awaited future completed exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting on the future
     */
    @Test(timeout = 5000)
    public void testWatcheeQuarantined() throws ExecutionException, InterruptedException {
        final TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
        final FiniteDuration resolveTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
        final FiniteDuration messageInterval = new FiniteDuration(200L, TimeUnit.MILLISECONDS);

        ActorRef watchee = null;
        ActorRef watcher = null;
        ActorRef monitor = null;

        try {
            monitor = this.actorSystem2.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
            watchee = this.actorSystem2.actorOf(getWatcheeProps(resolveTimeout, messageInterval, quarantineHandler), "watchee");
            watcher = actorSystem1.actorOf(getWatcherProps(resolveTimeout, messageInterval, quarantineHandler), "watcher");

            final Address watcherSystemAddress = AkkaUtils.getAddress(actorSystem1);
            final String watcheePath = AkkaUtils.getAkkaURL(this.actorSystem2, watchee);
            final String watcherPath = AkkaUtils.getAkkaURL(actorSystem1, watcher);

            watchee.tell(new Ping(watcherPath), ActorRef.noSender());
            watcher.tell(new Watch(watcheePath), ActorRef.noSender());

            Assert.assertEquals(watcherSystemAddress.toString(), quarantineHandler.getWasQuarantinedByFuture().get());
        } finally {
            // Runs exactly once on both the success and the failure path. A ref is still null
            // here if its actor was never created; it is passed to stopActor regardless.
            TestingUtils.stopActor(watchee);
            TestingUtils.stopActor(watcher);
            TestingUtils.stopActor(monitor);
        }
    }

    /**
     * Places the quarantine monitor and the watcher in {@code actorSystem1} and the watchee in
     * {@code actorSystem2}, starts the ping/watch traffic between them, and asserts that the
     * handler's "has quarantined" future completes with the address of {@code actorSystem2}.
     *
     * @throws ExecutionException if the awaited future completed exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting on the future
     */
    @Test(timeout = 5000)
    public void testWatcherQuarantining() throws ExecutionException, InterruptedException {
        final TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
        final FiniteDuration resolveTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);
        final FiniteDuration messageInterval = new FiniteDuration(200L, TimeUnit.MILLISECONDS);

        ActorRef watchee = null;
        ActorRef watcher = null;
        ActorRef monitor = null;

        try {
            monitor = actorSystem1.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
            watchee = this.actorSystem2.actorOf(getWatcheeProps(resolveTimeout, messageInterval, quarantineHandler), "watchee");
            watcher = actorSystem1.actorOf(getWatcherProps(resolveTimeout, messageInterval, quarantineHandler), "watcher");

            final Address watcheeSystemAddress = AkkaUtils.getAddress(this.actorSystem2);
            final String watcheePath = AkkaUtils.getAkkaURL(this.actorSystem2, watchee);
            final String watcherPath = AkkaUtils.getAkkaURL(actorSystem1, watcher);

            watchee.tell(new Ping(watcherPath), ActorRef.noSender());
            watcher.tell(new Watch(watcheePath), ActorRef.noSender());

            Assert.assertEquals(watcheeSystemAddress.toString(), quarantineHandler.getHasQuarantinedFuture().get());
        } finally {
            // Runs exactly once on both the success and the failure path. A ref is still null
            // here if its actor was never created; it is passed to stopActor regardless.
            TestingUtils.stopActor(watchee);
            TestingUtils.stopActor(watcher);
            TestingUtils.stopActor(monitor);
        }
    }

    /**
     * Builds the {@link Props} for a {@link Watchee}; the arguments are handed to its constructor
     * in the order given.
     *
     * @param timeout first constructor argument of the watchee
     * @param interval second constructor argument of the watchee
     * @param errorHandler third constructor argument of the watchee
     * @return props describing the watchee actor
     */
    static Props getWatcheeProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
        return Props.create(Watchee.class, timeout, interval, errorHandler);
    }

    /**
     * Builds the {@link Props} for a {@link Watcher}; the arguments are handed to its constructor
     * in the order given.
     *
     * @param timeout first constructor argument of the watcher
     * @param interval second constructor argument of the watcher
     * @param errorHandler third constructor argument of the watcher
     * @return props describing the watcher actor
     */
    static Props getWatcherProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
        return Props.create(Watcher.class, timeout, interval, errorHandler);
    }

    /**
     * Builds the {@link Props} for a {@code QuarantineMonitor}, passing it the given handler and
     * this test's logger as constructor arguments.
     *
     * @param quarantineHandler handler handed to the monitor
     * @return props describing the quarantine monitor actor
     */
    static Props getQuarantineMonitorProps(QuarantineHandler quarantineHandler) {
        return Props.create(QuarantineMonitor.class, quarantineHandler, LOG);
    }
}
