package org.apache.brooklyn.entity.nosql.mongodb.sharding;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
import org.apache.brooklyn.entity.nosql.mongodb.MongoDBClientSupport;
import org.apache.brooklyn.entity.nosql.mongodb.MongoDBReplicaSet;
import org.apache.brooklyn.entity.nosql.mongodb.MongoDBServer;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster whose members default to {@link MongoDBReplicaSet} entities and which registers each
 * member as a shard through a running {@link MongoDBRouter}.
 *
 * <p>Registration is attempted whenever a member publishes {@code SERVICE_UP} or the router
 * cluster publishes a non-null {@code ANY_RUNNING_ROUTER}. Attempts run on a private
 * single-threaded scheduler and are retried every {@value #ADD_SHARD_RETRY_DELAY_SECONDS} seconds
 * until they succeed or {@link #ADD_SHARD_TIMEOUT} has elapsed.
 *
 * <p>NOTE(review): the scheduler is shut down in {@link #stop()} and never recreated, so shard
 * registration will not work if this entity is started again after being stopped.
 */
public class MongoDBShardClusterImpl extends DynamicClusterImpl implements MongoDBShardCluster {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardClusterImpl.class);

    /** Maximum total time spent trying to register a single shard before giving up. */
    private static final Duration ADD_SHARD_TIMEOUT = Duration.minutes(20);

    /** Delay, in seconds, between successive attempts to register the same shard. */
    private static final long ADD_SHARD_RETRY_DELAY_SECONDS = 3L;

    /** Members that have been successfully registered with a router. */
    private final Set<Entity> addedMembers = Sets.newConcurrentHashSet();

    /** Members for which a registration task has been submitted and not abandoned. */
    private final Set<Entity> addingMembers = Sets.newConcurrentHashSet();

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /**
     * Returns the spec used to create members: the spec supplied by the superclass, or a
     * {@link MongoDBReplicaSet} spec when none is set. In both cases the spec's initial size is
     * configured from {@code MongoDBShardedDeployment.SHARD_REPLICASET_SIZE}.
     */
    protected EntitySpec<?> getMemberSpec() {
        EntitySpec<?> memberSpec = super.getMemberSpec();
        if (memberSpec == null) {
            memberSpec = EntitySpec.create(MongoDBReplicaSet.class);
        }
        memberSpec.configure(DynamicClusterImpl.INITIAL_SIZE, getConfig(MongoDBShardedDeployment.SHARD_REPLICASET_SIZE));
        return memberSpec;
    }

    /**
     * Starts the cluster and wires up the two triggers for shard registration: a member's
     * {@code SERVICE_UP} sensor and the router cluster's {@code ANY_RUNNING_ROUTER} sensor.
     *
     * @param collection the locations passed on to the superclass {@code start}
     */
    public void start(Collection<? extends Location> collection) {
        // Subscribe before starting so that SERVICE_UP events emitted during start are seen.
        subscriptions().subscribeToMembers(this, Startable.SERVICE_UP, new SensorEventListener<Boolean>() {
            public void onEvent(SensorEvent<Boolean> sensorEvent) {
                addShards();
            }
        });
        super.start(collection);

        MongoDBRouterCluster routers = (MongoDBRouterCluster) getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER);
        subscriptions().subscribe(routers, MongoDBRouterCluster.ANY_RUNNING_ROUTER, new SensorEventListener<MongoDBRouter>() {
            public void onEvent(SensorEvent<MongoDBRouter> sensorEvent) {
                if (sensorEvent.getValue() != null) {
                    addShards();
                }
            }
        });
    }

    /** Cancels pending shard registrations, then stops the cluster. */
    public void stop() {
        this.executor.shutdownNow();
        super.stop();
    }

    /** Shuts the scheduler down once the entity is no longer managed. */
    public void onManagementStopped() {
        super.onManagementStopped();
        this.executor.shutdownNow();
    }

    /**
     * Submits a registration task for every member that reports {@code SERVICE_UP == true} and
     * does not already have one. Does nothing when no running router is available.
     */
    protected void addShards() {
        if (findAnyRunningRouter() == null) {
            LOG.trace("Not adding shards because no running router in {}", this);
            return;
        }
        for (Entity member : getMembers()) {
            // SERVICE_UP may be unset (null) on a member that has not finished starting.
            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Startable.SERVICE_UP));
            // Set.add is the atomic "claim": this method is invoked from several listeners and
            // a separate contains()/add() pair could submit the same member twice.
            if (serviceUp && this.addingMembers.add(member)) {
                LOG.info("{} adding shard {}", this, member);
                addShardAsync(member);
            }
        }
    }

    /**
     * Schedules registration of {@code entity} as a shard, retrying until it succeeds or
     * {@link #ADD_SHARD_TIMEOUT} has elapsed since this method was called.
     *
     * <p>When the attempts time out or fail with an exception, the entity is released from the
     * in-progress set so that a later call to {@link #addShards()} can try again.
     *
     * @param entity the member to register as a shard
     */
    protected void addShardAsync(final Entity entity) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final AtomicInteger attempts = new AtomicInteger();
        this.executor.submit(new Runnable() {
            @Override
            public void run() {
                boolean added;
                try {
                    added = tryAddShard(entity);
                } catch (RuntimeException e) {
                    // The exception is only recorded in the Future returned by submit(); release
                    // the member so it is not left marked as in-progress forever.
                    addingMembers.remove(entity);
                    throw e;
                }
                if (added) {
                    return;
                }
                int numAttempts = attempts.incrementAndGet();
                if (ADD_SHARD_TIMEOUT.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
                    executor.schedule(this, ADD_SHARD_RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                } else {
                    LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting", new Object[]{numAttempts, Time.makeTimeStringRounded(stopwatch), entity});
                    addingMembers.remove(entity);
                }
            }
        });
    }

    /**
     * Makes one attempt to register {@code entity} with a running router.
     *
     * @return {@code true} if the shard was added; {@code false} if the attempt should be retried
     *         (no running router, no primary yet, or the router rejected the request)
     * @throws RuntimeException if the client cannot be created or the add request throws
     */
    private boolean tryAddShard(Entity entity) {
        MongoDBRouter router = findAnyRunningRouter();
        if (router == null) {
            LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", entity, this);
            return false;
        }

        MongoDBClientSupport client;
        try {
            client = MongoDBClientSupport.forServer(router);
        } catch (UnknownHostException e) {
            throw Exceptions.propagate(e);
        }

        try {
            MongoDBServer primary = (MongoDBServer) entity.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
            if (primary == null) {
                LOG.debug("Rescheduling addition of shard {} because primary is null", entity);
                return false;
            }
            // Shard address has the form "<name>/<host>:<port>".
            String hostAndPort = String.format("%s:%d", primary.getAttribute(MongoDBServer.SUBNET_HOSTNAME), primary.getAttribute(MongoDBServer.PORT));
            String shardAddress = entity.getName() + "/" + hostAndPort;
            if (!client.addShardToRouter(shardAddress)) {
                LOG.debug("Rescheduling addition of shard {} because add failed via router {}", shardAddress, router);
                return false;
            }
            LOG.info("{} added shard {} via {}", new Object[]{this, shardAddress, router});
            this.addedMembers.add(entity);
            return true;
        } catch (Exception e) {
            LOG.error("Failed to add shard to router {}:  ", router, e);
            throw Exceptions.propagate(e);
        }
    }

    /**
     * Looks up the router published as {@code ANY_RUNNING_ROUTER} by the parent's router cluster.
     *
     * @return the router, or {@code null} if there is no parent, no router cluster, or no
     *         running router
     */
    private MongoDBRouter findAnyRunningRouter() {
        Entity parent = getParent();
        if (parent == null) {
            return null;
        }
        MongoDBRouterCluster routers = (MongoDBRouterCluster) parent.getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER);
        if (routers == null) {
            return null;
        }
        return (MongoDBRouter) routers.getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
    }
}
