package org.apache.beam.sdk.io.jms;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest.class */
public class JmsIOTest {
    private static final String BROKER_URL = "vm://localhost";
    private BrokerService broker;
    private ConnectionFactory connectionFactory;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
        this.broker.addConnector(BROKER_URL);
        this.broker.setBrokerName("localhost");
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testReadMessages() throws Exception {
        Connection createConnection = this.connectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("test"));
        TextMessage createTextMessage = createSession.createTextMessage("This Is A Test");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.close();
        createSession.close();
        createConnection.close();
        PAssert.thatSingleton(this.pipeline.apply(JmsIO.read().withConnectionFactory(this.connectionFactory).withQueue("test").withMaxNumRecords(5L)).apply("Count", Count.globally())).isEqualTo(new Long(5L));
        this.pipeline.run();
        Session createSession2 = this.connectionFactory.createConnection().createSession(false, 1);
        Assert.assertNull(createSession2.createConsumer(createSession2.createQueue("test")).receiveNoWait());
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteMessage() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add("Message " + i);
        }
        this.pipeline.apply(Create.of(arrayList)).apply(JmsIO.write().withConnectionFactory(this.connectionFactory).withQueue("test"));
        this.pipeline.run();
        Connection createConnection = this.connectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        int i2 = 0;
        while (createSession.createConsumer(createSession.createQueue("test")).receive(1000L) != null) {
            i2++;
        }
        Assert.assertEquals(100L, i2);
    }
}
