Classes Files

mama/MamaResourcePool.java

Namespaces

Name
com::wombat::mama
java::util

Classes

  Name
class com::wombat::mama::MamaResourcePool

Source code

package com.wombat.mama;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.logging.Level;

public class MamaResourcePool
{
    private static final int MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT = 4;

    // Number of queues in use for the pool
    int mNumQueues = MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT;

    // Name of the pool
    String mName;

    // Implementation structures for the pool managed entities
    private final Map<String, MamaResourcePoolBridgeImpl> mPoolBridgeImpls = new HashMap<>();    // Indexed by bridge name, e.g. "qpid"
    private final Map<String, MamaResourcePoolTransportImpl> mPoolTransportImpls = new HashMap<>(); // Indexed by transport name, e.g. "sub"
    private final Map<String, MamaResourcePoolSourceImpl> mPoolSourceImpls = new HashMap<>();    // Indexed by source name, e.g. "OM"

    // Default implementations to use
    String mDefaultTransportName;
    String mDefaultSourceName;

    // List of all created subscriptions for cleanup
    private final Set<MamaResourcePoolSubscriptionImpl> mSubscriptions = new HashSet<>();

    // Cache a list of bridges which this resource pool knows about
    private final List<String> mBridgeNames;

    // Regex queue / channel filter
    private final MamaRegexChannelFilter mRegexChannelFilter = new MamaRegexChannelFilter();
    private boolean mIsMamaOpened;

    // Subscription cleanup callback instance
    private final DestroySubscriptionHandler mDestroySubscriptionHandler = new DestroySubscriptionHandler();

    private final MamaResourcePoolOptionsImpl mDefaultOptions = new MamaResourcePoolOptionsImpl();

    private class MamaResourcePoolTransportImpl {
        MamaResourcePoolBridgeImpl mMrpBridge;
        MamaTransport mTransport;
    };

    private class MamaResourcePoolSourceImpl {
        MamaSource mSource;
    };

    private class MamaResourcePoolSubscriptionImpl {
        MamaSubscription mSubscription;
        MamaResourcePoolSourceImpl mMrpSource;
    };

    private class DestroySubscriptionHandler implements MamaQueueEventCallback {
        @Override
        public void onEvent (MamaQueue queue, Object closure) {
            MamaSubscription subscription = (MamaSubscription) closure;
            subscription.destroy();
        }
    }

    private class MamaResourcePoolOptionsImpl {
        double mTimeout;
        int mRetries;
        MamaSubscriptionType mSubscriptionType;
        boolean mRequiresInitial;
        MamaLogLevel mDebugLevel;
    }

    private class MamaResourcePoolBridgeImpl {
        private final String mPoolName;
        private final MamaQueueGroup mQueueGroup;
        private final MamaBridge mBridge;

        MamaResourcePoolBridgeImpl(MamaBridge bridge, int numQueues, String poolName) {
            mPoolName = poolName;
            mBridge = bridge;
            mQueueGroup = new MamaQueueGroup(numQueues, mBridge);
            String queuePrefix = Mama.getProperty(String.format("mama.resource_pool.%s.thread_name_prefix", mPoolName));
            if (null != queuePrefix) {
                // Do the rounds with each queue in the queue group and name each thread
                for (int i = 0; i < numQueues; i++) {
                    MamaQueue queue = mQueueGroup.getNextQueue();
                    String queueName = String.format("%s_%d", queuePrefix, i);
                    queue.setQueueName(queueName);
                }
            }
        }
    }

