Classes Files

cs/MamaQueueGroup.cs

Namespaces

Name
Wombat

Classes

  Name
class Wombat::MamaQueueGroup
A class for distributing events across multiple queues in a round robin fashion.

Source code

/* $Id$
 *
 * OpenMAMA: The open middleware agnostic messaging API
 * Copyright (C) 2011 NYSE Technologies, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301 USA
 */

using System;
using System.Threading;

namespace Wombat
{
    /* ************************************************************** */
    #region MamaQueueGroup Class

    public class MamaQueueGroup : IDisposable
    {
        /* ************************************************************** */
        #region Class Member Definition

        /* ************************************************************** */
        #region Private Member Variables

        private volatile int mNextQueueIndex;

        private MamaQueueThread[] mQueueThreads;

        #endregion

        /* ************************************************************** */
        #region Nested Classes

        private class MamaQueueThread : IDisposable
        {
            /* ************************************************************** */
            #region Class Member Definitino

            /* ************************************************************** */
            #region Private Member Variables

            private MamaQueue mQueue;

            private Thread mWorkerThread;

            #endregion

            #endregion

            /* ************************************************************** */
            #region Construction and Finalization

            public MamaQueueThread(MamaBridge bridge)
            {
                // Create the queue
                mQueue = new MamaQueue(bridge);

                // Start dispatching straight away
                start();
            }

            ~MamaQueueThread()
            {
                // Dispose only unmanaged resources
                Dispose(false);
            }

            #endregion

            /* ************************************************************** */
            #region Internal Operations

            internal void destroyWait()
            {
                // Stop dispatching the queue, this will not return until the thread has shut down
                stop();

                // Destroy the queue, but wait until it has shutdown
                mQueue.destroyWait();
                mQueue = null;

                // Supress the finalizer as we have effectively disposed the class
                GC.SuppressFinalize(this);
            }

            internal MamaQueue Queue
            {
                get
                {
                    return mQueue;
                }
            }

            internal void start()
            {
                // Only continue if the thread is not valid
                if (null == mWorkerThread)
                {
                    // Create the thread
                    mWorkerThread = new Thread(new ThreadStart(this.workerThreadProc));

                    // It will run in the background
                    mWorkerThread.IsBackground = true;

                    // Format the name with the queue hash code
                    mWorkerThread.Name = string.Format("MamaQueueManager: {0}", mQueue.GetHashCode());

                    // Start the thread
                    mWorkerThread.Start();
                }
            }

            internal void stop()
            {
                // Only continue if there is a thread dispatching the queue
                if (null != mWorkerThread)
                {
                    // Stop dispatching on the queue, this will release the worker thread
                    mQueue.stopDispatch();

                    // Wait until the thread exits
                    mWorkerThread.Join();

                    // Clear the member variable so the thread can be recreated if dispatching is restarted
                    mWorkerThread = null;
                }
            }

            #endregion

            /* ************************************************************** */
            #region Private Operations

            private void Dispose(bool disposing)
            {
                // Dispose managed objects
                if (disposing)
                {
                    // Stop dispatching the queue, this will not return until the thread has shut down
                    stop();

                    // Destroy the queue
                    mQueue.Dispose();
                    mQueue = null;
                }
            }

            private void workerThreadProc()
            {
                // Start dispatching on the queue
                mQueue.dispatch();
            }

            #endregion

            /* ************************************************************** */
            #region Public Operations

            public void Dispose()
            {
                // Dispose managed and unmanaged resources
                Dispose(true);

                // This object will be cleaned up by the Dispose method.
                // Therefore, you should call GC.SupressFinalize to
                // take this object off the finalization queue
                // and prevent finalization code for this object
                // from executing a second time.
                GC.SuppressFinalize(this);
            }

            #endregion
        }

        #endregion

        #endregion

        /* ************************************************************** */
        #region Construction and Finalization

        public MamaQueueGroup(MamaBridge bridgeImpl, int queueCount)
        {
            // Check arguments
            if (queueCount < 1)
            {
                throw new ArgumentOutOfRangeException("queueCount", queueCount, "Queue count should be > 0");
            }

            // Create the array of queue threads
            mQueueThreads = new MamaQueueThread[queueCount];
            for (int nextQueue = 0; nextQueue<queueCount; nextQueue++)
            {
                mQueueThreads[nextQueue] = new MamaQueueThread(bridgeImpl);
            }
        }

        ~MamaQueueGroup()
        {
            // Dispose only unmanaged resources
            Dispose(false);
        }

        #endregion

        /* ************************************************************** */
        #region Private Operations

        private void Dispose(bool disposing)
        {
            // If disposing is true then dispose all managed resources
            if (disposing)
            {
                // Lock to prevent thread race conditions
                lock (this)
                {
                    // Enumerate all the threads
                    foreach (MamaQueueThread queueThread in mQueueThreads)
                    {
                        // Dispose the queue
                        queueThread.Dispose();
                    }

                    // Clear the array
                    mQueueThreads = null;
                }
            }

            // Clean up unamanged resources regardless
        }

        #endregion

        /* ************************************************************** */
        #region Public Operations

        public void destroy()
        {
            Dispose();
        }

        public void destroyWait()
        {
            // Lock to prevent race conditions
            lock (this)
            {
                // Destroy all the queue threads
                foreach (MamaQueueThread queueThread in mQueueThreads)
                {
                    queueThread.destroyWait();
                }

                // Clear the array
                mQueueThreads = null;
            }

            // The class has been disposed, prevent any further finalization
            GC.SuppressFinalize(this);
        }

        public void Dispose()
        {
            // Dispose managed and unmanaged resources
            Dispose(true);

            // This object will be cleaned up by the Dispose method.
            // Therefore, you should call GC.SupressFinalize to
            // take this object off the finalization queue
            // and prevent finalization code for this object
            // from executing a second time.
            GC.SuppressFinalize(this);
        }

        public void start()
        {
            // Lock to prevent race conditions
            lock (this)
            {
                // Start all the queue threads
                foreach (MamaQueueThread queueThread in mQueueThreads)
                {
                    queueThread.start();
                }
            }
        }

        public void stop()
        {
            // Lock to prevent race conditions
            lock (this)
            {
                // Stop all the queue threads
                foreach (MamaQueueThread queueThread in mQueueThreads)
                {
                    queueThread.stop();
                }
            }
        }

        public MamaQueue getNextQueue()
        {
            // Returns
            MamaQueue ret = null;

            lock(this)
            {
                // Only continue if the array of threads is valid
                if (mQueueThreads != null)
                {
                    // If the queue index has reached the end of the array then go back to the start
                    if (mNextQueueIndex >= mQueueThreads.Length)
                    {
                        mNextQueueIndex = 0;
                    }

                    // Get the queue at this index and increment.
                    ret = mQueueThreads[mNextQueueIndex++].Queue;
                }
            }

            return ret;
        }

        public MamaQueue getQueueByIndex(int index)
        {
            // Returns
            MamaQueue ret = null;

            lock(this)
            {
                // Only continue if the array of threads is valid
                if (mQueueThreads != null)
                {
                    // Get the queue at this index and increment.
                    ret = mQueueThreads[index % mQueueThreads.Length].Queue;
                }
            }

            return ret;
        }

        #endregion
    }

    #endregion
}

Updated on 2023-03-31 at 15:29:34 +0100