Classes Files

cs/MamaResourcePool.cs

Namespaces

Name
Wombat
System::Runtime::CompilerServices

Classes

  Name
class Wombat::MamaResourcePool
A MamaResourcePool is a convenience wrapper that allows easy subscription to, and management of, OpenMAMA-related resources.

Source code

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace Wombat
{
    public class MamaResourcePool
    {
        // Queue count used for each bridge when the pool's "queues" property is not configured
        private const int MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT = 4;

        // Number of queues in use for the pool
        int mNumQueues = MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT;

        // Name of the pool (used to build the mama.resource_pool.<name>.* property names)
        string mName;

        // Implementation structures for the pool managed entities
        private readonly Dictionary<string, MamaResourcePoolBridgeImpl> mPoolBridgeImpls = new Dictionary<string, MamaResourcePoolBridgeImpl>();    // Indexed by bridge name, e.g. "qpid"
        private readonly Dictionary<string, MamaResourcePoolTransportImpl> mPoolTransportImpls = new Dictionary<string, MamaResourcePoolTransportImpl>(); // Indexed by transport name, e.g. "sub"
        private readonly Dictionary<string, MamaResourcePoolSourceImpl> mPoolSourceImpls = new Dictionary<string, MamaResourcePoolSourceImpl>();    // Indexed by source name, e.g. "OM"

        // Default implementations to use (read from configuration in the constructor)
        string mDefaultTransportName;
        string mDefaultSourceName;

        // List of all created subscriptions for cleanup
        private readonly HashSet<MamaResourcePoolSubscriptionImpl> mSubscriptions = new HashSet<MamaResourcePoolSubscriptionImpl>();

        // Cache a list of bridges which this resource pool knows about
        private List<String> mBridgeNames;

        // Regex queue / channel filter
        private readonly MamaRegexChannelFilter mRegexChannelFilter = new MamaRegexChannelFilter();

        // True once Mama.open() has been called by this pool, so destroy() knows to close it
        private bool mIsMamaOpened;

        // Subscription cleanup callback instance
        private readonly DestroySubscriptionHandler mDestroySubscriptionHandler = new DestroySubscriptionHandler();

        // Pool-wide subscription options; populated in the constructor
        private readonly MamaResourcePoolOptionsImpl mDefaultOptions = new MamaResourcePoolOptionsImpl();

        // Pairs a pooled transport with the bridge wrapper whose queue group serves its subscriptions
        private class MamaResourcePoolTransportImpl
        {
            // The transport held by the pool under its transport name
            public MamaTransport mTransport;

            // Bridge wrapper supplying the queue group used when subscribing on this transport
            public MamaResourcePoolBridgeImpl mMrpBridge;
        }

        // Wrapper around a MamaSource cached by the pool
        private class MamaResourcePoolSourceImpl
        {
            // The cached source; destroyed when the pool is destroyed
            public MamaSource mSource;
        }

        // Book-keeping record for a subscription created through the pool
        private class MamaResourcePoolSubscriptionImpl
        {
            // Source wrapper the subscription was set up against
            public MamaResourcePoolSourceImpl mMrpSource;

            // The subscription handed back to the caller
            public MamaSubscription mSubscription;
        }

        // Queue event callback which destroys the subscription passed to it as the event closure
        private class DestroySubscriptionHandler : MamaQueueEventCallback
        {
            public void onEvent (MamaQueue queue, object closure)
            {
                // destroySubscription enqueues the MamaSubscription itself as the closure
                MamaSubscription doomedSubscription = (MamaSubscription) closure;
                doomedSubscription.destroy();
            }
        }

        // Bag of per-subscription settings; values come from pool configuration or a URI query string
        private class MamaResourcePoolOptionsImpl
        {
            // Passed to MamaSubscription.setSubscriptionType ("subscription_type" option)
            public mamaSubscriptionType mSubscriptionType;

            // Passed to MamaSubscription.setRequiresInitial ("requires_initial" option)
            public bool mRequiresInitial;

            // Passed to MamaSubscription.setTimeout ("timeout" option)
            public double mTimeout;

            // Passed to MamaSubscription.setRetries ("retries" option)
            public int mRetries;

            // Parsed from the "debug_level" option
            public MamaLogLevel mDebugLevel;
        }

        // Holds a loaded bridge together with the queue group the pool created for it
        private class MamaResourcePoolBridgeImpl
        {
            public MamaBridge mBridge;
            public MamaQueueGroup mQueueGroup;
            public string mPoolName;

            // Creates a queue group of numQueues queues on the bridge and, when the
            // mama.resource_pool.<poolName>.thread_name_prefix property is set, names
            // each queue "<prefix>_<index>".
            public MamaResourcePoolBridgeImpl(MamaBridge bridge, int numQueues, String poolName)
            {
                mPoolName = poolName;
                mBridge = bridge;
                mQueueGroup = new MamaQueueGroup(mBridge, numQueues);

                string prefixProperty = string.Format("mama.resource_pool.{0}.thread_name_prefix", mPoolName);
                string threadNamePrefix = Mama.getProperty(prefixProperty);
                if (null == threadNamePrefix)
                {
                    // No prefix configured - leave the queues with their default names
                    return;
                }

                // Cycle once through the queue group so every queue receives a name
                int queueNumber = 0;
                while (queueNumber < numQueues)
                {
                    mQueueGroup.getNextQueue().setQueueName(string.Format("{0}_{1}", threadNamePrefix, queueNumber));
                    queueNumber++;
                }
            }
        }
        
        // Applies a single named option to the given options object.
        //   options - the options instance to modify
        //   key     - option name: timeout, retries, subscription_type, requires_initial or debug_level
        //   value   - textual option value
        // Unknown keys and unknown subscription types are logged at WARN level and ignored.
        // Malformed numeric / boolean values propagate the parse exception to the caller.
        private void applyOption(MamaResourcePoolOptionsImpl options, string key, string value) {
            Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_FINEST, string.Format("Found pool option [{0}] = '{1}'", key, value));
            switch (key) {
                case "timeout":
                    // mTimeout is a double, so accept fractional seconds; parse culture-invariantly
                    // because the value comes from configuration rather than user-facing input
                    options.mTimeout = double.Parse(value, System.Globalization.CultureInfo.InvariantCulture);
                    break;
                case "retries":
                    options.mRetries = int.Parse(value);
                    break;
                case "subscription_type":
                    switch (value) {
                        case "NORMAL":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_NORMAL;
                            break;
                        case "GROUP":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_GROUP;
                            break;
                        case "BOOK":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_BOOK;
                            break;
                        case "BASIC":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_BASIC;
                            break;
                        case "DICTIONARY":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_DICTIONARY;
                            break;
                        case "SYMBOL_LIST":
                            options.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_SYMBOL_LIST;
                            break;
                        default:
                            // string.Format needs {0}; a C-style %s would be logged literally and
                            // the offending value lost. Only the values handled above are listed.
                            Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_WARN, string.Format(
                                    "Found unknown subscription_type: {0} (must be NORMAL, GROUP, BOOK, BASIC, "
                                            + "DICTIONARY or SYMBOL_LIST)",
                                    value));
                            break;
                    }
                    break;
                case "requires_initial":
                    options.mRequiresInitial = bool.Parse(value);
                    break;
                case "debug_level":
                    options.mDebugLevel = Mama.GetLogLevelForString(value);
                    break;
                default:
                    Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_WARN, string.Format(
                            "Found unrecognized mamaResourcePool option '{0}'", key));
                    break;
            }
        }

        // Scans every MAMA property and applies those named
        // mama.resource_pool.<pool name>.options.<option> to the given options object.
        private void applyOptionsFromConfiguration(MamaResourcePoolOptionsImpl options)
        {
            string optionPrefix = string.Format("mama.resource_pool.{0}.options.", mName);
            int optionNameStart = optionPrefix.Length;

            foreach (KeyValuePair<string, string> property in Mama.getProperties())
            {
                string propertyName = property.Key;
                if (!propertyName.StartsWith(optionPrefix))
                {
                    continue;
                }

                // Whatever follows the prefix is the option name understood by applyOption
                applyOption(options, propertyName.Substring(optionNameStart), property.Value);
            }
        }

        // Applies the key=value pairs of a URI query string ("?a=1&b=2") to the given options.
        //   query   - query string, with or without the leading '?'; null is ignored
        //   options - the options instance to modify
        // Pairs with no '=', an empty key or an empty value are skipped.
        private void applyOptionsFromQueryString(String query, MamaResourcePoolOptionsImpl options) {
            if (null == query) {
                return;
            }

            // Prune opening question mark for this URI if it exists
            if (query.StartsWith("?", StringComparison.Ordinal))
            {
                query = query.Substring(1);
            }

            string[] queryKeyValPairs = query.Split('&');
            foreach (string pair in queryKeyValPairs) {
                int index = pair.IndexOf('=');
                // The value must be non-empty within this pair; measuring the whole query
                // here would let "key=" through whenever other pairs follow it.
                if (index > 0 && pair.Length > index + 1) {
                    string key = pair.Substring(0, index);
                    string value = pair.Substring(index + 1);
                    applyOption(options, key, value);
                }
            }
        }

        // Returns the cached source wrapper for the given source name, creating it on first use.
        //   name             - source (symbol namespace) name; may be null when the caller has no source
        //   mrpTransportImpl - transport to attach to a newly created source
        // NOTE(review): the cache is keyed by source name only, so a source first created on one
        // transport is reused when later requested with a different transport - confirm intended.
        private MamaResourcePoolSourceImpl findOrCreateSource (string name, MamaResourcePoolTransportImpl mrpTransportImpl) {
            // Dictionary keys may not be null (TryGetValue would throw ArgumentNullException),
            // so a missing source name is cached under the empty string.
            string cacheKey = name ?? string.Empty;

            MamaResourcePoolSourceImpl mrpSourceImpl;
            if (!mPoolSourceImpls.TryGetValue(cacheKey, out mrpSourceImpl)) {
                // There is no source here - we need to create it
                MamaSource source = new MamaSource();
                source.transport = mrpTransportImpl.mTransport;
                if (null != name) {
                    // Only set a namespace when one was supplied
                    source.symbolNamespace = name;
                }

                mrpSourceImpl = new MamaResourcePoolSourceImpl();
                mrpSourceImpl.mSource = source;

                // Add it to the cache
                mPoolSourceImpls.Add(cacheKey, mrpSourceImpl);
            }
            return mrpSourceImpl;
        }

        // Returns the cached transport wrapper for the given transport name. On first use the
        // owning bridge is resolved from configuration, the transport is created on that bridge
        // and the result is cached. Throws MamaException (MAMA_STATUS_NOT_FOUND) when the
        // resolved bridge is not loaded in this pool.
        private MamaResourcePoolTransportImpl findOrCreateTransport(string name)
        {
            MamaResourcePoolTransportImpl cachedTransport;
            if (mPoolTransportImpls.TryGetValue(name, out cachedTransport))
            {
                return cachedTransport;
            }

            string owningBridgeName = findBridgeNameByTransportName(name);
            MamaResourcePoolBridgeImpl owningBridge;
            bool bridgeKnown = mPoolBridgeImpls.TryGetValue(owningBridgeName, out owningBridge);
            if (!bridgeKnown)
            {
                Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_WARN,
                    string.Format("Could not find bridge {0}", owningBridgeName));
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND, "Could not find bridge");
            }

            // Nothing cached under this name yet, so build the transport on the resolved bridge
            MamaTransport createdTransport = new MamaTransport();
            createdTransport.create(name, owningBridge.mBridge);

            MamaResourcePoolTransportImpl createdImpl = new MamaResourcePoolTransportImpl();
            createdImpl.mTransport = createdTransport;
            createdImpl.mMrpBridge = owningBridge;

            mPoolTransportImpls.Add(name, createdImpl);
            return createdImpl;
        }
        
        // Builds and sets up a subscription from already-resolved components and records it
        // in the pool for later cleanup.
        //   mrpTransportImpl - transport wrapper (also supplies the queue group via its bridge)
        //   sourceName       - source name, or null to use the bare topic for queue selection
        //   topicName        - topic / symbol to subscribe to
        //   callbacks        - subscription callbacks
        //   closure          - caller closure passed through to the subscription
        //   mrpOptionsImpl   - options to apply, or null to use the pool defaults
        private MamaResourcePoolSubscriptionImpl createSubscriptionFromComponents(
                MamaResourcePoolTransportImpl mrpTransportImpl,
                String sourceName,
                String topicName,
                MamaSubscriptionCallback callbacks,
                Object closure,
                MamaResourcePoolOptionsImpl mrpOptionsImpl)
        {
            MamaResourcePoolOptionsImpl effectiveOptions = mrpOptionsImpl ?? mDefaultOptions;

            MamaSubscription subscription = new MamaSubscription();
            subscription.setTimeout(effectiveOptions.mTimeout);
            subscription.setRetries(effectiveOptions.mRetries);
            subscription.setSubscriptionType(effectiveOptions.mSubscriptionType);
            subscription.setRequiresInitial(effectiveOptions.mRequiresInitial);

            // The regex channel filter is matched against "source/topic" (or just the topic)
            string qualifiedTopic = (null == sourceName)
                ? topicName
                : string.Format("{0}/{1}", sourceName, topicName);

            // int.MaxValue is the filter's default channel, i.e. no regex matched:
            // fall back to taking the next queue from the group
            int channel = mRegexChannelFilter.getChannel(qualifiedTopic);
            MamaQueueGroup queueGroup = mrpTransportImpl.mMrpBridge.mQueueGroup;
            MamaQueue dispatchQueue = (channel == int.MaxValue)
                ? queueGroup.getNextQueue()
                : queueGroup.getQueueByIndex(channel);

            Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_FINE, string.Format("MRP Creating subscription for {0}/{1}", sourceName, topicName));

            MamaResourcePoolSubscriptionImpl pooledSubscription = new MamaResourcePoolSubscriptionImpl();
            MamaResourcePoolSourceImpl pooledSource = findOrCreateSource(sourceName, mrpTransportImpl);
            pooledSubscription.mMrpSource = pooledSource;

            subscription.setup(dispatchQueue, callbacks, pooledSource.mSource, topicName, closure);
            pooledSubscription.mSubscription = subscription;

            // Track the subscription so destroy() can clean it up
            mSubscriptions.Add(pooledSubscription);
            return pooledSubscription;
        }

        // Tears down everything created through this pool, in order: queue dispatch is stopped,
        // then subscriptions, sources, queue groups and transports are destroyed, and finally
        // MAMA is closed if this pool opened it. Calling destroy() again afterwards finds
        // nothing left to release.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public void destroy()
        {
            // Stop dispatch on all the queues to prevent further callbacks
            foreach (KeyValuePair<string, MamaResourcePoolBridgeImpl> kv in mPoolBridgeImpls) {
                kv.Value.mQueueGroup.stop();
            }

            // Destroy all subscriptions still tracked by the pool
            foreach (MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl in mSubscriptions) {
                if (mrpSubscriptionImpl.mSubscription != null) {
                    mrpSubscriptionImpl.mSubscription.destroy();
                }
            }
            mSubscriptions.Clear();

            // Destroy all sources
            foreach (KeyValuePair<string, MamaResourcePoolSourceImpl> kv in mPoolSourceImpls) {
                kv.Value.mSource.destroy();
            }
            mPoolSourceImpls.Clear();

            // Destroy all queues
            foreach (KeyValuePair<string, MamaResourcePoolBridgeImpl> kv in mPoolBridgeImpls) {
                kv.Value.mQueueGroup.destroyWait();
            }

            // Destroy all transports
            foreach (KeyValuePair<string, MamaResourcePoolTransportImpl> kv in mPoolTransportImpls) {
                kv.Value.mTransport.destroy();
            }
            mPoolTransportImpls.Clear();

            // Clear down bridge list too
            mPoolBridgeImpls.Clear();

            // Close out MAMA if we ever got as far as opening it
            if (mIsMamaOpened) {
                // Reset the flag first so a repeated destroy() cannot close MAMA a second time
                mIsMamaOpened = false;
                Mama.close();
            }
        }

        // Creates a subscription described by a URI of the form
        //   <bridge>://<transport>/<source>/<topic>?<option>=<value>&...
        // or <bridge>://<transport>/<topic> when no source applies.
        //   uri       - the subscription URI (must not be null or empty)
        //   callbacks - subscription callbacks (must not be null)
        //   closure   - caller closure passed through to the subscription
        // Query-string options override the pool defaults for this subscription only.
        // Throws MamaException with MAMA_STATUS_NULL_ARG / INVALID_ARG / NOT_FOUND for missing
        // arguments, a malformed path, or an unknown bridge respectively.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public MamaSubscription createSubscriptionFromUri (
                string uri,
                MamaSubscriptionCallback callbacks,
                object closure) {
            if (null == uri || null == callbacks)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG, "Null arguments passed to createSubscriptionFromUri");
            }
            if (uri.Length == 0) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_INVALID_ARG, "Zero length uri passed to createSubscriptionFromUri");
            }
            Uri parsedUri = new Uri(uri);
            string bridgeName = parsedUri.Scheme;
            if (string.IsNullOrEmpty(bridgeName)) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND, "Could not find valid bridge / scheme in createSubscriptionFromUri");
            }

            // Uri.Host is an empty string, not null, when the URI has no authority
            string transportName = parsedUri.Host;
            if (string.IsNullOrEmpty(transportName)) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND, "Could not find valid transport / host in createSubscriptionFromUri");
            }

            MamaResourcePoolBridgeImpl mrpBridgeImpl;
            if (!mPoolBridgeImpls.TryGetValue(bridgeName, out mrpBridgeImpl)) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND, "Could not find bridge loaded / configured for pool in createSubscriptionFromUri");
            }

            // The path starts with '/', so element 0 of the split is always empty
            string parsedUriPath = parsedUri.AbsolutePath;
            string[] pathComponents = parsedUriPath.Split('/');
            if (pathComponents.Length < 2) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_INVALID_ARG, "Could not find valid path in createSubscriptionFromUri");
            }

            string topicName;
            string sourceName;
            if (pathComponents.Length == 2) {
                topicName     = pathComponents[1];
                sourceName    = null;
            } else {
                sourceName = pathComponents[1];
                topicName = pathComponents[2];
            }
            // A bare "/" path splits into two empty components; reject it rather than
            // subscribing to an empty topic
            if (string.IsNullOrEmpty(topicName)) {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_INVALID_ARG, "Could not find valid path in createSubscriptionFromUri");
            }

            // Find or create the transport in question
            // NOTE(review): the bridge named by the URI scheme replaces whichever bridge the
            // transport was resolved with - confirm this is intended for cached transports.
            MamaResourcePoolTransportImpl mrpTransportImpl = findOrCreateTransport(transportName);
            mrpTransportImpl.mMrpBridge = mrpBridgeImpl;

            // Take a real copy of the default options for the pool; aliasing mDefaultOptions
            // would make this URI's overrides leak into every later subscription
            MamaResourcePoolOptionsImpl subscriptionOptions = new MamaResourcePoolOptionsImpl();
            subscriptionOptions.mTimeout = mDefaultOptions.mTimeout;
            subscriptionOptions.mRetries = mDefaultOptions.mRetries;
            subscriptionOptions.mSubscriptionType = mDefaultOptions.mSubscriptionType;
            subscriptionOptions.mRequiresInitial = mDefaultOptions.mRequiresInitial;
            subscriptionOptions.mDebugLevel = mDefaultOptions.mDebugLevel;

            // Apply the overrides from the query string
            applyOptionsFromQueryString(parsedUri.Query, subscriptionOptions);

            MamaResourcePoolSubscriptionImpl mrpSubscriptionImpl = createSubscriptionFromComponents (
                    mrpTransportImpl,
                    sourceName,
                    topicName,
                    callbacks,
                    closure,
                    subscriptionOptions
            );

            return mrpSubscriptionImpl.mSubscription;
        }

        // Creates a subscription for the given topic on the named source and transport,
        // using the pool's default options. All arguments except closure must be non-null,
        // otherwise a MamaException with MAMA_STATUS_NULL_ARG is thrown.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public MamaSubscription createSubscriptionFromComponents (
                string transportName,
                string sourceName,
                string topicName,
                MamaSubscriptionCallback callbacks,
                object closure)
        {
            bool allArgumentsPresent =
                null != transportName &&
                null != sourceName &&
                null != topicName &&
                null != callbacks;
            if (!allArgumentsPresent)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG, "Null arguments passed to createSubscriptionFromComponents");
            }

            MamaResourcePoolTransportImpl pooledTransport = findOrCreateTransport(transportName);

            // Passing null options selects the pool defaults
            MamaResourcePoolSubscriptionImpl pooledSubscription =
                createSubscriptionFromComponents(pooledTransport, sourceName, topicName, callbacks, closure, null);
            return pooledSubscription.mSubscription;
        }

        // Creates a subscription for a topic on a named source, choosing the transport from
        // configuration: mama.source.<source>.transport_sub takes precedence over
        // mama.resource_pool.<pool>.default_transport_sub.
        // Throws MamaException (MAMA_STATUS_NULL_ARG) when an argument is null or when
        // neither property is configured.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public MamaSubscription createSubscriptionFromTopicWithSource (
                string sourceName,
                string topicName,
                MamaSubscriptionCallback callbacks,
                object closure)
        {
            if (null == sourceName || null == topicName || null == callbacks)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG, "Null arguments passed to createSubscriptionFromTopicWithSource");
            }

            string poolDefaultTransportName = Mama.getProperty(string.Format("mama.resource_pool.{0}.default_transport_sub", mName));
            string sourceDefaultTransportName = Mama.getProperty(string.Format("mama.source.{0}.transport_sub", sourceName));

            if (null == poolDefaultTransportName && null == sourceDefaultTransportName) {
                // The third format item must be {2}; the malformed "{3]" made string.Format
                // throw FormatException instead of reporting this error
                throw new MamaException(
                    MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                    string.Format(
                        "Could not create subscription for {0} - no default " +
                                "source or resource pool default transport defined for source " +
                                "{1} in resource pool {2}",
                        topicName, sourceName, mName));
            }

            // Source-specific transport wins over the pool-wide default
            return createSubscriptionFromComponents (
                    sourceDefaultTransportName != null ? sourceDefaultTransportName : poolDefaultTransportName,
                    sourceName,
                    topicName,
                    callbacks,
                    closure
            );
        }

        // Creates a subscription for a topic using the pool's configured default source
        // (mama.resource_pool.<pool>.default_source_sub). The transport is chosen from
        // mama.source.<source>.transport_sub, falling back to
        // mama.resource_pool.<pool>.default_transport_sub.
        // Throws MamaException (MAMA_STATUS_NULL_ARG) when an argument is null or when the
        // required properties are not configured.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public MamaSubscription createSubscriptionFromTopic (
                string topicName,
                MamaSubscriptionCallback callbacks,
                object closure)
        {
            if (null == topicName || null == callbacks)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG, "Null arguments passed to createSubscriptionFromTopic");
            }
            String sourceName = Mama.getProperty(string.Format("mama.resource_pool.{0}.default_source_sub", mName));
            if (null == sourceName) {
                throw new MamaException(
                    MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                    string.Format(
                        "Could not create subscription for {0} - no default_source_sub " +
                            "sub source defined for resource pool {1}",
                        topicName, mName));
            }

            string poolDefaultTransportName = Mama.getProperty(string.Format("mama.resource_pool.{0}.default_transport_sub", mName));
            string sourceDefaultTransportName = Mama.getProperty(string.Format("mama.source.{0}.transport_sub", sourceName));

            if (null == poolDefaultTransportName && null == sourceDefaultTransportName) {
                // The third format item must be {2}; the malformed "{3]" made string.Format
                // throw FormatException instead of reporting this error
                throw new MamaException(
                    MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                    string.Format(
                        "Could not create subscription for {0} - no default " +
                                "source or resource pool default transport defined for source " +
                                "{1} in resource pool {2}",
                        topicName, sourceName, mName));
            }

            // Source-specific transport wins over the pool-wide default
            return createSubscriptionFromComponents(
                    sourceDefaultTransportName != null ? sourceDefaultTransportName : poolDefaultTransportName,
                    sourceName,
                    topicName,
                    callbacks,
                    closure
            );
        }

        // Returns the pool's transport with the given name, creating and caching it on first
        // request. Throws MamaException (MAMA_STATUS_NULL_ARG) when transportName is null.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public MamaTransport createTransportFromName (
            string transportName)
        {
            if (null != transportName)
            {
                MamaResourcePoolTransportImpl pooledTransport = findOrCreateTransport(transportName);
                return pooledTransport.mTransport;
            }

            throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                "Null arguments passed to createTransportFromName");
        }

        // Schedules destruction of a subscription previously created by this pool and stops
        // tracking it. The actual destroy is enqueued as an event on the subscription's own queue.
        // Throws MamaException with MAMA_STATUS_NULL_ARG for a null subscription, or
        // MAMA_STATUS_NOT_FOUND when the subscription is not tracked by this pool.
        [MethodImpl(MethodImplOptions.Synchronized)]
        public void destroySubscription (MamaSubscription subscription)
        {
            if (null == subscription)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                    "Null arguments passed to destroySubscription");
            }

            // Locate the pool's book-keeping entry wrapping this subscription
            MamaResourcePoolSubscriptionImpl trackedEntry = null;
            foreach (MamaResourcePoolSubscriptionImpl candidate in mSubscriptions)
            {
                if (candidate.mSubscription == subscription)
                {
                    trackedEntry = candidate;
                    break;
                }
            }

            if (null == trackedEntry)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND,
                    "MAMA subscription cannot be destroyed since it could not be found in pool");
            }

            // Hand the destroy over to the subscription's queue via the shared handler
            MamaQueue owningQueue = subscription.subscQueue;
            owningQueue.enqueueEvent(mDestroySubscriptionHandler, subscription);

            mSubscriptions.Remove(trackedEntry);
        }

        // Resolves which of the pool's bridges a transport belongs to by looking for any
        // configured property beginning with mama.<bridge>.transport.<name>. for each bridge
        // in turn. Returns the first matching bridge name.
        // Throws MamaException (MAMA_STATUS_NOT_FOUND) when no bridge has such a property.
        private string findBridgeNameByTransportName (string name) {
            foreach (string bridgeName in mBridgeNames) {
                string prefix = string.Format("mama.{0}.transport.{1}.", bridgeName, name);
                foreach (KeyValuePair<String, String> entry in Mama.getProperties()) {
                    if (entry.Key.StartsWith(prefix)) {
                        return bridgeName;
                    }
                }
            }

            // Only give up once every bridge has been checked; throwing inside the loop
            // meant transports on any bridge but the first could never be resolved
            throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND,
                string.Format("Could not find a bridge for transport {0}", name));
        }

        // Creates a resource pool with the given name.
        // Loads the default MAMA properties, loads every bridge listed in
        // mama.resource_pool.<name>.bridges (default "qpid"), creates a queue group per loaded
        // bridge, registers any per-queue regex filters, opens MAMA and finally initialises the
        // pool's default subscription options (overridable via mama.resource_pool.<name>.options.*).
        // Throws MamaException with MAMA_STATUS_NULL_ARG for a null name, or
        // MAMA_STATUS_NOT_FOUND when no bridge names are configured.
        public MamaResourcePool (string name)
        {
            mIsMamaOpened = false;
            if (null == name)
            {
                throw new MamaException(MamaStatus.mamaStatus.MAMA_STATUS_NULL_ARG,
                    "Null string passed to MamaResourcePool");
            }
            mName = name;
            Mama.loadDefaultProperties();
            String bridgesProperty = Mama.getProperty (string.Format("mama.resource_pool.{0}.bridges", mName), "qpid");

            // Tolerate whitespace, empty entries and repeats in the comma separated list
            // (e.g. "qpid, zmq,"), so each bridge is looked up and registered exactly once
            mBridgeNames = new List<string>();
            foreach (string configuredBridgeName in bridgesProperty.Split(',')) {
                string trimmedBridgeName = configuredBridgeName.Trim();
                if (trimmedBridgeName.Length > 0 && !mBridgeNames.Contains(trimmedBridgeName)) {
                    mBridgeNames.Add(trimmedBridgeName);
                }
            }
            if (mBridgeNames.Count == 0) {
                throw new MamaException(
                    MamaStatus.mamaStatus.MAMA_STATUS_NOT_FOUND,
                    "No bridges defined for this resource pool - cannot create");
            }

            // int.MaxValue marks "no regex matched"; subscriptions then take the next queue
            mRegexChannelFilter.setDefaultChannel(int.MaxValue);
            mDefaultTransportName = Mama.getProperty (string.Format("mama.resource_pool.{0}.default_transport", mName));
            mDefaultSourceName = Mama.getProperty (string.Format("mama.resource_pool.{0}.default_source", mName));
            foreach (string bridgeName in mBridgeNames) {
                Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_FINE, string.Format("MAMA Resource Pool found bridge {0} - will attempt to load", bridgeName));
                MamaBridge bridge = null;
                try {
                    bridge = Mama.loadBridge(bridgeName);
                } catch (MamaException e) {
                    // A bridge failing to load is not fatal; the pool continues without it
                    Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_ERROR,
                            string.Format("MAMA Resource Pool couldn't load bridge {0}: {1}", bridgeName, e.Message));
                }
                if (null != bridge) {
                    // string.Format uses {0}-style items; C-style %s / %d were passed through
                    // literally, so these properties were never found under the pool's name
                    string numQueuesProperty = Mama.getProperty(string.Format("mama.resource_pool.{0}.queues", mName));
                    if (null != numQueuesProperty) {
                        mNumQueues = int.Parse(numQueuesProperty);
                    } else {
                        mNumQueues = MAMA_RESOURCE_POOL_DEFAULT_QUEUE_COUNT;
                    }

                    mPoolBridgeImpls.Add(bridgeName, new MamaResourcePoolBridgeImpl(bridge, mNumQueues, mName));
                    for (int queueIdx = 0; queueIdx < mNumQueues; ++queueIdx) {
                        String regexStr = Mama.getProperty(string.Format("mama.resource_pool.{0}.queue_{1}.regex", mName, queueIdx));
                        if (null != regexStr) {
                            mRegexChannelFilter.addRegex(regexStr, queueIdx);
                        }
                    }
                } else {
                    Mama.log(MamaLogLevel.MAMA_LOG_LEVEL_WARN,
                            string.Format("MAMA Resource Pool bridge {0} failed to load - will not be available for use in this instance", bridgeName));
                }
            }
            Mama.open();
            mIsMamaOpened = true;

            // Built-in defaults, applied before any configured overrides
            mDefaultOptions.mSubscriptionType = mamaSubscriptionType.MAMA_SUBSC_TYPE_NORMAL;
            mDefaultOptions.mDebugLevel = MamaLogLevel.MAMA_LOG_LEVEL_OFF;
            mDefaultOptions.mRetries = 2;
            mDefaultOptions.mTimeout = 30;
            mDefaultOptions.mRequiresInitial = true;

            applyOptionsFromConfiguration(mDefaultOptions);
        }
    }
}

Updated on 2023-03-31 at 15:29:34 +0100