package org.apache.brooklyn.entity.messaging.rabbit;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.entity.TestApplication;
import org.apache.brooklyn.entity.messaging.MessageBroker;
import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.class */
public class RabbitIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(RabbitIntegrationTest.class);
    private TestApplication app;
    private Location testLocation;
    private RabbitBroker rabbit;

    @BeforeMethod(groups = {"Integration"})
    public void setup() {
        this.app = ApplicationBuilder.newManagedApp(TestApplication.class);
        this.testLocation = new LocalhostMachineProvisioningLocation();
    }

    @AfterMethod(alwaysRun = true)
    public void shutdown() {
        if (this.app != null) {
            Entities.destroyAll(this.app.getManagementContext());
        }
    }

    @Test(groups = {"Integration", "WIP"})
    public void canStartupAndShutdown() throws Exception {
        this.rabbit = this.app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
        this.rabbit.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(this.rabbit, Startable.SERVICE_UP, true);
        this.rabbit.stop();
        Assert.assertFalse(((Boolean) this.rabbit.getAttribute(Startable.SERVICE_UP)).booleanValue());
    }

    @Test(groups = {"Integration", "WIP"})
    public void testClientConnection() throws Exception {
        this.rabbit = this.app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
        this.rabbit.start(ImmutableList.of(this.testLocation));
        EntityAsserts.assertAttributeEqualsEventually(this.rabbit, Startable.SERVICE_UP, true);
        byte[] bytes = "MessageBody".getBytes(Charsets.UTF_8);
        Channel channel = null;
        Channel channel2 = null;
        try {
            channel = getAmqpChannel(this.rabbit);
            channel2 = getAmqpChannel(this.rabbit);
            channel.queueDeclare("queueName", true, false, false, ImmutableMap.of());
            channel.queueBind("queueName", "amq.direct", "queueName");
            channel.basicPublish("amq.direct", "queueName", (AMQP.BasicProperties) null, bytes);
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel2);
            channel2.basicConsume("queueName", true, queueingConsumer);
            Assert.assertEquals(queueingConsumer.nextDelivery(60000L).getBody(), bytes);
            closeSafely(channel, 10000);
            closeSafely(channel2, 10000);
        } catch (Throwable th) {
            closeSafely(channel, 10000);
            closeSafely(channel2, 10000);
            throw th;
        }
    }

    private void closeSafely(final Channel channel, int i) throws InterruptedException {
        if (channel == null) {
            return;
        }
        Thread thread = new Thread(new Runnable() { // from class: org.apache.brooklyn.entity.messaging.rabbit.RabbitIntegrationTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    channel.close();
                } catch (IOException e) {
                    RabbitIntegrationTest.log.error("Error closing RabbitMQ Channel; continuing", e);
                }
            }
        });
        try {
            thread.start();
            thread.join(i);
            if (thread.isAlive()) {
                log.error("Timeout when closing RabbitMQ Channel " + channel + "; aborting close and continuing");
            }
        } finally {
            thread.interrupt();
            thread.join(1000L);
            if (thread.isAlive()) {
                thread.stop();
            }
        }
    }

    private Channel getAmqpChannel(RabbitBroker rabbitBroker) throws Exception {
        String str = (String) rabbitBroker.getAttribute(MessageBroker.BROKER_URL);
        log.warn("connecting to rabbit {}", str);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(str);
        return connectionFactory.newConnection().createChannel();
    }
}
