package org.apache.brooklyn.core.feed;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.predicates.DslPredicates;
import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.ScheduledTask;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/brooklyn/core/feed/Poller.class */
public class Poller<V> {
    public static final Logger log = LoggerFactory.getLogger(Poller.class);
    private final Entity entity;
    private final AbstractEntityAdjunct adjunct;
    private final boolean onlyIfServiceUp;
    private final Set<Callable<?>> oneOffJobs = new LinkedHashSet();
    private final Set<PollJob<V>> pollJobs = new LinkedHashSet();
    private final Set<Task<?>> oneOffTasks = new LinkedHashSet();
    private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet();
    private volatile boolean started = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/brooklyn/core/feed/Poller$PollJob.class */
    public static class PollJob<V> {
        final PollHandler<? super V> handler;
        final Duration pollPeriod;
        final Callable<?> job;
        final Runnable wrappedJob;
        final Entity pollTriggerEntity;
        final Sensor<?> pollTriggerSensor;
        final Supplier<DslPredicates.DslPredicate> pollCondition;
        SubscriptionHandle subscription;
        private boolean loggedPreviousException;

        PollJob(Callable<V> callable, PollHandler<? super V> pollHandler, Duration duration) {
            this(callable, pollHandler, duration, null, null, null);
        }

        /* JADX WARN: Multi-variable type inference failed */
        PollJob(final Callable<V> callable, final PollHandler<? super V> pollHandler, Duration duration, Entity entity, Sensor<?> sensor, final Supplier<DslPredicates.DslPredicate> supplier) {
            this.loggedPreviousException = false;
            this.handler = pollHandler;
            this.pollPeriod = duration;
            this.pollTriggerEntity = entity;
            this.pollTriggerSensor = sensor;
            this.pollCondition = supplier;
            this.job = callable;
            this.wrappedJob = new Runnable() { // from class: org.apache.brooklyn.core.feed.Poller.PollJob.1
                /* JADX WARN: Multi-variable type inference failed */
                @Override // java.lang.Runnable
                public void run() {
                    DslPredicates.DslPredicate dslPredicate;
                    try {
                        if (supplier != null && (dslPredicate = (DslPredicates.DslPredicate) supplier.get()) != null && !dslPredicate.apply(BrooklynTaskTags.getContextEntity(Tasks.current()))) {
                            if (Poller.log.isTraceEnabled()) {
                                Poller.log.trace("Skipping execution for PollJob {} because condition does not apply", callable);
                            }
                            Poller.log.debug("Skipping poll/feed execution because condition does not apply");
                        } else {
                            Object call = callable.call();
                            if (pollHandler.checkSuccess(call)) {
                                pollHandler.onSuccess(call);
                            } else {
                                pollHandler.onFailure(call);
                            }
                            PollJob.this.loggedPreviousException = false;
                        }
                    } catch (Exception e) {
                        if (!PollJob.this.loggedPreviousException) {
                            if (Poller.log.isDebugEnabled()) {
                                Poller.log.debug("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[]{callable, e, pollHandler});
                            }
                            PollJob.this.loggedPreviousException = true;
                        } else if (Poller.log.isTraceEnabled()) {
                            Poller.log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[]{callable, e, pollHandler});
                        }
                        pollHandler.onException(e);
                    }
                }
            };
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <PI, PC extends PollConfig> void scheduleFeed(AbstractFeed abstractFeed, SetMultimap<PI, PC> setMultimap, Function<PI, Callable<?>> function) {
        for (Object obj : setMultimap.keySet()) {
            Set<? extends PollConfig> set = setMultimap.get(obj);
            LinkedHashSet newLinkedHashSet = Sets.newLinkedHashSet();
            Iterator<? extends PollConfig> it = set.iterator();
            while (it.hasNext()) {
                newLinkedHashSet.add(new AttributePollHandler(it.next(), this.entity, abstractFeed));
            }
            schedulePoll(abstractFeed, set, (Callable) function.apply(obj), new DelegatingPollHandler(newLinkedHashSet));
        }
    }

    public void schedulePoll(AbstractEntityAdjunct abstractEntityAdjunct, Set<? extends PollConfig> set, Callable callable, PollHandler pollHandler) {
        boolean z = false;
        long j = Long.MAX_VALUE;
        MutableSet of = MutableSet.of();
        for (PollConfig pollConfig : set) {
            of.add(pollConfig.getCondition());
            if (pollConfig.getPeriod() > 0) {
                j = Math.min(j, pollConfig.getPeriod());
            }
            MutableSet<Pair> of2 = MutableSet.of();
            if (pollConfig.getOtherTriggers() != null) {
                of2.addAll(AbstractAddTriggerableSensor.resolveTriggers(abstractEntityAdjunct.mo125getEntity(), pollConfig.getOtherTriggers()));
            }
            if (this.onlyIfServiceUp) {
                of2.add(Pair.of(abstractEntityAdjunct.mo125getEntity(), Attributes.SERVICE_UP));
            }
            for (Pair pair : of2) {
                subscribe(callable, pollHandler, (Entity) pair.getLeft(), (Sensor) pair.getRight(), pollConfig.getCondition());
                z = true;
            }
        }
        if (j > 0) {
            if (j < Duration.PRACTICALLY_FOREVER.toMilliseconds() || !z) {
                scheduleAtFixedRate(callable, pollHandler, Duration.millis(Long.valueOf(j)), of.isEmpty() ? null : of.size() == 1 ? (Supplier) Iterables.getOnlyElement(of) : of.contains(null) ? null : () -> {
                    DslPredicates.DslPredicateDefault dslPredicateDefault = new DslPredicates.DslPredicateDefault();
                    dslPredicateDefault.any = (List) of.stream().collect(Collectors.toList());
                    return dslPredicateDefault;
                });
            }
        }
    }

    public Poller(Entity entity, AbstractEntityAdjunct abstractEntityAdjunct, boolean z) {
        this.entity = entity;
        this.adjunct = abstractEntityAdjunct;
        this.onlyIfServiceUp = z;
    }

    public void submit(Callable<?> callable) {
        if (this.started) {
            throw new IllegalStateException("Cannot submit additional tasks after poller has started");
        }
        this.oneOffJobs.add(callable);
    }

    public void scheduleAtFixedRate(Callable<V> callable, PollHandler<? super V> pollHandler, long j) {
        scheduleAtFixedRate(callable, pollHandler, Duration.millis(Long.valueOf(j)), null);
    }

    public void scheduleAtFixedRate(Callable<V> callable, PollHandler<? super V> pollHandler, Duration duration) {
        scheduleAtFixedRate(callable, pollHandler, duration, null);
    }

    public void scheduleAtFixedRate(Callable<V> callable, PollHandler<? super V> pollHandler, Duration duration, Supplier<DslPredicates.DslPredicate> supplier) {
        if (this.started) {
            throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
        }
        this.pollJobs.add(new PollJob<>(callable, pollHandler, duration, null, null, supplier));
    }

    public void subscribe(Callable<V> callable, PollHandler<? super V> pollHandler, Entity entity, Sensor<?> sensor, Supplier<DslPredicates.DslPredicate> supplier) {
        this.pollJobs.add(new PollJob<>(callable, pollHandler, null, entity, sensor, supplier));
    }

    public void start() {
        if (log.isDebugEnabled()) {
            log.debug("Starting poll for {} (using {})", new Object[]{this.entity, this});
        }
        if (this.started) {
            throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", this, this.entity));
        }
        this.started = true;
        for (Callable<?> callable : this.oneOffJobs) {
            this.oneOffTasks.add(this.adjunct.getExecutionContext().submit(Tasks.builder().dynamic(false).body(callable).displayName("Poll").description("One-time poll job " + callable).build()));
        }
        Duration duration = null;
        MutableSet of = MutableSet.of();
        Function function = pollJob -> {
            return (String) MutableList.of(this.adjunct != null ? this.adjunct.getDisplayName() : null, pollJob.handler.getDescription()).stream().filter((v0) -> {
                return Strings.isNonBlank(v0);
            }).collect(Collectors.joining("; "));
        };
        BiFunction biFunction = (runnable, str) -> {
            DynamicSequentialTask dynamicSequentialTask = new DynamicSequentialTask(MutableMap.of("displayName", str, "entity", this.entity), () -> {
                if (!Entities.isManagedActive(this.entity)) {
                    return null;
                }
                if (this.onlyIfServiceUp && !Boolean.TRUE.equals(this.entity.getAttribute(Attributes.SERVICE_UP))) {
                    return null;
                }
                runnable.run();
                return null;
            });
            BrooklynTaskTags.addTagDynamically(dynamicSequentialTask, "NON-TRANSIENT");
            return dynamicSequentialTask;
        };
        SetMultimap newSetMultimap = Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of);
        this.pollJobs.forEach(pollJob2 -> {
            newSetMultimap.put(pollJob2.job, pollJob2);
        });
        for (PollJob<V> pollJob3 : this.pollJobs) {
            String str2 = (String) function.apply(pollJob3);
            boolean z = false;
            if (pollJob3.pollPeriod != null && pollJob3.pollPeriod.compareTo(Duration.ZERO) > 0) {
                ScheduledTask.Builder tag = ScheduledTask.builder(() -> {
                    return (Task) biFunction.apply(pollJob3.wrappedJob, str2);
                }).cancelOnException(false).tag(this.adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(this.adjunct) : null);
                z = true;
                tag.displayName("Periodic: " + str2);
                tag.period(pollJob3.pollPeriod);
                if (duration == null || pollJob3.pollPeriod.isShorterThan(duration)) {
                    duration = pollJob3.pollPeriod;
                }
                ScheduledTask build = tag.build();
                this.scheduledTasks.add(build);
                log.debug("Submitting scheduled task " + build + " for poll/feed " + this + ", job " + pollJob3);
                Entities.submit(this.entity, build);
                newSetMultimap.removeAll(pollJob3.job);
            }
            if (pollJob3.pollTriggerSensor != null) {
                z = true;
                if (pollJob3.subscription != null) {
                    throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s", this, this.entity, pollJob3.subscription));
                }
                String name = pollJob3.pollTriggerSensor.getName();
                if (pollJob3.pollTriggerEntity != null && !pollJob3.pollTriggerEntity.equals(this.entity)) {
                    name = name + " on " + pollJob3.pollTriggerEntity;
                }
                log.debug("Adding subscription to " + name + " for poll/feed " + this + ", job " + pollJob3);
                of.add(name);
                pollJob3.subscription = this.adjunct.mo19subscriptions().subscribe(pollJob3.pollTriggerEntity != null ? pollJob3.pollTriggerEntity : this.adjunct.mo125getEntity(), pollJob3.pollTriggerSensor, sensorEvent -> {
                    try {
                        this.adjunct.getExecutionContext().submit((TaskAdaptable) biFunction.apply(pollJob3.wrappedJob, str2));
                    } catch (Exception e) {
                        throw Exceptions.propagate(e);
                    }
                });
            }
            if (!z && log.isDebugEnabled()) {
                log.debug("Empty poll job " + pollJob3 + " in " + this + " for " + this.entity + "; if all jobs are empty (or trigger only), will add a trivial one-time initial task");
            }
        }
        newSetMultimap.asMap().forEach((callable2, collection) -> {
            Runnable runnable2 = ((PollJob) collection.iterator().next()).wrappedJob;
            String str3 = (String) collection.stream().map(pollJob4 -> {
                return pollJob4.handler.getDescription();
            }).filter((v0) -> {
                return Strings.isNonBlank(v0);
            }).collect(Collectors.joining(", "));
            String str4 = (this.adjunct != null ? this.adjunct.getDisplayName() : "anonymous") + (Strings.isNonBlank(str3) ? "; " + str3 : "");
            Task build2 = Tasks.builder().dynamic(true).displayName("Initial: " + str4).body(() -> {
                return DynamicTasks.queue((TaskAdaptable) biFunction.apply(runnable2, str4)).getUnchecked();
            }).tag(this.adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(this.adjunct) : null).build();
            log.debug("Submitting initial task " + build2 + " for poll/feed " + this + ", job " + runnable2 + " (because otherwise is trigger-only)");
            Entities.submit(this.entity, build2);
        });
        if (this.adjunct != null) {
            if (!of.isEmpty()) {
                if (duration == null) {
                    this.adjunct.highlightTriggers("Triggered by: " + Strings.join(of, "; "));
                    return;
                } else {
                    this.adjunct.highlightTriggers("Running every " + duration + " and on triggers: " + Strings.join(of, "; "));
                    return;
                }
            }
            if (duration == null || duration.equals(Duration.PRACTICALLY_FOREVER) || !duration.isPositive()) {
                this.adjunct.highlightTriggers("Not configured with a period or triggers");
            } else {
                highlightTriggerPeriod(duration);
            }
        }
    }

