package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import javax.net.ssl.SSLSessionContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.class */
public class NettyClientServerSslTest {
    @Test
    public void testValidSslConnection() throws Exception {
        testValidSslConnection(createSslConfig());
    }

    @Test
    public void testValidSslConnectionAdvanced() throws Exception {
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setInteger(SecurityOptions.SSL_SESSION_CACHE_SIZE, 1);
        createSslConfig.setInteger(SecurityOptions.SSL_SESSION_TIMEOUT, 1000);
        createSslConfig.setInteger(SecurityOptions.SSL_HANDSHAKE_TIMEOUT, 1000);
        createSslConfig.setInteger(SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1000);
        testValidSslConnection(createSslConfig);
    }

    private void testValidSslConnection(Configuration configuration) throws Exception {
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(getEmptyNettyProtocol(), new NettyConfig(InetAddress.getLoopbackAddress(), NetUtils.getAvailablePort(), NettyTestUtil.DEFAULT_SEGMENT_SIZE, 1, configuration));
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        SslHandler sslHandler = connect.pipeline().get("ssl");
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyTimeoutMillis());
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertTrue(connect.writeAndFlush("test").await().isSuccess());
        SSLSessionContext sessionContext = sslHandler.engine().getSession().getSessionContext();
        Assert.assertNotNull("bug in unit test setup: session context not available", sessionContext);
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
        if (configuration.getInteger(SecurityOptions.SSL_SESSION_TIMEOUT) != -1) {
            Assert.assertEquals(r0 / 1000, sessionContext.getSessionTimeout());
        } else {
            Assert.assertTrue("default value (-1) should not be propagated", sessionContext.getSessionTimeout() >= 0);
        }
        NettyTestUtil.shutdown(initServerAndClient);
    }

    private static void assertEqualsOrDefault(Configuration configuration, ConfigOption<Integer> configOption, long j) {
        long integer = configuration.getInteger(configOption);
        if (integer != ((Integer) configOption.defaultValue()).intValue()) {
            Assert.assertEquals(integer, j);
        } else {
            Assert.assertTrue("default value (" + configOption.defaultValue() + ") should not be propagated", j >= 0);
        }
    }

    @Test
    public void testInvalidSslConfiguration() {
        NettyProtocol emptyNettyProtocol = getEmptyNettyProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "invalidpassword");
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            nettyServerAndClient = NettyTestUtil.initServerAndClient(emptyNettyProtocol, new NettyConfig(InetAddress.getLoopbackAddress(), NetUtils.getAvailablePort(), NettyTestUtil.DEFAULT_SEGMENT_SIZE, 1, createSslConfig));
            Assert.fail("Created server and client from invalid configuration");
        } catch (Exception e) {
        }
        NettyTestUtil.shutdown(nettyServerAndClient);
    }

    @Test
    public void testSslHandshakeError() throws Exception {
        NettyProtocol emptyNettyProtocol = getEmptyNettyProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(emptyNettyProtocol, new NettyConfig(InetAddress.getLoopbackAddress(), NetUtils.getAvailablePort(), NettyTestUtil.DEFAULT_SEGMENT_SIZE, 1, createSslConfig));
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse(connect.writeAndFlush("test").await().isSuccess());
        NettyTestUtil.shutdown(initServerAndClient);
    }

    private Configuration createSslConfig() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_ENABLED, true);
        configuration.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore");
        configuration.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
        configuration.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
        configuration.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore");
        configuration.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
        return configuration;
    }

    private static NettyProtocol getEmptyNettyProtocol() {
        return new NettyProtocol(null, null, true) { // from class: org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest.1
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[0];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[0];
            }
        };
    }
}
