package org.apache.brooklyn.core.mgmt.internal;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.entity.group.BasicGroup;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.class */
/**
 * Exercises event delivery through {@code app.subscriptions()}: direct subscriptions to an
 * entity/sensor pair, subscriptions where the entity or the sensor is passed as {@code null}
 * (the "wildcard" cases named by the tests), subscriptions to children and to group members,
 * and subscribing/unsubscribing concurrently with publishing.
 */
public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport {
    /** Upper bound, in milliseconds, on any wait performed by these tests. */
    private static final int TIMEOUT_MS = 5000;

    /** Child of {@code app} created afresh in {@link #setUp()}; the event source in most tests. */
    private TestEntity entity;

    /**
     * Runs the superclass set-up and then creates the managed {@link TestEntity} child used by the tests.
     *
     * @throws Exception if the superclass set-up or the entity creation fails
     */
    @Override
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        super.setUp();
        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
    }

    /** A listener subscribed to a specific entity and sensor is notified when that sensor is set. */
    @Test
    public void testSubscribeToEntityAttributeChange() throws Exception {
        final CountDownLatch eventSeen = new CountDownLatch(1);
        app.subscriptions().subscribe(entity, TestEntity.SEQUENCE, countingDown(eventSeen));
        entity.setSequenceValue(1234);
        awaitEvent(eventSeen, "Timeout waiting for Event on TestEntity listener");
    }

    /** A listener subscribed to an entity with a {@code null} sensor is notified of a sensor change. */
    @Test
    public void testSubscribeToEntityWithAttributeWildcard() throws Exception {
        final CountDownLatch eventSeen = new CountDownLatch(1);
        app.subscriptions().subscribe(entity, (Sensor) null, countingDown(eventSeen));
        entity.setSequenceValue(1234);
        awaitEvent(eventSeen, "Timeout waiting for Event on TestEntity listener");
    }

    /** A listener subscribed to a sensor with a {@code null} entity is notified of a change on the entity. */
    @Test
    public void testSubscribeToAttributeChangeWithEntityWildcard() throws Exception {
        final CountDownLatch eventSeen = new CountDownLatch(1);
        app.subscriptions().subscribe((Entity) null, TestEntity.SEQUENCE, countingDown(eventSeen));
        entity.setSequenceValue(1234);
        awaitEvent(eventSeen, "Timeout waiting for Event on TestEntity listener");
    }

    /** A listener subscribed to the children of {@code app} is notified when a child's sensor is set. */
    @Test
    public void testSubscribeToChildAttributeChange() throws Exception {
        final CountDownLatch eventSeen = new CountDownLatch(1);
        app.subscriptions().subscribeToChildren(app, TestEntity.SEQUENCE, countingDown(eventSeen));
        entity.setSequenceValue(1234);
        awaitEvent(eventSeen, "Timeout waiting for Event on child TestEntity listener");
    }

    /**
     * A listener subscribed to the members of a group receives exactly one event when a member's
     * sensor is set, carrying the value, the sensor and the member as source.
     */
    @Test
    public void testSubscribeToMemberAttributeChange() throws Exception {
        BasicGroup group = app.createAndManageChild(EntitySpec.create(BasicGroup.class));
        TestEntity member = app.createAndManageChild(EntitySpec.create(TestEntity.class));
        group.addMember(member);

        final CopyOnWriteArrayList<SensorEvent<Integer>> events = new CopyOnWriteArrayList<SensorEvent<Integer>>();
        final CountDownLatch eventSeen = new CountDownLatch(1);
        app.subscriptions().subscribeToMembers(group, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
            @Override
            public void onEvent(SensorEvent<Integer> event) {
                // Record before releasing the latch so the assertions below see the event.
                events.add(event);
                eventSeen.countDown();
            }
        });

        member.sensors().set(TestEntity.SEQUENCE, 123);
        awaitEvent(eventSeen, "Timeout waiting for Event on parent TestEntity listener");

        Assert.assertEquals(events.size(), 1);
        SensorEvent<Integer> event = events.get(0);
        // Boxed explicitly so the (Object, Object) overload is selected unambiguously.
        Assert.assertEquals(event.getValue(), Integer.valueOf(123));
        Assert.assertEquals(event.getSensor(), TestEntity.SEQUENCE);
        Assert.assertEquals(event.getSource().getId(), member.getId());
    }

    /**
     * Publishes 10000 sensor values while a second thread repeatedly subscribes and unsubscribes
     * a listener for the same sensor, and fails if that thread caught an exception.
     *
     * <p>The background thread is interrupted and joined (bounded by {@link #TIMEOUT_MS}) before
     * its outcome is inspected, so a failure raised late in the run is not missed and the thread
     * does not normally outlive the test.
     */
    @Test(groups = {"Integration"})
    public void testConcurrentSubscribingAndPublishing() throws Exception {
        final AtomicReference<Exception> subscriberFailure = new AtomicReference<Exception>();
        TestEntity publisher = app.createAndManageChild(EntitySpec.create(TestEntity.class));

        Thread subscriber = new Thread() {
            @Override
            public void run() {
                try {
                    SensorEventListener<Object> noopListener = new SensorEventListener<Object>() {
                        @Override
                        public void onEvent(SensorEvent<Object> event) {
                            // Intentionally empty: only the subscribe/unsubscribe churn matters here.
                        }
                    };
                    // One subscription is made up front and never removed; the loop then adds and
                    // removes a second one until this thread is interrupted.
                    LocalSubscriptionManagerTest.this.app.subscriptions()
                            .subscribe((Entity) null, TestEntity.SEQUENCE, noopListener);
                    while (!Thread.currentThread().isInterrupted()) {
                        LocalSubscriptionManagerTest.this.app.subscriptions().unsubscribe(
                                (Entity) null,
                                LocalSubscriptionManagerTest.this.app.subscriptions()
                                        .subscribe((Entity) null, TestEntity.SEQUENCE, noopListener));
                    }
                } catch (Exception e) {
                    subscriberFailure.set(e);
                }
            }
        };

        try {
            subscriber.start();
            for (int i = 0; i < 10000; i++) {
                publisher.sensors().set(TestEntity.SEQUENCE, i);
            }
        } finally {
            subscriber.interrupt();
            // Bounded wait: lets the thread record any failure without risking a hung test.
            subscriber.join(TIMEOUT_MS);
        }

        Exception failure = subscriberFailure.get();
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns a listener that counts down {@code latch} once for every event it receives. */
    private static SensorEventListener<Object> countingDown(final CountDownLatch latch) {
        return new SensorEventListener<Object>() {
            @Override
            public void onEvent(SensorEvent<Object> event) {
                latch.countDown();
            }
        };
    }

    /**
     * Waits up to {@link #TIMEOUT_MS} for {@code latch} to reach zero and fails the test with
     * {@code timeoutMessage} if it does not.
     */
    private static void awaitEvent(CountDownLatch latch, String timeoutMessage) throws InterruptedException {
        if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            Assert.fail(timeoutMessage);
        }
    }
}
