package org.apache.qpid.server.store.berkeleydb;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.HostPortPair;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.State;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;

/* loaded from: input_file:qpid-bdbstore-0.20.jar:org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.class */
public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore {
    public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
    public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
    public static final String BDB_HA_STORE_TYPE = "BDB-HA";
    private String _groupName;
    private String _nodeName;
    private String _nodeHostPort;
    private String _helperHostPort;
    private Durability _durability;
    private String _name;
    private CommitThreadWrapper _commitThreadWrapper;
    private boolean _coalescingSync;
    private boolean _designatedPrimary;
    private Map<String, String> _repConfig;
    private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
    private static final Durability DEFAULT_DURABILITY = new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY);
    private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() { // from class: org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore.1
        {
            put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
            put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
            put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
            put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
            put("je.rep.protocolOldStringEncoding", Boolean.FALSE.toString());
            put(ReplicationMutableConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qpid-bdbstore-0.20.jar:org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore$BDBHAMessageStoreStateChangeListener.class */
    public class BDBHAMessageStoreStateChangeListener implements StateChangeListener {
        private final Executor _executor;

        private BDBHAMessageStoreStateChangeListener() {
            this._executor = Executors.newSingleThreadExecutor();
        }

        @Override // com.sleepycat.je.rep.StateChangeListener
        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
            ReplicatedEnvironment.State state = stateChangeEvent.getState();
            if (BDBHAMessageStore.LOGGER.isInfoEnabled()) {
                BDBHAMessageStore.LOGGER.info("Received BDB event indicating transition to state " + state);
            }
            switch (state) {
                case MASTER:
                    activateStoreAsync();
                    return;
                case REPLICA:
                    passivateStoreAsync();
                    return;
                case DETACHED:
                    BDBHAMessageStore.LOGGER.error("BDB replicated node in detached state, therefore passivating.");
                    passivateStoreAsync();
                    return;
                case UNKNOWN:
                    BDBHAMessageStore.LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)");
                    return;
                default:
                    BDBHAMessageStore.LOGGER.error("Unexpected state change: " + state);
                    throw new IllegalStateException("Unexpected state change: " + state);
            }
        }

        private void activateStoreAsync() {
            executeStateChangeAsync(new Callable<Void>() { // from class: org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore.BDBHAMessageStoreStateChangeListener.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    try {
                        BDBHAMessageStore.this.activate();
                        return null;
                    } catch (Exception e) {
                        BDBHAMessageStore.LOGGER.error("Failed to activate on hearing MASTER change event", e);
                        throw e;
                    }
                }
            }, "BDBHANodeActivationThread-" + BDBHAMessageStore.this._name);
        }

        private void passivateStoreAsync() {
            executeStateChangeAsync(new Callable<Void>() { // from class: org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore.BDBHAMessageStoreStateChangeListener.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    try {
                        BDBHAMessageStore.this.passivate();
                        return null;
                    } catch (Exception e) {
                        BDBHAMessageStore.LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event", e);
                        throw e;
                    }
                }
            }, "BDBHANodePassivationThread-" + BDBHAMessageStore.this._name);
        }

        private void executeStateChangeAsync(final Callable<Void> callable, final String str) {
            final RootMessageLogger rootMessageLogger = CurrentActor.get().getRootMessageLogger();
            this._executor.execute(new Runnable() { // from class: org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore.BDBHAMessageStoreStateChangeListener.3
                @Override // java.lang.Runnable
                public void run() {
                    String name = Thread.currentThread().getName();
                    Thread.currentThread().setName(str);
                    try {
                        CurrentActor.set(new AbstractActor(rootMessageLogger) { // from class: org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore.BDBHAMessageStoreStateChangeListener.3.1
                            public String getLogMessage() {
                                return str;
                            }
                        });
                        try {
                            callable.call();
                        } catch (Exception e) {
                            BDBHAMessageStore.LOGGER.error("Exception during state change", e);
                        }
                        Thread.currentThread().setName(name);
                    } catch (Throwable th) {
                        Thread.currentThread().setName(name);
                        throw th;
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qpid-bdbstore-0.20.jar:org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore$NoOpStateChangeListener.class */
    public class NoOpStateChangeListener implements StateChangeListener {
        private NoOpStateChangeListener() {
        }

        @Override // com.sleepycat.je.rep.StateChangeListener
        public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException {
        }
    }

    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    public void configure(String str, Configuration configuration) throws Exception {
        this._groupName = getValidatedPropertyFromConfig("highAvailability.groupName", configuration);
        this._nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", configuration);
        this._nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", configuration);
        this._helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", configuration);
        this._name = str;
        String string = configuration.getString("highAvailability.durability");
        if (string == null) {
            this._durability = DEFAULT_DURABILITY;
        } else {
            this._durability = Durability.parse(string);
        }
        this._designatedPrimary = configuration.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE).booleanValue();
        this._coalescingSync = configuration.getBoolean("highAvailability.coalescingSync", Boolean.TRUE).booleanValue();
        this._repConfig = getConfigMap(REPCONFIG_DEFAULTS, configuration, "repConfig");
        if (this._coalescingSync && this._durability.getLocalSync() == Durability.SyncPolicy.SYNC) {
            throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + Durability.SyncPolicy.SYNC + "! Please set highAvailability.coalescingSync to false in store configuration.");
        }
        super.configure(str, configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    public void setupStore(File file, String str) throws DatabaseException, AMQStoreException {
        super.setupStore(file, str);
        if (this._coalescingSync) {
            this._commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + str, getEnvironment());
            this._commitThreadWrapper.startCommitThread();
        }
    }

    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    protected Environment createEnvironment(File file) throws DatabaseException {
        ReplicatedEnvironment replicatedEnvironment;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Environment path " + file.getAbsolutePath());
            LOGGER.info("Group name " + this._groupName);
            LOGGER.info("Node name " + this._nodeName);
            LOGGER.info("Node host port " + this._nodeHostPort);
            LOGGER.info("Helper host port " + this._helperHostPort);
            LOGGER.info("Durability " + this._durability);
            LOGGER.info("Coalescing sync " + this._coalescingSync);
            LOGGER.info("Designated primary (applicable to 2 node case only) " + this._designatedPrimary);
        }
        ReplicationConfig replicationConfig = new ReplicationConfig(this._groupName, this._nodeName, this._nodeHostPort);
        replicationConfig.setHelperHosts(this._helperHostPort);
        replicationConfig.setDesignatedPrimary(this._designatedPrimary);
        setReplicationConfigProperties(replicationConfig);
        EnvironmentConfig createEnvironmentConfig = createEnvironmentConfig();
        createEnvironmentConfig.setDurability(this._durability);
        try {
            replicatedEnvironment = new ReplicatedEnvironment(file, replicationConfig, createEnvironmentConfig);
        } catch (InsufficientLogException e) {
            LOGGER.info("InsufficientLogException thrown and so full network restore required", e);
            NetworkRestore networkRestore = new NetworkRestore();
            NetworkRestoreConfig networkRestoreConfig = new NetworkRestoreConfig();
            networkRestoreConfig.setRetainLogFiles(false);
            networkRestore.execute(e, networkRestoreConfig);
            replicatedEnvironment = new ReplicatedEnvironment(file, replicationConfig, createEnvironmentConfig);
        }
        return replicatedEnvironment;
    }

    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    public void configureMessageStore(String str, MessageStoreRecoveryHandler messageStoreRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler, Configuration configuration) throws Exception {
        super.configureMessageStore(str, messageStoreRecoveryHandler, transactionLogRecoveryHandler, configuration);
        getReplicatedEnvironment().setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
    }

    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    public synchronized void activate() throws Exception {
        getEnvironment().flushLog(true);
        super.activate();
    }

    public synchronized void passivate() {
        if (this._stateManager.isNotInState(State.INITIALISED)) {
            LOGGER.debug("Store becoming passive");
            this._stateManager.attainState(State.INITIALISED);
        }
    }

    public String getName() {
        return this._name;
    }

    public String getGroupName() {
        return this._groupName;
    }

    public String getNodeName() {
        return this._nodeName;
    }

    public String getNodeHostPort() {
        return this._nodeHostPort;
    }

    public String getHelperHostPort() {
        return this._helperHostPort;
    }

    public String getDurability() {
        return this._durability.toString();
    }

    public boolean isCoalescingSync() {
        return this._coalescingSync;
    }

    public String getNodeState() {
        return getReplicatedEnvironment().getState().toString();
    }

    public Boolean isDesignatedPrimary() {
        return Boolean.valueOf(getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary());
    }

    public List<Map<String, String>> getGroupMembers() {
        ArrayList arrayList = new ArrayList();
        for (ReplicationNode replicationNode : getReplicatedEnvironment().getGroup().getNodes()) {
            HashMap hashMap = new HashMap();
            hashMap.put(GRP_MEM_COL_NODE_NAME, replicationNode.getName());
            hashMap.put(GRP_MEM_COL_NODE_HOST_PORT, replicationNode.getHostName() + HostPortPair.SEPARATOR + replicationNode.getPort());
            arrayList.add(hashMap);
        }
        return arrayList;
    }

    public void removeNodeFromGroup(String str) throws AMQStoreException {
        try {
            createReplicationGroupAdmin().removeMember(str);
        } catch (OperationFailureException e) {
            throw new AMQStoreException("Failed to remove '" + str + "' from group. " + e.getMessage(), e);
        } catch (DatabaseException e2) {
            throw new AMQStoreException("Failed to remove '" + str + "' from group. " + e2.getMessage(), e2);
        }
    }

    public void setDesignatedPrimary(boolean z) throws AMQStoreException {
        try {
            ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment();
            synchronized (replicatedEnvironment) {
                replicatedEnvironment.setRepMutableConfig(replicatedEnvironment.getRepMutableConfig().setDesignatedPrimary(z));
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Node " + this._nodeName + " successfully set as designated primary for group");
            }
        } catch (DatabaseException e) {
            throw new AMQStoreException("Failed to set '" + this._nodeName + "' as designated primary for group. " + e.getMessage(), e);
        }
    }

    ReplicatedEnvironment getReplicatedEnvironment() {
        return (ReplicatedEnvironment) getEnvironment();
    }

    public void updateAddress(String str, String str2, int i) throws AMQStoreException {
        try {
            createReplicationGroupAdmin().updateAddress(str, str2, i);
        } catch (OperationFailureException e) {
            throw new AMQStoreException("Failed to update address for '" + str + "' with new host " + str2 + " and new port " + i + ". " + e.getMessage(), e);
        } catch (DatabaseException e2) {
            throw new AMQStoreException("Failed to update address for '" + str + "' with new host " + str2 + " and new port " + i + ". " + e2.getMessage(), e2);
        }
    }

    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    protected StoreFuture commit(Transaction transaction, boolean z) throws DatabaseException {
        try {
            transaction.commit();
            return this._coalescingSync ? this._commitThreadWrapper.commit(transaction, z) : StoreFuture.IMMEDIATE_FUTURE;
        } catch (DatabaseException e) {
            LOGGER.error("Got DatabaseException on commit, closing environment", e);
            closeEnvironmentSafely();
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore
    public void closeInternal() throws Exception {
        substituteNoOpStateChangeListenerOn(getReplicatedEnvironment());
        try {
            if (this._coalescingSync) {
                this._commitThreadWrapper.stopCommitThread();
            }
        } finally {
            super.closeInternal();
        }
    }

    private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) {
        LOGGER.debug("Substituting no-op state change listener for environment close");
        replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener());
    }

    private ReplicationGroupAdmin createReplicationGroupAdmin() {
        HashSet hashSet = new HashSet();
        hashSet.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets());
        ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig();
        hashSet.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
        return new ReplicationGroupAdmin(this._groupName, hashSet);
    }

    private void setReplicationConfigProperties(ReplicationConfig replicationConfig) {
        for (Map.Entry<String, String> entry : this._repConfig.entrySet()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Setting ReplicationConfig key " + entry.getKey() + " to '" + entry.getValue() + "'");
            }
            replicationConfig.setConfigParam(entry.getKey(), entry.getValue());
        }
    }

    private String getValidatedPropertyFromConfig(String str, Configuration configuration) throws ConfigurationException {
        if (configuration.containsKey(str)) {
            return configuration.getString(str);
        }
        throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + str.replace('.', '/'));
    }

    public String getStoreType() {
        return BDB_HA_STORE_TYPE;
    }
}
