package brooklyn.management.internal;

import brooklyn.entity.Entity;
import brooklyn.entity.basic.Entities;
import brooklyn.event.Sensor;
import brooklyn.event.SensorEvent;
import brooklyn.management.ExecutionManager;
import brooklyn.management.SubscriptionHandle;
import brooklyn.util.JavaGroovyEquivalents;
import brooklyn.util.task.BasicExecutionManager;
import brooklyn.util.task.SingleThreadedScheduler;
import brooklyn.util.text.Identifiers;
import com.google.common.base.Predicate;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:brooklyn/management/internal/LocalSubscriptionManager.class */
/**
 * Subscription manager that keeps every subscription in in-memory indexes and
 * delivers published events by submitting one task per matching subscription
 * to an {@link ExecutionManager}.
 *
 * <p>Three indexes are maintained: by subscription id, by subscriber object,
 * and by the token produced by {@code makeEntitySensorToken(producer, sensor)}.
 * All methods that read or mutate the per-key sets are {@code synchronized} on
 * this manager, so the sets themselves are plain {@link LinkedHashSet}s.
 *
 * <p>The element type of the indexes is the raw {@code Subscription} type; the
 * declared field types are kept as-is because they are {@code protected}.
 */
public class LocalSubscriptionManager extends AbstractSubscriptionManager {
    private static final Logger LOG = LoggerFactory.getLogger(LocalSubscriptionManager.class);

    /** Execution manager that delivery tasks are submitted to. */
    protected final ExecutionManager em;

    /** Cached {@link #toString()} value, derived from this instance's identity hash code. */
    private final String tostring = "SubscriptionContext(" + Identifiers.getBase64IdFromValue(System.identityHashCode(this), 5) + ")";

    /** Number of events passed to {@link #publish(SensorEvent)}. */
    private final AtomicLong totalEventsPublishedCount = new AtomicLong();

    /** Number of delivery tasks submitted (counted at submission, not on listener completion). */
    private final AtomicLong totalEventsDeliveredCount = new AtomicLong();

    /** All live subscriptions, keyed by subscription id. */
    protected final ConcurrentMap<String, Subscription> allSubscriptions = new ConcurrentHashMap<String, Subscription>();

    /** Live subscriptions grouped by subscriber object (only for non-null subscribers). */
    protected final ConcurrentMap<Object, Set<Subscription>> subscriptionsBySubscriber = new ConcurrentHashMap<Object, Set<Subscription>>();

    /** Live subscriptions grouped by the (producer, sensor) token. */
    protected final ConcurrentMap<Object, Set<Subscription>> subscriptionsByToken = new ConcurrentHashMap<Object, Set<Subscription>>();

    /**
     * @param executionManager manager used to run listener callbacks; {@code subscribe} and
     *        {@code unsubscribe} cast it to {@link BasicExecutionManager}
     */
    public LocalSubscriptionManager(ExecutionManager executionManager) {
        this.em = executionManager;
    }

    /** @return the number of subscriptions currently registered */
    public long getNumSubscriptions() {
        return this.allSubscriptions.size();
    }

    /** @return the number of events that have been passed to {@link #publish(SensorEvent)} */
    public long getTotalEventsPublished() {
        return this.totalEventsPublishedCount.get();
    }

    /** @return the number of delivery tasks that have been submitted to the execution manager */
    public long getTotalEventsDelivered() {
        return this.totalEventsDeliveredCount.get();
    }

