package org.apache.brooklyn.entity.messaging.activemq;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.java.UsesJmx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.class */
public class ActiveMQIntegrationTest extends BrooklynAppLiveTestSupport {
    private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class);
    private Location testLocation;
    private ActiveMQBroker activeMQ;

    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        super.setUp();
        this.testLocation = this.app.newLocalhostProvisioningLocation();
    }

    @Test(groups = {"Integration"})
    public void canStartupAndShutdown() throws Exception {
        this.activeMQ = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
        this.activeMQ.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), this.activeMQ, Startable.SERVICE_UP, true);
        log.info("JMX URL is " + ((String) this.activeMQ.getAttribute(UsesJmx.JMX_URL)));
        this.activeMQ.stop();
        Assert.assertFalse(((Boolean) this.activeMQ.getAttribute(Startable.SERVICE_UP)).booleanValue());
    }

    @Test(groups = {"Integration"})
    public void canStartupAndShutdownWithCustomJmx() throws Exception {
        this.activeMQ = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("jmxPort", "11099+"));
        this.activeMQ.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), this.activeMQ, Startable.SERVICE_UP, true);
        log.info("JMX URL is " + ((String) this.activeMQ.getAttribute(UsesJmx.JMX_URL)));
        this.activeMQ.stop();
        Assert.assertFalse(((Boolean) this.activeMQ.getAttribute(Startable.SERVICE_UP)).booleanValue());
    }

    @Test(groups = {"Integration"})
    public void canStartupAndShutdownWithCustomBrokerName() throws Exception {
        this.activeMQ = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("jmxPort", "11099+").configure("brokerName", "bridge"));
        this.activeMQ.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), this.activeMQ, Startable.SERVICE_UP, true);
        log.info("JMX URL is " + ((String) this.activeMQ.getAttribute(UsesJmx.JMX_URL)));
        this.activeMQ.stop();
        Assert.assertFalse(((Boolean) this.activeMQ.getAttribute(Startable.SERVICE_UP)).booleanValue());
    }

    @Test(groups = {"Integration"})
    public void canStartTwo() throws Exception {
        ActiveMQBroker createAndManageChild = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
        ActiveMQBroker createAndManageChild2 = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
        createAndManageChild.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), createAndManageChild, Startable.SERVICE_UP, true);
        log.info("JMX URL is " + ((String) createAndManageChild.getAttribute(UsesJmx.JMX_URL)));
        createAndManageChild2.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), createAndManageChild2, Startable.SERVICE_UP, true);
        log.info("JMX URL is " + ((String) createAndManageChild2.getAttribute(UsesJmx.JMX_URL)));
    }

    @Test(groups = {"Integration"})
    public void testCreatingQueuesDefault() throws Exception {
        String testCreatingQueuesInternal = testCreatingQueuesInternal(null);
        Assert.assertTrue(testCreatingQueuesInternal.contains("jmxmp"), "url=" + testCreatingQueuesInternal);
    }

    @Test(groups = {"Integration"})
    public void testCreatingQueuesRmi() throws Exception {
        String testCreatingQueuesInternal = testCreatingQueuesInternal(UsesJmx.JmxAgentModes.JMX_RMI_CUSTOM_AGENT);
        Assert.assertTrue(testCreatingQueuesInternal.contains("rmi://"), "url=" + testCreatingQueuesInternal);
        Assert.assertFalse(testCreatingQueuesInternal.contains("rmi:///jndi"), "url=" + testCreatingQueuesInternal);
        Assert.assertFalse(testCreatingQueuesInternal.contains("jmxmp"), "url=" + testCreatingQueuesInternal);
    }

    @Test(groups = {"Integration"})
    public void testCreatingQueuesJmxmp() throws Exception {
        String testCreatingQueuesInternal = testCreatingQueuesInternal(UsesJmx.JmxAgentModes.JMXMP);
        Assert.assertTrue(testCreatingQueuesInternal.contains("jmxmp"), "url=" + testCreatingQueuesInternal);
        Assert.assertFalse(testCreatingQueuesInternal.contains("rmi"), "url=" + testCreatingQueuesInternal);
    }

    @Test(groups = {"Integration"})
    public void testCreatingQueuesNoAgent() throws Exception {
        String testCreatingQueuesInternal = testCreatingQueuesInternal(UsesJmx.JmxAgentModes.NONE);
        Assert.assertTrue(testCreatingQueuesInternal.contains("service:jmx:rmi"), "url=" + testCreatingQueuesInternal);
        Assert.assertFalse(testCreatingQueuesInternal.contains("jmxmp"), "url=" + testCreatingQueuesInternal);
    }

    public String testCreatingQueuesInternal(UsesJmx.JmxAgentModes jmxAgentModes) throws Exception {
        this.activeMQ = this.app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", "testQueue").configure(UsesJmx.JMX_AGENT_MODE, jmxAgentModes));
        this.activeMQ.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 600000), this.activeMQ, Startable.SERVICE_UP, true);
        String str = (String) this.activeMQ.getAttribute(UsesJmx.JMX_URL);
        log.info("JMX URL (" + jmxAgentModes + ") is " + str);
        try {
            Assert.assertFalse(this.activeMQ.getQueueNames().isEmpty());
            Assert.assertEquals(this.activeMQ.getQueueNames().size(), 1);
            Assert.assertTrue(this.activeMQ.getQueueNames().contains("testQueue"));
            Assert.assertEquals(this.activeMQ.getChildren().size(), 1);
            Assert.assertFalse(this.activeMQ.getQueues().isEmpty());
            Assert.assertEquals(this.activeMQ.getQueues().size(), 1);
            ActiveMQQueue activeMQQueue = (ActiveMQQueue) this.activeMQ.getQueues().get("testQueue");
            Assert.assertNotNull(activeMQQueue);
            Assert.assertEquals(activeMQQueue.getName(), "testQueue");
            Connection activeMQConnection = getActiveMQConnection(this.activeMQ);
            clearQueue(activeMQConnection, "testQueue");
            EntityAsserts.assertAttributeEqualsEventually(activeMQQueue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
            sendMessages(activeMQConnection, 20, "testQueue", "01234567890123456789012345678901");
            EntityAsserts.assertAttributeEqualsEventually(activeMQQueue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 20);
            Assert.assertEquals(clearQueue(activeMQConnection, "testQueue"), 20);
            EntityAsserts.assertAttributeEqualsEventually(activeMQQueue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
            activeMQConnection.close();
            this.activeMQ.stop();
            return str;
        } catch (Throwable th) {
            this.activeMQ.stop();
            throw th;
        }
    }

    private Connection getActiveMQConnection(ActiveMQBroker activeMQBroker) throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("tcp://" + ((String) activeMQBroker.getAttribute(ActiveMQBroker.ADDRESS)) + ":" + ((Integer) activeMQBroker.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT)).intValue()).createConnection("admin", "activemq");
        createConnection.start();
        return createConnection;
    }

    private void sendMessages(Connection connection, int i, String str, String str2) throws Exception {
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
        for (int i2 = 0; i2 < i; i2++) {
            createProducer.send(createSession.createTextMessage(str2));
        }
        createSession.close();
    }

    private int clearQueue(Connection connection, String str) throws Exception {
        Session createSession = connection.createSession(false, 1);
        int i = 0;
        while (createSession.createConsumer(createSession.createQueue(str)).receive(500L) != null) {
            i++;
        }
        createSession.close();
        return i;
    }
}