    public MamaResourcePool (final String name) throws MamaException
    {
        mIsMamaOpened = false;
        if (null == name)
        {
            throw new MamaException("Null string passed to MamaResourcePool");
        }
        Mama.loadDefaultProperties();
        mName = name;
        String bridgesProperty = Mama.getProperty (String.format("mama.resource_pool.%s.bridges", mName), "qpid");
        mBridgeNames = Arrays.asList(bridgesProperty.split(","));
        if (mBridgeNames.isEmpty()) {
            throw new MamaException("No bridges defined for this resource pool - cannot create");
        }
        mRegexChannelFilter.setDefaultChannel(Integer.MAX_VALUE);
        mDefaultTransportName = Mama.getProperty (String.format("mama.resource_pool.%s.default_transport", mName));
        mDefaultSourceName = Mama.getProperty (String.format("mama.resource_pool.%s.default_source", mName));
        for (String bridgeName : mBridgeNames) {
            Mama.log(MamaLogLevel.FINE, String.format("MAMA Resource Pool found bridge %s - will attempt to load", bridgeName));
            MamaBridge bridge = null;
            try {
                bridge = Mama.loadBridge(bridgeName);
            } catch (MamaException e) {
                Mama.log(MamaLogLevel.ERROR,
                        String.format("MAMA Resource Pool couldn't load bridge %s: %s", bridgeName, e.getMessage()));
            }
            if (null != bridge) {
                String numQueuesProperty = Mama.getProperty(String.format("mama.resource_pool.%s.queues", mName));
                if (null != numQueuesProperty) {
                    mNumQueues = Integer.parseInt(numQueuesProperty);
                } else {
                    mNumQueues = MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT;
                }

                mPoolBridgeImpls.put(bridgeName, new MamaResourcePoolBridgeImpl(bridge, mNumQueues, mName));
                for (int queueIdx = 0; queueIdx < mNumQueues; ++queueIdx) {
                    String regexStr = Mama.getProperty(String.format("mama.resource_pool.%s.queue_%d.regex", mName, queueIdx));
                    if (null != regexStr) {
                        mRegexChannelFilter.addRegex(regexStr, queueIdx);
                    }
                }
            } else {
                Mama.log(MamaLogLevel.WARN,
                        String.format("MAMA Resource Pool bridge %s failed to load - will not be available for use in this instance", bridgeName));
            }
        }
        Mama.open();
        mIsMamaOpened = true;

        mDefaultOptions.mSubscriptionType = MamaSubscriptionType.NORMAL;
        mDefaultOptions.mDebugLevel = MamaLogLevel.OFF;
        mDefaultOptions.mRetries = 2;
        mDefaultOptions.mTimeout = 30;
        mDefaultOptions.mRequiresInitial = true;

        applyOptionsFromConfiguration(mDefaultOptions);
    }

    private void applyOption(MamaResourcePoolOptionsImpl options, String key, String value) {
        Mama.log(MamaLogLevel.FINEST, String.format("Found pool option [%s] = '%s'", key, value));
        switch (key) {
            case "timeout":
                options.mTimeout = Integer.parseInt(value);
                break;
            case "retries":
                options.mRetries = Integer.parseInt(value);
                break;
            case "subscription_type":
                switch (value) {
                    case "NORMAL":
                        options.mSubscriptionType = MamaSubscriptionType.NORMAL;
                        break;
                    case "GROUP":
                        options.mSubscriptionType = MamaSubscriptionType.GROUP;
                        break;
                    case "BOOK":
                        options.mSubscriptionType = MamaSubscriptionType.BOOK;
                        break;
                    case "BASIC":
                        options.mSubscriptionType = MamaSubscriptionType.BASIC;
                        break;
                    case "DICTIONARY":
                        options.mSubscriptionType = MamaSubscriptionType.DICTIONARY;
                        break;
                    case "SYMBOL_LIST":
                        options.mSubscriptionType = MamaSubscriptionType.SYMBOL_LIST;
                        break;
                    case "SYMBOL_LIST_NORMAL":
                        options.mSubscriptionType = MamaSubscriptionType.SYMBOL_LIST_NORMAL;
                        break;
                    case "SYMBOL_LIST_GROUP":
                        options.mSubscriptionType = MamaSubscriptionType.SYMBOL_LIST_GROUP;
                        break;
                    case "SYMBOL_LIST_BOOK":
                        options.mSubscriptionType = MamaSubscriptionType.SYMBOL_LIST_BOOK;
                        break;
                    default:
                        Mama.log(MamaLogLevel.WARN, String.format(
                                "Found unknown subscription_type: %s (must be NORMAL, GROUP, BOOK, BASIC, "
                                        + "DICTIONARY, SYMBOL_LIST, SYMBOL_LIST_NORMAL, SYMBOL_LIST_GROUP or SYMBOL_LIST_BOOK",
                                value));
                        break;
                }
                break;
            case "requires_initial":
                options.mRequiresInitial = Boolean.parseBoolean(value);
                break;
            case "debug_level":
                options.mDebugLevel = MamaLogLevel.getMamaLogLevel(Level.parse(value));
                break;
            default:
                Mama.log(MamaLogLevel.WARN, String.format(
                        "Found unrecognized mamaResourcePool option '%s'",
                        key));
                break;
        }
    }