    /**
     * Registers {@code subscription} in all indexes.
     *
     * <p>The entries {@code "subscriberExecutionManagerTag"} and {@code "eventFilter"} are
     * removed from {@code flags} (the caller's map is mutated) and the remaining map is stored
     * on the subscription. When no tag is supplied a default one is generated from the
     * subscriber, and a {@link SingleThreadedScheduler} is registered for that tag.
     *
     * @param flags subscription options; must be mutable and non-null
     * @param subscription the subscription to register; its subscriber, tag, filter and flags
     *        fields are populated by this call
     * @return the same {@code subscription} instance, as a handle
     * @throws ClassCastException if a scheduler must be registered and the execution manager
     *         is not a {@link BasicExecutionManager}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // brooklyn.management.internal.AbstractSubscriptionManager
    public synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> subscription) {
        Entity producer = subscription.producer;
        Sensor<T> sensor = subscription.sensor;
        subscription.subscriber = getSubscriber(flags, subscription);

        if (flags.containsKey("subscriberExecutionManagerTag")) {
            subscription.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag");
            subscription.subscriberExecutionManagerTagSupplied = true;
        } else {
            subscription.subscriberExecutionManagerTag = defaultDeliveryTag(subscription.subscriber);
            subscription.subscriberExecutionManagerTagSupplied = false;
        }
        subscription.eventFilter = (Predicate) flags.remove("eventFilter");
        subscription.flags = flags;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[]{subscription.id, subscription.subscriber, producer, sensor, this});
        }

        this.allSubscriptions.put(subscription.id, subscription);
        addToMapOfSets(this.subscriptionsByToken, makeEntitySensorToken(subscription.producer, subscription.sensor), subscription);
        if (subscription.subscriber != null) {
            addToMapOfSets(this.subscriptionsBySubscriber, subscription.subscriber, subscription);
        }

        // Only generated tags get a scheduler assigned here; a caller-supplied tag is left alone.
        if (!subscription.subscriberExecutionManagerTagSupplied && subscription.subscriberExecutionManagerTag != null) {
            ((BasicExecutionManager) this.em).setTaskSchedulerForTag(subscription.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
        }
        return subscription;
    }

    /** Builds the execution tag used when the caller did not supply one. */
    private static String defaultDeliveryTag(Object subscriber) {
        if (subscriber instanceof Entity) {
            return "subscription-delivery-entity-" + ((Entity) subscriber).getId() + "[" + subscriber + "]";
        }
        if (subscriber instanceof String) {
            return "subscription-delivery-string[" + subscriber + "]";
        }
        return "subscription-delivery-object[" + subscriber + "]";
    }

    /**
     * Returns the subscriptions registered for the given subscriber.
     *
     * <p>The result is a detached snapshot taken under this manager's lock, so callers may
     * iterate it (and unsubscribe while doing so) without touching the internal index.
     *
     * @param obj the subscriber to look up
     * @return a new set of matching handles; empty if there are none
     */
    public synchronized Set<SubscriptionHandle> getSubscriptionsForSubscriber(Object obj) {
        Set<SubscriptionHandle> snapshot = new LinkedHashSet<SubscriptionHandle>();
        Set<Subscription> registered = this.subscriptionsBySubscriber.get(obj);
        if (registered != null) {
            snapshot.addAll(registered);
        }
        return snapshot;
    }

