package brooklyn.entity.messaging.kafka;

import brooklyn.entity.basic.ApplicationBuilder;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.trait.Startable;
import brooklyn.location.Location;
import brooklyn.location.LocationSpec;
import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
import brooklyn.test.Asserts;
import brooklyn.test.EntityTestUtils;
import brooklyn.test.entity.TestApplication;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.time.Duration;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.concurrent.Callable;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:brooklyn/entity/messaging/kafka/KafkaIntegrationTest.class */
/**
 * TestNG tests (group {@code "Integration"}) that start and stop Kafka entities —
 * {@link KafkaZooKeeper}, {@link KafkaBroker} and {@link KafkaCluster} — as children
 * of a freshly managed {@link TestApplication}, using a location created from
 * {@link LocalhostMachineProvisioningLocation}.
 *
 * <p>A new application and location are created before every test method, and all
 * entities in the application's management context are destroyed afterwards.
 */
public class KafkaIntegrationTest {
    /** Key of the flag-map entry handed to the "eventually" assertion helpers. */
    private static final String TIMEOUT_FLAG = "timeout";

    /**
     * Timeout value used when waiting for {@code SERVICE_UP} on a single entity.
     * NOTE(review): presumably milliseconds — confirm against {@code EntityTestUtils}.
     */
    private static final int SERVICE_UP_TIMEOUT = 60000;

    private TestApplication app;
    private Location testLocation;

    /** Creates the managed test application and the location the entities are started in. */
    @BeforeMethod(alwaysRun = true)
    public void setup() {
        app = ApplicationBuilder.newManagedApp(TestApplication.class);
        testLocation = app.getManagementContext()
                .getLocationManager()
                .createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
    }

    /** Destroys everything in the application's management context; a no-op if setup never created the app. */
    @AfterMethod(alwaysRun = true)
    public void shutdown() {
        if (app == null) {
            return;
        }
        Entities.destroyAll(app.getManagementContext());
    }

    /**
     * Starts a standalone ZooKeeper entity, waits for it to report {@code SERVICE_UP},
     * stops it, and checks that {@code SERVICE_UP} is then false.
     */
    @Test(groups = {"Integration"})
    public void testZookeeper() {
        KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));

        zookeeper.start(targetLocations());
        EntityTestUtils.assertAttributeEqualsEventually(
                ImmutableMap.of(TIMEOUT_FLAG, SERVICE_UP_TIMEOUT), zookeeper, Startable.SERVICE_UP, true);

        zookeeper.stop();
        Boolean zookeeperUp = (Boolean) zookeeper.getAttribute(Startable.SERVICE_UP);
        Assert.assertFalse(zookeeperUp);
    }

    /**
     * Starts a ZooKeeper entity followed by a broker configured to use it, waiting for
     * each to report {@code SERVICE_UP}; then stops ZooKeeper and the broker (in that
     * order), checking after each stop that the entity's {@code SERVICE_UP} is false.
     */
    @Test(groups = {"Integration"})
    public void testBrokerPlusZookeeper() {
        KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
        KafkaBroker broker = app.createAndManageChild(
                EntitySpec.create(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));

        zookeeper.start(targetLocations());
        EntityTestUtils.assertAttributeEqualsEventually(
                ImmutableMap.of(TIMEOUT_FLAG, SERVICE_UP_TIMEOUT), zookeeper, Startable.SERVICE_UP, true);

        broker.start(targetLocations());
        EntityTestUtils.assertAttributeEqualsEventually(
                ImmutableMap.of(TIMEOUT_FLAG, SERVICE_UP_TIMEOUT), broker, Startable.SERVICE_UP, true);

        zookeeper.stop();
        Boolean zookeeperUp = (Boolean) zookeeper.getAttribute(Startable.SERVICE_UP);
        Assert.assertFalse(zookeeperUp);

        broker.stop();
        Boolean brokerUp = (Boolean) broker.getAttribute(Startable.SERVICE_UP);
        Assert.assertFalse(brokerUp);
    }

    /**
     * Starts a cluster with an initial size of two, waits (up to two minutes) until the
     * cluster and its ZooKeeper both report {@code SERVICE_UP} and the current size is 2,
     * then sends one message through {@link KafkaSupport} and expects to read the same
     * message back within five seconds.
     */
    @Test(groups = {"Integration"})
    public void testTwoBrokerCluster() throws InterruptedException {
        final KafkaCluster cluster = app.createAndManageChild(
                EntitySpec.create(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 2));
        cluster.start(targetLocations());

        Asserts.succeedsEventually(MutableMap.of(TIMEOUT_FLAG, Duration.TWO_MINUTES), new Callable<Void>() {
            @Override
            public Void call() {
                Boolean clusterUp = (Boolean) cluster.getAttribute(Startable.SERVICE_UP);
                Assert.assertTrue(clusterUp);
                Boolean zookeeperUp = (Boolean) cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP);
                Assert.assertTrue(zookeeperUp);
                Assert.assertEquals(cluster.getCurrentSize().intValue(), 2);
                return null;
            }
        });

        Entities.dumpInfo(cluster);

        final String topic = "brooklyn";
        final String payload = "TEST_MESSAGE";
        final KafkaSupport support = new KafkaSupport(cluster);
        support.sendMessage(topic, payload);

        Asserts.succeedsEventually(MutableMap.of(TIMEOUT_FLAG, Duration.FIVE_SECONDS), new Runnable() {
            @Override
            public void run() {
                Assert.assertEquals(support.getMessage(topic), payload);
            }
        });
    }

    /** Returns a fresh single-element list holding the location created in {@link #setup()}. */
    private ImmutableList<Location> targetLocations() {
        return ImmutableList.of(testLocation);
    }
}