    void highlightTriggerPeriod(Duration duration) {
        this.adjunct.highlightTriggers("Running every " + duration);
    }

    public void stop() {
        if (log.isDebugEnabled()) {
            log.debug("Stopping poll for {} (using {})", new Object[]{this.entity, this});
        }
        if (!this.started) {
            throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", this, this.entity));
        }
        this.started = false;
        for (Task<?> task : this.oneOffTasks) {
            if (task != null) {
                task.cancel(true);
            }
        }
        for (ScheduledTask scheduledTask : this.scheduledTasks) {
            if (scheduledTask != null) {
                scheduledTask.cancel();
            }
        }
        for (PollJob<V> pollJob : this.pollJobs) {
            if (pollJob.subscription != null) {
                this.adjunct.mo19subscriptions().unsubscribe(pollJob.subscription);
                pollJob.subscription = null;
            }
        }
        this.oneOffTasks.clear();
        this.scheduledTasks.clear();
    }

    public boolean isRunning() {
        boolean z = false;
        Iterator<ScheduledTask> it = this.scheduledTasks.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            ScheduledTask next = it.next();
            if (next.isBegun() && !next.isDone()) {
                z = true;
                break;
            }
        }
        boolean anyMatch = this.pollJobs.stream().anyMatch(pollJob -> {
            return pollJob.subscription != null;
        });
        if (!this.started && z) {
            log.warn("Poller should not be running, but has active tasks, tasks: " + this.scheduledTasks);
        }
        if (!this.started && anyMatch) {
            log.warn("Poller should not be running, but has subscriptions on jobs: " + this.pollJobs);
        }
        return this.started && (z || anyMatch);
    }

    protected boolean isEmpty() {
        return this.pollJobs.isEmpty();
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("entity", this.entity).toString();
    }
}