    /**
     * Returns every subscription that matches the given entity/sensor pair, including
     * subscriptions registered with a {@code null} producer and/or {@code null} sensor.
     *
     * @return a new insertion-ordered set of matching handles; never {@code null}
     */
    public synchronized Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity entity, Sensor<?> sensor) {
        Set<SubscriptionHandle> matches = new LinkedHashSet<SubscriptionHandle>();
        collectForToken(matches, makeEntitySensorToken(entity, sensor));
        collectForToken(matches, makeEntitySensorToken(null, sensor));
        collectForToken(matches, makeEntitySensorToken(entity, null));
        collectForToken(matches, makeEntitySensorToken(null, null));
        return matches;
    }

    /** Adds all subscriptions indexed under {@code token}, if any, to {@code target}. */
    private void collectForToken(Set<SubscriptionHandle> target, Object token) {
        Set<Subscription> registered = this.subscriptionsByToken.get(token);
        if (registered != null) {
            target.addAll(registered);
        }
    }

    /**
     * Removes a subscription from all indexes.
     *
     * @param subscriptionHandle a handle previously returned by {@code subscribe}
     * @return {@code true} if the subscription was registered and has been removed
     * @throws IllegalArgumentException if the handle is {@code null} or not a {@code Subscription}
     */
    public synchronized boolean unsubscribe(SubscriptionHandle subscriptionHandle) {
        if (!(subscriptionHandle instanceof Subscription)) {
            throw new IllegalArgumentException("Only subscription handles of type Subscription supported: sh=" + subscriptionHandle + "; type=" + (subscriptionHandle != null ? subscriptionHandle.getClass().getCanonicalName() : null));
        }
        Subscription subscription = (Subscription) subscriptionHandle;

        boolean removedById = this.allSubscriptions.remove(subscription.id) != null;
        boolean removedByToken = removeFromMapOfCollections(this.subscriptionsByToken, makeEntitySensorToken(subscription.producer, subscription.sensor), subscription);
        assert removedById == removedByToken;

        if (subscription.subscriber != null) {
            boolean removedBySubscriber = removeFromMapOfCollections(this.subscriptionsBySubscriber, subscription.subscriber, subscription);
            assert removedBySubscriber == removedByToken;
        }

        // NOTE(review): this (re)assigns the single-threaded scheduler for the tag on every
        // unsubscribe, including caller-supplied tags that subscribe() deliberately leaves
        // alone. Looks unintended; preserved as-is until the intended behaviour is confirmed.
        ((BasicExecutionManager) this.em).setTaskSchedulerForTag(subscription.subscriberExecutionManagerTag, SingleThreadedScheduler.class);
        return removedById;
    }

    /**
     * Publishes an event to every matching subscription.
     *
     * <p>Each subscription's event filter (if any) is evaluated on the calling thread; for
     * every subscription that passes, a task tagged with the subscription's execution tag is
     * submitted to the execution manager. Exceptions thrown by a listener are logged inside
     * the task and not propagated.
     *
     * @param sensorEvent the event to deliver
     */
    @Override // brooklyn.management.internal.AbstractSubscriptionManager
    public <T> void publish(final SensorEvent<T> sensorEvent) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} got event {}", this, sensorEvent);
        }
        this.totalEventsPublishedCount.incrementAndGet();

        Set<SubscriptionHandle> targets = getSubscriptionsForEntitySensor(sensorEvent.getSource(), sensorEvent.getSensor());
        if (!JavaGroovyEquivalents.groovyTruth(targets)) {
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("sending {}, {} to {}", new Object[]{sensorEvent.getSensor().getName(), sensorEvent, JavaGroovyEquivalents.join(targets, ",")});
        }

        for (SubscriptionHandle handle : targets) {
            final Subscription subscription = (Subscription) handle;
            if (subscription.eventFilter != null && !subscription.eventFilter.apply(sensorEvent)) {
                continue;
            }
            this.em.submit(JavaGroovyEquivalents.mapOf("tag", subscription.subscriberExecutionManagerTag), new Runnable() { // from class: brooklyn.management.internal.LocalSubscriptionManager.1
                @Override
                public String toString() {
                    return "LSM.publish(" + sensorEvent + ")";
                }

                @Override // java.lang.Runnable
                public void run() {
                    try {
                        subscription.listener.onEvent(sensorEvent);
                    } catch (Throwable th) {
                        // "this" below is the delivery task, so the message embeds its toString().
                        // Failures after the source entity is no longer managed are logged at debug only.
                        if (sensorEvent != null && sensorEvent.getSource() != null && Entities.isNoLongerManaged(sensorEvent.getSource())) {
                            LOG.debug("Error processing subscriptions to " + this + ", after entity unmanaged: " + th, th);
                        } else {
                            LOG.warn("Error processing subscriptions to " + this + ": " + th, th);
                        }
                    }
                }
            });
            // Counted once the task has been handed to the execution manager.
            this.totalEventsDeliveredCount.incrementAndGet();
        }
    }

    @Override
    public String toString() {
        return this.tostring;
    }

    /**
     * Adds {@code v} to the set stored under {@code k}, creating and registering the set if
     * the key is absent. Runs entirely under the map's monitor, then the set's monitor.
     *
     * @return the set that now contains {@code v}
     */
    // NOTE(review): carried over as @Deprecated from the original; the intended replacement is not visible here.
    @Deprecated
    private static <K, V> Set<V> addToMapOfSets(Map<K, Set<V>> map, K k, V v) {
        synchronized (map) {
            Set<V> set = map.get(k);
            if (set == null) {
                set = new LinkedHashSet<V>();
                map.put(k, set);
            }
            synchronized (set) {
                set.add(v);
            }
            return set;
        }
    }

    /**
     * Removes {@code v} from the collection stored under {@code k}; if that leaves the
     * collection empty, the key is removed from the map as well.
     *
     * @return {@code true} if {@code v} was present in the collection
     */
    // NOTE(review): carried over as @Deprecated from the original; the intended replacement is not visible here.
    @Deprecated
    private static <K, V> boolean removeFromMapOfCollections(Map<K, ? extends Collection<V>> map, K k, V v) {
        synchronized (map) {
            Collection<V> collection = map.get(k);
            if (collection == null) {
                return false;
            }
            synchronized (collection) {
                boolean removed = collection.remove(v);
                // Only drop the key if it still maps to this very collection.
                if (collection.isEmpty() && map.get(k) == collection) {
                    map.remove(k);
                }
                return removed;
            }
        }
    }
}
