package org.apache.brooklyn.entity.messaging.storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.Map;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.messaging.storm.Storm;
import org.apache.brooklyn.entity.messaging.storm.topologies.ExclamationBolt;
import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
import org.apache.brooklyn.test.EntityTestUtils;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.core.file.ArchiveBuilder;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.class */
public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport {
    protected static final Logger log = LoggerFactory.getLogger(StormAbstractCloudLiveTest.class);
    private Location location;
    private ZooKeeperEnsemble zooKeeperEnsemble;
    private Storm nimbus;
    private Storm supervisor;
    private Storm ui;

    @BeforeClass(alwaysRun = true)
    public void beforeClass() throws Exception {
        this.mgmt = new LocalManagementContext();
        this.location = this.mgmt.getLocationRegistry().getLocationManaged(getLocation(), getFlags());
        super.setUp();
    }

    @AfterClass(alwaysRun = true)
    public void afterClass() throws Exception {
    }

    public abstract String getLocation();

    public Map<String, ?> getFlags() {
        return MutableMap.of();
    }

    @Test(groups = {"Live", "WIP"})
    public void deployStorm() throws Exception {
        try {
            this.zooKeeperEnsemble = this.app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class).configure(ZooKeeperEnsemble.INITIAL_SIZE, 3));
            this.nimbus = this.app.createAndManageChild(EntitySpec.create(Storm.class).configure(Storm.ROLE, Storm.Role.NIMBUS).configure(Storm.NIMBUS_HOSTNAME, "localhost").configure(Storm.ZOOKEEPER_ENSEMBLE, this.zooKeeperEnsemble));
            this.supervisor = this.app.createAndManageChild(EntitySpec.create(Storm.class).configure(Storm.ROLE, Storm.Role.SUPERVISOR).configure(Storm.ZOOKEEPER_ENSEMBLE, this.zooKeeperEnsemble).configure(Storm.NIMBUS_HOSTNAME, DependentConfiguration.attributeWhenReady(this.nimbus, Attributes.HOSTNAME)));
            this.ui = this.app.createAndManageChild(EntitySpec.create(Storm.class).configure(Storm.ROLE, Storm.Role.UI).configure(Storm.ZOOKEEPER_ENSEMBLE, this.zooKeeperEnsemble).configure(Storm.NIMBUS_HOSTNAME, DependentConfiguration.attributeWhenReady(this.nimbus, Attributes.HOSTNAME)));
            log.info("Started Storm deployment on '" + getLocation() + "'");
            this.app.start(ImmutableList.of(this.location));
            Entities.dumpInfo(this.app);
            EntityTestUtils.assertAttributeEqualsEventually(this.app, Startable.SERVICE_UP, true);
            EntityTestUtils.assertAttributeEqualsEventually(this.zooKeeperEnsemble, Startable.SERVICE_UP, true);
            EntityTestUtils.assertAttributeEqualsEventually(this.nimbus, Startable.SERVICE_UP, true);
            EntityTestUtils.assertAttributeEqualsEventually(this.supervisor, Startable.SERVICE_UP, true);
            EntityTestUtils.assertAttributeEqualsEventually(this.ui, Startable.SERVICE_UP, true);
            submitTopology(createTopology(), "myExclamation", 3, true, 60000L);
        } catch (Exception e) {
            log.error("Failed to deploy Storm", e);
            Assert.fail();
            throw e;
        }
    }

    private StormTopology createTopology() throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("word", new TestWordSpout(), 10);
        topologyBuilder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        topologyBuilder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
        return topologyBuilder.createTopology();
    }

    public boolean submitTopology(StormTopology stormTopology, String str, int i, boolean z, long j) {
        if (log.isDebugEnabled()) {
            log.debug("Connecting to NimbusClient: {}", this.nimbus.getConfig(Storm.NIMBUS_HOSTNAME));
        }
        Config config = new Config();
        config.setDebug(z);
        config.setNumWorkers(i);
        System.setProperty("storm.jar", createJar(new File(Os.mergePaths(new String[]{ResourceUtils.create(this).getClassLoaderDir(), "org/apache/brooklyn/entity/messaging/storm/topologies"})), "org/apache/brooklyn/entity/messaging/storm/"));
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = j == -1 ? Long.MAX_VALUE : currentTimeMillis + j;
        long j3 = currentTimeMillis;
        Exception exc = null;
        int i2 = 0;
        while (j3 <= j2) {
            j3 = System.currentTimeMillis();
            if (i2 != 0) {
                Time.sleep(Duration.ONE_SECOND);
            }
            if (log.isTraceEnabled()) {
                log.trace("trying connection to {} at time {}", this.nimbus.getConfig(Storm.NIMBUS_HOSTNAME), Long.valueOf(j3));
            }
            try {
                StormSubmitter.submitTopology(str, config, stormTopology);
                return true;
            } catch (Exception e) {
                if (!shouldRetryOn(e)) {
                    throw Throwables.propagate(e);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Attempt {} failed connecting to {} ({})", new Object[]{Integer.valueOf(i2 + 1), this.nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()});
                }
                exc = e;
                i2++;
            }
        }
        log.warn("unable to connect to Nimbus client: ", exc);
        Assert.fail();
        return false;
    }

    private boolean shouldRetryOn(Exception exc) {
        return exc.getMessage().equals("org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused");
    }

    private String createJar(File file, String str) {
        return file.isDirectory() ? ArchiveBuilder.jar().addAt(file, str).create(Os.newTempDir(getClass()) + "/topologies.jar").getAbsolutePath() : file.getAbsolutePath();
    }
}