    private void applyOptionsFromConfiguration(MamaResourcePoolOptionsImpl options) {
        // Use mName
        String prefix = String.format("mama.resource_pool.%s.options.", mName);
        for (Map.Entry<String, String> entry : Mama.getProperties().entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                applyOption(options, entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    private void applyOptionsFromQueryString(String query, MamaResourcePoolOptionsImpl options) {
        if (null == query) {
            return;
        }

        // Prune opening question mark for this URI if it exists
        if (query.startsWith("?"))
        {
            query = query.substring(1);
        }

        for (String pair : query.split("&")) {
            int index = pair.indexOf("=");
            if (index > 0 && query.length() > index + 1) {
                String key = pair.substring(0, index);
                String value = pair.substring(index + 1);
                applyOption(options, key, value);
            }
        }
    }

    private MamaResourcePoolSourceImpl findOrCreateSource (String name, MamaResourcePoolTransportImpl mrpTransportImpl) {
        MamaResourcePoolSourceImpl mrpSourceImpl = mPoolSourceImpls.get(name);
        if (null == mrpSourceImpl) {
            // There is no transport here - we need to create it
            MamaSource source = new MamaSource();
            source.setTransport(mrpTransportImpl.mTransport);
            source.setSymbolNamespace(name);

            mrpSourceImpl = new MamaResourcePoolSourceImpl();
            mrpSourceImpl.mSource = source;

            // Add it to the cache
            mPoolSourceImpls.put(name, mrpSourceImpl);
        }
        return mrpSourceImpl;
    }

    private MamaResourcePoolTransportImpl findOrCreateTransport(String name) {
        MamaResourcePoolTransportImpl mrpTransportImpl = mPoolTransportImpls.get(name);
        if (null == mrpTransportImpl) {
            String bridgeName = findBridgeNameByTransportName(name);
            MamaResourcePoolBridgeImpl mrpBridgeImpl = mPoolBridgeImpls.get(bridgeName);
            if (mrpBridgeImpl == null) {
                Mama.log(MamaLogLevel.WARN,
                        String.format("Could not find bridge %s", bridgeName));
                throw new MamaException("Could not find bridge");
            }

            // There is no transport here - we need to create it
            MamaTransport transport = new MamaTransport();
            transport.create(name, mrpBridgeImpl.mBridge);

            mrpTransportImpl = new MamaResourcePoolTransportImpl();
            mrpTransportImpl.mTransport = transport;
            mrpTransportImpl.mMrpBridge = mrpBridgeImpl;

            mPoolTransportImpls.put(name, mrpTransportImpl);
        }
        return mrpTransportImpl;
    }

    private MamaResourcePoolSubscriptionImpl createSubscriptionFromComponents(
            MamaResourcePoolTransportImpl mrpTransportImpl,
            String sourceName,
            String topicName,
            MamaSubscriptionCallback callbacks,
            Object closure,
            MamaResourcePoolOptionsImpl mrpOptionsImpl) {
        MamaResourcePoolOptionsImpl subscriptionOptions;
        if (null == mrpOptionsImpl) {
            subscriptionOptions = mDefaultOptions;
        } else {
            subscriptionOptions = mrpOptionsImpl;
        }
        MamaSubscription newSubscription = new MamaSubscription();
        newSubscription.setTimeout(subscriptionOptions.mTimeout);
        newSubscription.setRetries (subscriptionOptions.mRetries);
        newSubscription.setSubscriptionType (subscriptionOptions.mSubscriptionType);
        newSubscription.setRequiresInitial (subscriptionOptions.mRequiresInitial);

        String fqTopic;
        if (null == sourceName) {
            fqTopic = topicName;
        } else {
            fqTopic = String.format("%s/%s", sourceName, topicName);
        }

        MamaQueue queue;
        int queueIndex = mRegexChannelFilter.getChannel(fqTopic);
        if (queueIndex == Integer.MAX_VALUE) {
            queue = mrpTransportImpl.mMrpBridge.mQueueGroup.getNextQueue();
        } else {
            queue = mrpTransportImpl.mMrpBridge.mQueueGroup.getQueueByIndex(queueIndex);
        }

        Mama.log(MamaLogLevel.FINE, String.format("MRP Creating subscription for %s/%s", sourceName, topicName));
        MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl = new MamaResourcePoolSubscriptionImpl();
        MamaResourcePoolSourceImpl mrpSourceImpl = findOrCreateSource(sourceName, mrpTransportImpl);
        mrpSubscriptionImpl.mMrpSource = mrpSourceImpl;
        newSubscription.setupSubscription(
                callbacks,
                queue,
                mrpSourceImpl.mSource,
                topicName,
                closure
        );
        mrpSubscriptionImpl.mSubscription = newSubscription;
        mSubscriptions.add(mrpSubscriptionImpl);
        return mrpSubscriptionImpl;
    }

    public synchronized void destroy()
    {
        // Stop dispatch on all the queues to prevent further callbacks
        for (Map.Entry<String, MamaResourcePoolBridgeImpl> entry: mPoolBridgeImpls.entrySet()) {
            entry.getValue().mQueueGroup.stopDispatch();
        }

        for (MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl : mSubscriptions) {
            if (mrpSubscriptionImpl.mSubscription != null) {
                mrpSubscriptionImpl.mSubscription.destroy();
            }
        }
        mSubscriptions.clear();

        // Destroy all sources
        for (Map.Entry<String, MamaResourcePoolSourceImpl> entry: mPoolSourceImpls.entrySet()) {
            entry.getValue().mSource.destroy();
        }
        mPoolSourceImpls.clear();

        // Destroy all queues
        for (Map.Entry<String, MamaResourcePoolBridgeImpl> entry: mPoolBridgeImpls.entrySet()) {
            entry.getValue().mQueueGroup.destroyWait();
        }

        // Destroy all transports
        for (Map.Entry<String, MamaResourcePoolTransportImpl> entry: mPoolTransportImpls.entrySet()) {
            entry.getValue().mTransport.destroy();
        }
        mPoolTransportImpls.clear();

        // Clear down bridge list too
        mPoolBridgeImpls.clear();

        // Close out MAMA if we ever got as far as opening it
        if (mIsMamaOpened) {
            Mama.close();
        }
    }

    public synchronized MamaSubscription createSubscriptionFromUri (
            final String uri,
            final MamaSubscriptionCallback callbacks,
            final Object closure) {
        if (null == uri || null == callbacks)
        {
            throw new MamaException("Null arguments passed to createSubscriptionFromUri");
        }
        if (uri.length() == 0) {
            throw new MamaException("Zero length uri passed to createSubscriptionFromUri");
        }
        URI parsedUri = URI.create(uri);
        String bridgeName = parsedUri.getScheme();
        if (null == bridgeName) {
            throw new MamaException("Could not find valid bridge / scheme in createSubscriptionFromUri");
        }

        String transportName = parsedUri.getHost();
        if (null == transportName) {
            throw new MamaException("Could not find valid transport / host in createSubscriptionFromUri");
        }

        MamaResourcePoolBridgeImpl mrpBridgeImpl = mPoolBridgeImpls.get(bridgeName);
        if (null == mrpBridgeImpl) {
            throw new MamaException("Could not find bridge loaded / configured for pool in createSubscriptionFromUri");
        }

        String parsedUriPath = parsedUri.getPath();
        String[] pathComponents = parsedUriPath.split("/");
        if (pathComponents.length < 2) {
            throw new MamaException("Could not find valid path in createSubscriptionFromUri");
        }


        // Find or create the transport in question
        MamaResourcePoolTransportImpl mrpTransportImpl = findOrCreateTransport(transportName);
        mrpTransportImpl.mMrpBridge = mrpBridgeImpl;

        // Take a copy of the default options for the pool
        MamaResourcePoolOptionsImpl subscriptionOptions = mDefaultOptions;

        // Apply the overrides from the query string
        applyOptionsFromQueryString(parsedUri.getRawQuery(), subscriptionOptions);

        String topicName;
        String sourceName;
        if (pathComponents.length == 2) {
            topicName     = pathComponents[1];
            sourceName    = null;
        } else {
            sourceName = pathComponents[1];
            topicName = pathComponents[2];
        }

        MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl = createSubscriptionFromComponents (
                mrpTransportImpl,
                sourceName,
                topicName,
                callbacks,
                closure,
                subscriptionOptions
        );

        return mrpSubscriptionImpl.mSubscription;
    }

    public synchronized MamaSubscription createSubscriptionFromComponents (
            final String transportName,
            final String sourceName,
            final String topicName,
            final MamaSubscriptionCallback callbacks,
            final Object closure)
    {
        if (null == transportName || null == sourceName ||
                null == topicName || null == callbacks)
        {
            throw new MamaException("Null arguments passed to createSubscriptionFromComponents");
        }
        MamaResourcePoolTransportImpl mrpTransportImpl = findOrCreateTransport(transportName);
        MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl = createSubscriptionFromComponents(
                mrpTransportImpl,
                sourceName,
                topicName,
                callbacks,
                closure,
                null);
        return mrpSubscriptionImpl.mSubscription;
    }

    public synchronized MamaSubscription createSubscriptionFromTopicWithSource (
            final String sourceName,
            final String topicName,
            final MamaSubscriptionCallback callbacks,
            final Object closure)
    {
        if (null == sourceName || null == topicName || null == callbacks)
        {
            throw new MamaException("Null arguments passed to createSubscriptionFromTopicWithSource");
        }

        String poolDefaultTransportName = Mama.getProperty(String.format("mama.resource_pool.%s.default_transport_sub", mName));
        String sourceDefaultTransportName = Mama.getProperty(String.format("mama.source.%s.transport_sub", sourceName));

        if (null == poolDefaultTransportName && null == sourceDefaultTransportName) {
            throw new MamaException(String.format(
                    "Could not create subscription for %s - no default " +
                            "source or resource pool default transport defined for source " +
                            "%s in resource pool %s",
                    topicName, sourceName, mName));
        }

        return createSubscriptionFromComponents (
                sourceDefaultTransportName != null ? sourceDefaultTransportName : poolDefaultTransportName,
                sourceName,
                topicName,
                callbacks,
                closure
        );
    }

    public synchronized MamaSubscription createSubscriptionFromTopic (
            final String topicName,
            final MamaSubscriptionCallback callbacks,
            final Object closure)
    {
        if (null == topicName || null == callbacks)
        {
            throw new MamaException("Null arguments passed to createSubscriptionFromTopic");
        }
        String sourceName = Mama.getProperty(String.format("mama.resource_pool.%s.default_source_sub", mName));
        if (null == sourceName) {
            throw new MamaException(String.format(
                    "Could not create subscription for %s - no default_source_sub " +
                        "sub source defined for resource pool %s",
                    topicName, mName));
        }

        String poolDefaultTransportName = Mama.getProperty(String.format("mama.resource_pool.%s.default_transport_sub", mName));
        String sourceDefaultTransportName = Mama.getProperty(String.format("mama.source.%s.transport_sub", sourceName));

        if (null == poolDefaultTransportName && null == sourceDefaultTransportName) {
            throw new MamaException(String.format(
                    "Could not create subscription for %s - no default " +
                            "source or resource pool default transport defined for source " +
                            "%s in resource pool %s",
                    topicName, sourceName, mName));
        }
        return createSubscriptionFromComponents(
                sourceDefaultTransportName != null ? sourceDefaultTransportName : poolDefaultTransportName,
                sourceName,
                topicName,
                callbacks,
                closure
        );
    }

    public synchronized MamaTransport createTransportFromName (
            final String transportName)
    {
        if (null == transportName)
        {
            throw new MamaException("Null arguments passed to createTransportFromName");
        }
        return findOrCreateTransport(transportName).mTransport;
    }

    public synchronized void destroySubscription (
            final MamaSubscription subscription)
    {
        if (null == subscription)
        {
            throw new MamaException("Null arguments passed to destroySubscription");
        }

        MamaResourcePoolSubscriptionImpl existingMrpSubscription = null;
        for (MamaResourcePoolSubscriptionImpl mrpSubscription : mSubscriptions) {
            if (mrpSubscription.mSubscription == subscription) {
                existingMrpSubscription = mrpSubscription;
                break;
            }
        }
        if (null == existingMrpSubscription) {
            throw new MamaException("MAMA subscription cannot be destroyed since it could not be found in pool");
        }

        MamaQueue queue = subscription.getQueue();
        queue.enqueueEvent(mDestroySubscriptionHandler, subscription);

        mSubscriptions.remove(existingMrpSubscription);
    }

    private String findBridgeNameByTransportName (String name) {
        for (String bridgeName : mBridgeNames) {
           String prefix = String.format("mama.%s.transport.%s.", bridgeName, name);
            for (Map.Entry<String, String> entry : Mama.getProperties().entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    return bridgeName;
                }
            }
            throw new MamaException(String.format("Could not find a bridge for transport %s", name));
        }
        return null;
    }
}

Updated on 2023-03-31 at 15:29:46 +0100