Name |
---|
Wombat |
Name | |
---|---|
class | Wombat::MamaQueue MamaQueues allow applications to dispatch events in order with multiple threads. |
/* $Id$
*
* OpenMAMA: The open middleware agnostic messaging API
* Copyright (C) 2011 NYSE Technologies, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
using System;
using System.Collections;
using System.Runtime.InteropServices;
using System.Threading;
using System.Collections.Generic;
namespace Wombat
{
public class MamaQueue : MamaWrapper
{
private enum QueueState : byte
{
Stopped,
Stopping,
Running
};
QueueState status = QueueState.Stopped;
public MamaQueue (MamaBridge bridgeImpl) : base()
{
int code = NativeMethods.mamaQueue_create(ref nativeHandle, bridgeImpl.NativeHandle);
CheckResultCode(code);
}
internal MamaQueue(IntPtr nativeHandle) : base(nativeHandle)
{
}
public void setHighWatermark(int highWatermark)
{
int code = NativeMethods.mamaQueue_setHighWatermark(nativeHandle, highWatermark);
CheckResultCode(code);
}
public int getHighWatermark()
{
int result = 0;
int code = NativeMethods.mamaQueue_getHighWatermark(nativeHandle, ref result);
CheckResultCode(code);
return result;
}
public void setLowWatermark(int lowWatermark)
{
int code = NativeMethods.mamaQueue_setLowWatermark(nativeHandle, lowWatermark);
CheckResultCode(code);
}
public void setQueueName(string queueName)
{
int code = NativeMethods.mamaQueue_setQueueName(nativeHandle, queueName);
CheckResultCode(code);
}
public int getLowWatermark()
{
int result = 0;
int code = NativeMethods.mamaQueue_getLowWatermark(nativeHandle, ref result);
CheckResultCode(code);
return result;
}
public int getEventCount()
{
int result = 0;
int code = NativeMethods.mamaQueue_getEventCount(nativeHandle, ref result);
CheckResultCode(code);
return result;
}
public string getQueueBridgeName()
{
IntPtr ret = IntPtr.Zero;
int code = NativeMethods.mamaQueue_getQueueBridgeName(nativeHandle, ref ret);
CheckResultCode(code);
return Marshal.PtrToStringAnsi(ret);
}
public void setQueueMonitorCallbacks(MamaQueueMonitorCallback callback, object closure)
{
EnsurePeerCreated();
mWatermarkCallbackForwarder = new WatermarkCallbackForwarder(this, callback, closure);
mWatermarkShimCallbacks.mHighWatermarkExceededShimCallback = new WatermarkCallbackForwarder.HighWatermarkExceededCallback(mWatermarkCallbackForwarder.HighWatermarkExceeded);
mWatermarkShimCallbacks.mLowWatermarkShimCallback = new WatermarkCallbackForwarder.LowWatermarkCallback(mWatermarkCallbackForwarder.LowWatermark);
int code = NativeMethods.mamaQueue_setQueueMonitorCallbacks(nativeHandle, ref mWatermarkShimCallbacks, IntPtr.Zero);
CheckResultCode(code);
}
public void setEnqueueCallback(MamaQueueEnqueueCallback callback)
{
EnsurePeerCreated();
mEnqueueCallbackForwarder = new EnqueueCallbackForwarder(this, callback);
mEnqueueShimCallback = new EnqueueCallbackForwarder.EnqueueCallback(mEnqueueCallbackForwarder.OnEnqueue);
int code = NativeMethods.mamaQueue_setEnqueueCallback(nativeHandle, mEnqueueShimCallback, IntPtr.Zero);
CheckResultCode(code);
}
public void enqueueEvent(MamaQueueEventCallback callback, object closure)
{
// Verify that the callback object has been supplied
if(callback == null)
{
throw new ArgumentNullException("callback");
}
// Make sure the queue has been created
EnsurePeerCreated();
// Create a new forwarder object to manage the .Net callback objects
int code = EnqueueEventForwarder.ForwardEvent(callback, closure, nativeHandle, this);
// Chek that the return code
CheckResultCode(code);
}
public bool canDestroy()
{
// Returns
bool ret = false;
// Verify that the underlying C object has been created
EnsurePeerCreated();
// Call the native method
int mqcd = NativeMethods.mamaQueue_canDestroy(nativeHandle);
// MAMA_STATUS_OK is returned by the C layer if the queue can be destroyed
if ((int)MamaStatus.mamaStatus.MAMA_STATUS_OK == mqcd)
{
ret = true;
}
return ret;
}
public void destroy()
{
// Verify that the native queue is valid
EnsurePeerCreated();
// If messages are being dispatched stop now
if (status == QueueState.Running)
{
stopDispatch();
}
// Destroy the queue
CheckResultCode(NativeMethods.mamaQueue_destroy(nativeHandle));
// The native queue will not have been destroyed, reset the member variable
nativeHandle = IntPtr.Zero;
}
public void destroyWait()
{
// Verify that the native queue is valid
EnsurePeerCreated();
// If messages are being dispatched stop now
if (status == QueueState.Running)
{
stopDispatch();
}
// Destroy the queue
CheckResultCode(NativeMethods.mamaQueue_destroyWait(nativeHandle));
// The native queue will not have been destroyed, reset the member variable
nativeHandle = IntPtr.Zero;
}
public void destroyTimedWait(long timeout)
{
// Verify that the native queue is valid
EnsurePeerCreated();
// If messages are being dispatched stop now
if (status == QueueState.Running)
{
stopDispatch();
}
// Destroy the queue
CheckResultCode(NativeMethods.mamaQueue_destroyTimedWait(nativeHandle, timeout));
// The native queue will not have been destroyed, reset the member variable
nativeHandle = IntPtr.Zero;
}
protected override MamaStatus.mamaStatus DestroyNativePeer()
{
// Returns
MamaStatus.mamaStatus ret = MamaStatus.mamaStatus.MAMA_STATUS_OK;
// Only continue if the native queue is valid
if (nativeHandle != IntPtr.Zero)
{
ret = (MamaStatus.mamaStatus)NativeMethods.mamaQueue_destroy(nativeHandle);
}
return ret;
}
public void dispatch ()
{
status = QueueState.Running;
EnsurePeerCreated();
int code = NativeMethods.mamaQueue_dispatch(nativeHandle);
CheckResultCode(code);
status = QueueState.Stopped;
}
public void dispatchEvent ()
{
status = QueueState.Running;
EnsurePeerCreated();
int code = NativeMethods.mamaQueue_dispatchEvent(nativeHandle);
CheckResultCode(code);
status = QueueState.Stopped;
}
public void stopDispatch()
{
status = QueueState.Stopping;
EnsurePeerCreated();
int code = NativeMethods.mamaQueue_stopDispatch(nativeHandle);
CheckResultCode(code);
while (status != QueueState.Stopped)
Thread.Sleep(10);
}
public void timedDispatch(long timeout)
{
status = QueueState.Running;
EnsurePeerCreated();
int code = NativeMethods.mamaQueue_timedDispatch(nativeHandle, (ulong) timeout);
CheckResultCode(code);
status = QueueState.Stopped;
}
#region Implementation details
#region EnqueueEventForwarder Class
private class EnqueueEventForwarder
{
#region Private Member Variables
private MamaQueueEventCallback mCallback;
private object mClosure;
private long mIndex;
private MamaQueue mSender;
#endregion
#region Private Static Member Variables
private static Dictionary<long, EnqueueEventForwarder> mEventList;
private static Mutex mEventMutex;
private static EnqueueCallback mNativeCallback;
private static long mNextKey;
#endregion
#region Public Delegates
public delegate void EnqueueCallback(IntPtr queue, IntPtr closure);
#endregion
#region Construction and Finalization
static EnqueueEventForwarder()
{
// Create the mutex
mEventMutex = new Mutex(false);
// Create the dictionary
mEventList = new Dictionary<long, EnqueueEventForwarder>();
// Create the native callback delegate
mNativeCallback = new EnqueueCallback(EnqueueEventForwarder.OnEvent);
// Reset the key value
mNextKey = 0;
}
public EnqueueEventForwarder(MamaQueueEventCallback callback, object closure, MamaQueue sender)
{
// Save arguments in member variables.
mCallback = callback;
mClosure = closure;
mSender = sender;
// Save a reference to this object in the static list
mIndex = AddForwarder(this);
}
#endregion
#region Event Handlers
internal static void OnEvent(IntPtr queue, IntPtr closure)
{
// Get the forward object from the event list
EnqueueEventForwarder forwarder = GetForwarder(closure.ToInt32());
if(forwarder != null)
{
// Only invoke the callback object if it is valid.
if(forwarder.mCallback != null)
{
/* Invoke the user supplied callback passing in the original closure, the one
* sent up from the C layer is ignored as it will not be a valid .Net object.
*/
forwarder.mCallback.onEvent(forwarder.mSender, forwarder.mClosure);
}
// Remove the reference from the event list
RemoveForwarder(forwarder.mIndex);
}
}
#endregion
#region Private Static Operations
private static long AddForwarder(EnqueueEventForwarder forwarder)
{
// Returns
long ret = mNextKey;
// Acquire the mutex
mEventMutex.WaitOne();
// Add the forwarder to the list
mEventList.Add(mNextKey, forwarder);
// Increment the next key index
mNextKey ++;
// Release the mutex
mEventMutex.ReleaseMutex();
return ret;
}
private static EnqueueEventForwarder GetForwarder(long key)
{
// Returns
EnqueueEventForwarder ret = null;
// Acquire the mutex
mEventMutex.WaitOne();
// Obtain the forwarder from the dictionary
ret = (EnqueueEventForwarder)mEventList[key];
// Release the mutex
mEventMutex.ReleaseMutex();
return ret;
}
private static void RemoveForwarder(long key)
{
// Acquire the mutex
mEventMutex.WaitOne();
// Remove the forwarder from the array list
mEventList.Remove(key);
// Release the mutex
mEventMutex.ReleaseMutex();
}
#endregion
#region Public Operations
public EnqueueCallback NativeCallback
{
get
{
return mNativeCallback;
}
}
#endregion
#region Public Static Operations
public static int ForwardEvent(MamaQueueEventCallback callback, object closure, IntPtr nativeHandle, MamaQueue sender)
{
// Create a new forwarder object to manage the .Net callback objects
EnqueueEventForwarder forwarder = new EnqueueEventForwarder(callback, closure, sender);
/* Invoke the native method, the index into the array will be passed as the clousure.
*/
return NativeMethods.mamaQueue_enqueueEvent(nativeHandle, forwarder.NativeCallback, new IntPtr(forwarder.mIndex));
}
#endregion
}
#endregion
private class EnqueueCallbackForwarder
{
public delegate void EnqueueCallback(IntPtr queue, IntPtr closure);
public EnqueueCallbackForwarder(MamaQueue sender, MamaQueueEnqueueCallback callback)
{
mSender = sender;
mCallback = callback;
}
internal void OnEnqueue(IntPtr queue, IntPtr closure)
{
if (mCallback != null)
{
mCallback.onEnqueue(mSender);
}
}
private MamaQueueEnqueueCallback mCallback;
private MamaQueue mSender;
}
private class WatermarkCallbackForwarder
{
public delegate void HighWatermarkExceededCallback(IntPtr queue, int size, IntPtr closure);
public delegate void LowWatermarkCallback(IntPtr queue, int size, IntPtr closure);
public WatermarkCallbackForwarder(MamaQueue sender, MamaQueueMonitorCallback callback, object closure)
{
mSender = sender;
mCallback = callback;
mClosure = closure;
}
internal void HighWatermarkExceeded(IntPtr queue, int size, IntPtr closure)
{
if (mCallback != null)
{
mCallback.HighWatermarkExceeded(mSender, size, mClosure);
}
}
internal void LowWatermark(IntPtr queue, int size, IntPtr closure)
{
if (mCallback != null)
{
mCallback.LowWatermark(mSender, size, mClosure);
}
}
private MamaQueueMonitorCallback mCallback;
private MamaQueue mSender;
private object mClosure;
}
private struct NativeMethods
{
// export definitions
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_create(ref IntPtr result,
IntPtr bridgeImpl);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_canDestroy(IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_destroy(IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_destroyWait(IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_destroyTimedWait(IntPtr nativeHandle, long timeout);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_setHighWatermark (IntPtr nativeHandle,
int highWatermark);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_getHighWatermark (IntPtr nativeHandle,
ref int highWatermark);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_setLowWatermark (IntPtr nativeHandle,
int lowWatermark);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_setQueueName(IntPtr nativeHandle,
string name);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_getLowWatermark (IntPtr nativeHandle,
ref int lowWatermark);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_getEventCount (IntPtr nativeHandle, ref int eventCount);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_getQueueBridgeName (IntPtr nativeHandle, ref IntPtr name);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_setQueueMonitorCallbacks (IntPtr nativeHandle,
ref WatermarkCallbacks queueMonitorCallbacks,
IntPtr closure);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_dispatch (IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_dispatchEvent (IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_stopDispatch (IntPtr nativeHandle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_timedDispatch (IntPtr nativeHandle,
ulong timeout);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_setEnqueueCallback (IntPtr nativeHandle,
EnqueueCallbackForwarder.EnqueueCallback callback,
IntPtr closure);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaQueue_enqueueEvent(
IntPtr nativeHandle,
EnqueueEventForwarder.EnqueueCallback callback,
IntPtr closure);
}
// state
private EnqueueCallbackForwarder mEnqueueCallbackForwarder;
private EnqueueCallbackForwarder.EnqueueCallback mEnqueueShimCallback;
[StructLayout(LayoutKind.Sequential)]
private struct WatermarkCallbacks
{
public WatermarkCallbackForwarder.HighWatermarkExceededCallback mHighWatermarkExceededShimCallback;
public WatermarkCallbackForwarder.LowWatermarkCallback mLowWatermarkShimCallback;
}
private WatermarkCallbackForwarder mWatermarkCallbackForwarder;
private WatermarkCallbacks mWatermarkShimCallbacks;
#endregion // Implementation details
}
}
Updated on 2023-03-31 at 15:29:34 +0100