Name |
---|
Wombat |
Name | |
---|---|
class | Wombat::MamaTransport <br>The underpinning object in any MAMA application. |
/* $Id$
*
* OpenMAMA: The open middleware agnostic messaging API
* Copyright (C) 2011 NYSE Technologies, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
using System;
using System.Runtime.InteropServices;
namespace Wombat
{
public class MamaTransport : MamaWrapper
{
/// <summary>
/// Selects which throttle instance <c>getOutboundThrottle</c> and
/// <c>setOutboundThrottle</c> operate on. The value is handed unchanged to
/// the native <c>mamaTransport_getOutboundThrottle</c> /
/// <c>mamaTransport_setOutboundThrottle</c> calls.
/// </summary>
public enum MamaThrottleInstance
{
// NOTE(review): the meaning of each instance is inferred from its name only;
// confirm against the native MAMA throttle documentation.
MAMA_THROTTLE_DEFAULT = 0,
MAMA_THROTTLE_INITIAL = 1,
MAMA_THROTTLE_RECAP = 2
}
/// <summary>
/// Quality values reported by the <c>Quality</c> property. The native
/// <c>mamaTransport_getQuality</c> call writes one of these values through a
/// <c>ref</c> parameter, so the underlying type is fixed to <c>int</c>.
/// </summary>
public enum MamaQuality : int
{
// NOTE(review): numeric values presumably mirror the native quality enum;
// verify against the native header before changing any of them.
MAMA_QUALITY_OK = 0,
MAMA_QUALITY_MAYBE_STALE = 1,
MAMA_QUALITY_STALE = 2,
MAMA_QUALITY_PARTIAL_STALE = 3,
MAMA_QUALITY_FORCED_STALE = 4,
MAMA_QUALITY_DUPLICATE = 5,
// Also used as the initial value before the native call fills in the result.
MAMA_QUALITY_UNKNOWN = 99
}
/// <summary>
/// Creates a transport wrapper and asks the native library to allocate the
/// underlying transport, which is stored in this wrapper's native handle.
/// The returned status is passed to <c>CheckResultCode</c>.
/// </summary>
public MamaTransport() : base()
{
    CheckResultCode(NativeMethods.mamaTransport_allocate(ref nativeHandle));
}
/// <summary>
/// Wraps an already existing native transport handle by forwarding it to the
/// base-class constructor. This constructor body itself makes no native call.
/// </summary>
/// <param name="transport">Native transport handle to wrap.</param>
internal MamaTransport(IntPtr transport) : base(transport)
{
}
/// <summary>
/// Retrieves the native transport pointer at index 0 from the native layer.
/// </summary>
/// <returns>
/// The pointer written by <c>mamaTransport_getNativeTransport</c>; it starts
/// out as <c>IntPtr.Zero</c> before the call.
/// </returns>
public IntPtr getNativeTransport ()
{
    IntPtr nativeTransport = IntPtr.Zero;
    CheckResultCode(
        NativeMethods.mamaTransport_getNativeTransport(nativeHandle, 0, ref nativeTransport));
    return nativeTransport;
}
/// <summary>
/// Creates the native transport with the given name on the given bridge.
/// </summary>
/// <param name="name">Transport name, marshalled to the native layer as an ANSI string.</param>
/// <param name="bridgeImpl">Bridge whose native handle is passed to <c>mamaTransport_create</c>.</param>
/// <exception cref="ArgumentNullException">
/// Thrown for a null <paramref name="name"/> or <paramref name="bridgeImpl"/>
/// when the build defines <c>MAMA_WRAPPERS_CHECK_ARGUMENTS</c>.
/// </exception>
public void create(string name, MamaBridge bridgeImpl)
{
#if MAMA_WRAPPERS_CHECK_ARGUMENTS
    if (name == null)
    {
        throw new ArgumentNullException("name");
    }
    // bridgeImpl is dereferenced below, so validate it the same way as name.
    if (bridgeImpl == null)
    {
        throw new ArgumentNullException("bridgeImpl");
    }
#endif // MAMA_WRAPPERS_CHECK_ARGUMENTS
    EnsurePeerCreated();
    int code = NativeMethods.mamaTransport_create(nativeHandle, name, bridgeImpl.NativeHandle);
    CheckResultCode(code);
    // Only the raw handle crosses into native code; keep the managed bridge
    // wrapper reachable until the call has returned.
    GC.KeepAlive(bridgeImpl);
}
/// <summary>
/// Reads the outbound throttle value of the selected throttle instance from
/// the native transport.
/// </summary>
/// <param name="instance">Throttle instance to query.</param>
/// <returns>The value written by <c>mamaTransport_getOutboundThrottle</c>.</returns>
public double getOutboundThrottle(MamaThrottleInstance instance)
{
    double throttle = 0;
    CheckResultCode(
        NativeMethods.mamaTransport_getOutboundThrottle(nativeHandle, instance, ref throttle));
    return throttle;
}
/// <summary>
/// Writes the outbound throttle value of the selected throttle instance to
/// the native transport.
/// </summary>
/// <param name="instance">Throttle instance to modify.</param>
/// <param name="outboundThrottle">New throttle value, passed unchanged to the native call.</param>
public void setOutboundThrottle(MamaThrottleInstance instance, double outboundThrottle)
{
    CheckResultCode(
        NativeMethods.mamaTransport_setOutboundThrottle(nativeHandle, instance, outboundThrottle));
}
/// <summary>
/// Registers a managed symbol-mapping callback with the native transport.
/// </summary>
/// <param name="callback">Managed callback invoked through the interop forwarder.</param>
/// <param name="closure">Object handed back to <paramref name="callback"/> on every invocation.</param>
public void setSymbolMapFunc(MamaSymbolMapCallback callback, object closure)
{
    EnsurePeerCreated();

    // Build the new forwarder and delegate in locals first. The fields still
    // root the previously registered delegate while the native layer is being
    // switched over, so native code never holds a delegate that is eligible
    // for garbage collection.
    CallbackForwarderSymbolMap forwarder = new CallbackForwarderSymbolMap(this, callback, closure);
    CallbackForwarderSymbolMap.SymbolMapFuncCallback shim =
        new CallbackForwarderSymbolMap.SymbolMapFuncCallback(forwarder.OnSymbolMapFunc);

    // The managed closure stays inside the forwarder; the native closure is unused.
    NativeMethods.mamaTransport_setSymbolMapFunc(nativeHandle, shim, IntPtr.Zero);

    mCallbackForwarderSymbolMap = forwarder;
    mShimCallbackSymbolMap = shim;
}
/// <summary>
/// Returns the managed symbol-mapping callback most recently registered
/// through <c>setSymbolMapFunc</c>.
/// </summary>
/// <returns>
/// The registered callback, or <c>null</c> when no symbol map has been set
/// on this transport.
/// </returns>
public MamaSymbolMapCallback getSymbolMapFunc()
{
    // The forwarder only exists after setSymbolMapFunc has been called.
    CallbackForwarderSymbolMap forwarder = mCallbackForwarderSymbolMap;
    return forwarder == null ? null : forwarder.GetCallback();
}
/// <summary>
/// Returns the closure object most recently registered through
/// <c>setSymbolMapFunc</c>.
/// </summary>
/// <returns>
/// The registered closure, or <c>null</c> when no symbol map has been set
/// on this transport.
/// </returns>
public object getSymbolMapFuncClosure()
{
    // The forwarder only exists after setSymbolMapFunc has been called.
    CallbackForwarderSymbolMap forwarder = mCallbackForwarderSymbolMap;
    return forwarder == null ? null : forwarder.GetClosure();
}
/// <summary>
/// Current quality of the transport as reported by the native
/// <c>mamaTransport_getQuality</c> call.
/// </summary>
public MamaQuality Quality
{
    get
    {
        // Start from UNKNOWN; the native call overwrites it through the ref argument.
        MamaQuality current = MamaQuality.MAMA_QUALITY_UNKNOWN;
        CheckResultCode(NativeMethods.mamaTransport_getQuality(nativeHandle, ref current));
        return current;
    }
}
/// <summary>
/// Forwards a conflation request to the native transport and checks the
/// returned status.
/// </summary>
public void requestConflation()
{
    CheckResultCode(NativeMethods.mamaTransport_requestConflation(nativeHandle));
}
/// <summary>
/// Forwards an end-of-conflation request to the native transport and checks
/// the returned status.
/// </summary>
public void requestEndConflation()
{
    CheckResultCode(NativeMethods.mamaTransport_requestEndConflation(nativeHandle));
}
/// <summary>
/// Dispose hook: unregisters the transport callback from the native layer
/// (when a native handle exists and a callback is registered) and then runs
/// the base-class dispose logic.
/// </summary>
protected override void OnDispose()
{
    bool hasNativePeer = nativeHandle != IntPtr.Zero;
    bool hasRegisteredCallback = mCallback != null;

    if (hasNativePeer && hasRegisteredCallback)
    {
        // Passing null clears the callback on the native side as well.
        setTransportCallback(null);
    }

    base.OnDispose();
}
/// <summary>
/// Destroys the transport by calling <c>Dispose()</c> on this wrapper.
/// </summary>
public void destroy()
{
Dispose();
}
/// <summary>
/// Destroys the native transport behind this wrapper.
/// </summary>
/// <returns>
/// The integer status returned by <c>mamaTransport_destroy</c>, cast to
/// <c>MamaStatus.mamaStatus</c>.
/// </returns>
protected override MamaStatus.mamaStatus DestroyNativePeer()
{
    int nativeStatus = NativeMethods.mamaTransport_destroy(nativeHandle);
    return (MamaStatus.mamaStatus)nativeStatus;
}
/// <summary>
/// Registers (or, when <paramref name="callback"/> is null, clears) the
/// managed transport callback on the native transport.
/// </summary>
/// <param name="callback">Managed callback to receive transport events, or null to unregister.</param>
public void setTransportCallback(MamaTransportCallback callback)
{
    EnsurePeerCreated();

    // Prepare the replacement in locals. The fields keep rooting the delegate
    // that the native layer currently holds until the native registration has
    // actually been replaced.
    CallbackForwarder forwarder = null;
    CallbackForwarder.TransportCallbackDelegate shim = null;
    if (callback != null)
    {
        forwarder = new CallbackForwarder(this, callback);
        shim = new CallbackForwarder.TransportCallbackDelegate(forwarder.OnTransportCallback);
    }

    int code = NativeMethods.mamaTransport_setTransportCallback(nativeHandle, shim, nativeHandle);
    CheckResultCode(code);

    // Only reached when CheckResultCode accepted the status: the native layer
    // now references the new delegate (or none), so the old one can be released.
    mCallbackForwarder = forwarder;
    mCallback = shim;
    GC.KeepAlive(callback);
}
/// <summary>
/// Registers (or, when <paramref name="callback"/> is null, clears) the
/// managed transport topic callback on the native transport.
/// </summary>
/// <param name="callback">Managed callback to receive topic events, or null to unregister.</param>
public void setTransportTopicCallback(MamaTransportTopicCallback callback)
{
    EnsurePeerCreated();

    // Prepare the replacement in locals. The fields keep rooting the delegate
    // that the native layer currently holds until the native registration has
    // actually been replaced.
    TopicCallbackForwarder forwarder = null;
    TopicCallbackForwarder.TransportTopicCallbackDelegate shim = null;
    if (callback != null)
    {
        forwarder = new TopicCallbackForwarder(this, callback);
        shim = new TopicCallbackForwarder.TransportTopicCallbackDelegate(forwarder.OnTransportTopicCallback);
    }

    int code = NativeMethods.mamaTransport_setTransportTopicCallback(nativeHandle, shim, nativeHandle);
    CheckResultCode(code);

    // Only reached when CheckResultCode accepted the status: the native layer
    // now references the new delegate (or none), so the old one can be released.
    mTopicCallbackForwarder = forwarder;
    mTopicCallback = shim;
    GC.KeepAlive(callback);
}
/// <summary>
/// Returns the transport name held by the native layer.
/// </summary>
/// <returns>
/// The string obtained by reading the native pointer as an ANSI string.
/// </returns>
public string getName()
{
    // The native call hands back a pointer to the name.
    IntPtr namePtr = IntPtr.Zero;
    int code = NativeMethods.mamaTransport_getName(nativeHandle, ref namePtr);
    CheckResultCode(code);

    // Copy the native characters into a managed string.
    return Marshal.PtrToStringAnsi(namePtr);
}
/// <summary>
/// Tells the native transport which queue to use for transport callbacks.
/// </summary>
/// <param name="queue">Queue whose native handle is passed to the native call.</param>
public void setTransportCallbackQueue(MamaQueue queue)
{
    CheckResultCode(
        NativeMethods.mamaTransport_setTransportCallbackQueue(nativeHandle, queue.NativeHandle));
    // Keep the managed queue wrapper reachable until the native call has returned.
    GC.KeepAlive(queue);
}
/// <summary>
/// Asks the native transport for its transport-callback queue and wraps the
/// returned handle in a new <c>MamaQueue</c> object.
/// </summary>
/// <returns>A new <c>MamaQueue</c> constructed from the native queue handle.</returns>
public MamaQueue getTransportCallbackQueue()
{
    IntPtr nativeQueue = IntPtr.Zero;
    CheckResultCode(
        NativeMethods.mamaTransport_getTransportCallbackQueue(nativeHandle, ref nativeQueue));
    // NOTE(review): a fresh wrapper is created on every call, even when the
    // handle is still IntPtr.Zero; confirm how MamaQueue treats that handle.
    return new MamaQueue(nativeQueue);
}
#region Implementation details
// Interop bridge
/// <summary>
/// Receives the native transport callback and dispatches each event to the
/// matching method of the managed <c>MamaTransportCallback</c>.
/// </summary>
private sealed class CallbackForwarder
{
    // Numeric event codes received from the native layer.
    private enum mamaTransportEvent : int
    {
        // enums need to be ordered as in transport.c
        MAMA_TRANSPORT_CONNECT = 0,
        MAMA_TRANSPORT_CONNECT_FAILED = 1,
        MAMA_TRANSPORT_RECONNECT = 2,
        MAMA_TRANSPORT_DISCONNECT = 3,
        MAMA_TRANSPORT_ACCEPT = 4,
        MAMA_TRANSPORT_ACCEPT_RECONNECT = 5,
        MAMA_TRANSPORT_PUBLISHER_DISCONNECT = 6,
        MAMA_TRANSPORT_QUALITY = 7,
        MAMA_TRANSPORT_NAMING_SERVICE_CONNECT = 8,
        MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT = 9
    }

    /// <summary>Signature of the function handed to the native layer.</summary>
    public delegate void TransportCallbackDelegate(
        IntPtr transport,
        int transportEvent,
        short cause,
        IntPtr opaque,
        IntPtr closure);

    private readonly MamaTransportCallback mCallback;
    private readonly MamaTransport mTarget;

    /// <summary>
    /// Creates a forwarder that reports events on <paramref name="target"/>
    /// to <paramref name="callback"/>.
    /// </summary>
    public CallbackForwarder(MamaTransport target, MamaTransportCallback callback)
    {
        mCallback = callback;
        mTarget = target;
    }

    /// <summary>
    /// Entry point invoked from native code. Only the event code is used; the
    /// remaining arguments are ignored. Event codes without a case below
    /// (including MAMA_TRANSPORT_CONNECT_FAILED) are dropped silently.
    /// </summary>
    public void OnTransportCallback(
        IntPtr transport,
        int transportEvent,
        short cause,
        IntPtr opaque,
        IntPtr closure)
    {
        if (mCallback == null)
        {
            return;
        }

        mamaTransportEvent occurred = (mamaTransportEvent)transportEvent;
        switch (occurred)
        {
            case mamaTransportEvent.MAMA_TRANSPORT_CONNECT:
                mCallback.onConnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_RECONNECT:
                mCallback.onReconnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_DISCONNECT:
                mCallback.onDisconnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_ACCEPT:
                mCallback.onAcceptConnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_ACCEPT_RECONNECT:
                mCallback.onAcceptReconnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_PUBLISHER_DISCONNECT:
                mCallback.onPublisherDisconnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_QUALITY:
                mCallback.onQuality(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_NAMING_SERVICE_CONNECT:
                mCallback.onNamingServiceConnect(mTarget);
                break;
            case mamaTransportEvent.MAMA_TRANSPORT_NAMING_SERVICE_DISCONNECT:
                mCallback.onNamingServiceDisconnect(mTarget);
                break;
            default:
                break;
        }
    }
}
// Interop bridge
/// <summary>
/// Receives the native transport topic callback and dispatches each event to
/// the matching method of the managed <c>MamaTransportTopicCallback</c>.
/// </summary>
private sealed class TopicCallbackForwarder
{
    // Numeric topic event codes received from the native layer.
    private enum mamaTransportTopicEvent : int
    {
        // enums need to be ordered as in transport.c
        MAMA_TRANSPORT_TOPIC_SUBSCRIBED = 0,
        MAMA_TRANSPORT_TOPIC_UNSUBSCRIBED = 1,
        MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR = 2,
        MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR_NOT_ENTITLED = 3,
        MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR_BAD_SYMBOL = 4
    }

    /// <summary>Signature of the function handed to the native layer.</summary>
    public delegate void TransportTopicCallbackDelegate(
        IntPtr transport,
        int transportEvent,
        string topic,
        IntPtr opaque,
        IntPtr closure);

    private readonly MamaTransportTopicCallback mCallback;
    private readonly MamaTransport mTarget;

    /// <summary>
    /// Creates a forwarder that reports topic events on
    /// <paramref name="target"/> to <paramref name="callback"/>.
    /// </summary>
    public TopicCallbackForwarder(MamaTransport target, MamaTransportTopicCallback callback)
    {
        mCallback = callback;
        mTarget = target;
    }

    /// <summary>
    /// Entry point invoked from native code. The topic and the opaque pointer
    /// are passed on to the managed callback; unknown event codes are ignored.
    /// </summary>
    public void OnTransportTopicCallback(
        IntPtr transport,
        int transportEvent,
        string topic,
        IntPtr opaque,
        IntPtr closure)
    {
        if (mCallback == null)
        {
            return;
        }

        mamaTransportTopicEvent occurred = (mamaTransportTopicEvent)transportEvent;
        switch (occurred)
        {
            case mamaTransportTopicEvent.MAMA_TRANSPORT_TOPIC_SUBSCRIBED:
                mCallback.onTopicSubscribe(mTarget, topic, opaque);
                break;
            case mamaTransportTopicEvent.MAMA_TRANSPORT_TOPIC_UNSUBSCRIBED:
                mCallback.onTopicUnsubscribe(mTarget, topic, opaque);
                break;
            case mamaTransportTopicEvent.MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR:
                mCallback.onTopicPublishError(mTarget, topic, opaque);
                break;
            case mamaTransportTopicEvent.MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR_NOT_ENTITLED:
                mCallback.onTopicPublishErrorNotEntitled(mTarget, topic, opaque);
                break;
            case mamaTransportTopicEvent.MAMA_TRANSPORT_TOPIC_PUBLISH_ERROR_BAD_SYMBOL:
                mCallback.onTopicPublishErrorBadSymbol(mTarget, topic, opaque);
                break;
            default:
                break;
        }
    }
}
// Interop bridge for symbol map
/// <summary>
/// Adapts the native symbol-map callback to the managed
/// <c>MamaSymbolMapCallback</c>, copying the mapped symbol back into the
/// buffer supplied by the native caller.
/// </summary>
private sealed class CallbackForwarderSymbolMap
{
    /// <summary>Signature of the function handed to the native layer.</summary>
    public delegate int SymbolMapFuncCallback(IntPtr closure, IntPtr result, string symbol, int maxLen);

    /// <summary>
    /// Creates a forwarder for <paramref name="callback"/>.
    /// <paramref name="sender"/> is accepted but not stored.
    /// </summary>
    public CallbackForwarderSymbolMap(MamaTransport sender, MamaSymbolMapCallback callback, object closure)
    {
        mCallback = callback;
        mClosure = closure;
    }

    /// <summary>Returns the managed callback given at construction.</summary>
    public MamaSymbolMapCallback GetCallback()
    {
        return mCallback;
    }

    /// <summary>Returns the managed closure object given at construction.</summary>
    public object GetClosure()
    {
        return mClosure;
    }

    /// <summary>
    /// Entry point invoked from native code. Calls the managed callback with
    /// the managed closure and writes the mapped symbol, ASCII-encoded and
    /// NUL-terminated, into <paramref name="result"/>.
    /// </summary>
    /// <param name="closure">Native closure; unused, the managed closure is used instead.</param>
    /// <param name="result">Native buffer that receives the mapped symbol.</param>
    /// <param name="symbol">Symbol to map.</param>
    /// <param name="maxLen">Capacity of <paramref name="result"/> in bytes, terminator included.</param>
    /// <returns>
    /// The managed callback's return value, or 0 when no managed callback is set.
    /// </returns>
    internal int OnSymbolMapFunc(
        IntPtr closure,
        IntPtr result,
        string symbol,
        int maxLen)
    {
        if (mCallback == null)
        {
            return 0;
        }

        string resultString = null;
        int code = mCallback.SymbolMapFunc(mClosure, ref resultString, symbol, maxLen);

        // This method runs inside a native callback, so an exception here
        // cannot be handled by the caller. When the managed callback produced
        // no string, or there is no room for even the terminator, leave the
        // native buffer untouched (as in the no-callback path above).
        if (resultString == null || result == IntPtr.Zero || maxLen <= 0)
        {
            return code;
        }

        // Size everything from the encoded byte count rather than from
        // string.Length: the two can differ (for example a surrogate pair
        // may encode to a single replacement byte).
        byte[] encoded = System.Text.Encoding.ASCII.GetBytes(resultString);
        int payloadLength = Math.Min(encoded.Length, maxLen - 1);

        // One extra byte for the terminator; new arrays are zero-filled, so
        // the final byte is already the NUL character.
        byte[] resultBytes = new byte[payloadLength + 1];
        Array.Copy(encoded, resultBytes, payloadLength);

        Marshal.Copy(resultBytes, 0, result, resultBytes.Length);
        return code;
    }

    private MamaSymbolMapCallback mCallback;
    private object mClosure;
}
/// <summary>
/// P/Invoke declarations for the native transport functions exported by the
/// library named by <c>Mama.DllName</c>. Every entry point uses the Cdecl
/// calling convention. The functions declared with an <c>int</c> return type
/// have that value passed to <c>CheckResultCode</c> (or cast to a MAMA status)
/// by the callers in this class.
/// </summary>
private struct NativeMethods
{
// Writes the newly allocated native transport handle into 'result'.
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_allocate(ref IntPtr result);
// 'name' is marshalled as an ANSI (LPStr) string.
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_create(
IntPtr transport,
[MarshalAs(UnmanagedType.LPStr)]
string name,
IntPtr bridgeImpl);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_getOutboundThrottle(IntPtr transport,
MamaThrottleInstance instance,
ref double result);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_setOutboundThrottle(IntPtr transport,
MamaThrottleInstance instance,
double outboundThrottle);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_destroy(IntPtr transport);
// The delegate passed as 'callback' must stay referenced by managed code for
// as long as the native layer may invoke it.
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_setTransportCallback (IntPtr transport,
CallbackForwarder.TransportCallbackDelegate callback,
IntPtr closure);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_setTransportTopicCallback(IntPtr transport,
TopicCallbackForwarder.TransportTopicCallbackDelegate callback,
IntPtr closure);
// NOTE(review): declared void here, unlike the other setters; confirm against
// the native header whether a status code is being discarded.
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern void mamaTransport_setSymbolMapFunc(IntPtr transport,
CallbackForwarderSymbolMap.SymbolMapFuncCallback callback,
IntPtr closure);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_requestConflation(IntPtr transport);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_requestEndConflation(IntPtr transport);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_getNativeTransport (IntPtr nativeHandle, int index, ref IntPtr val);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_getQuality(IntPtr transport, ref MamaQuality qual);
// 'ret' receives a pointer to the name; the caller reads it as an ANSI string.
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_getName(IntPtr transport, ref IntPtr ret);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_getTransportCallbackQueue(IntPtr nativeHandle, ref IntPtr val);
[DllImport(Mama.DllName, CallingConvention = CallingConvention.Cdecl)]
public static extern int mamaTransport_setTransportCallbackQueue(IntPtr nativeHandle, IntPtr val);
}
// state
// Each forwarder/delegate pair below is stored in a field so that the delegate
// handed to the native layer remains referenced from managed code after the
// registering method returns.
// Transport event callback (set by setTransportCallback).
private CallbackForwarder mCallbackForwarder;
private CallbackForwarder.TransportCallbackDelegate mCallback;
// Transport topic event callback (set by setTransportTopicCallback).
private TopicCallbackForwarder mTopicCallbackForwarder;
private TopicCallbackForwarder.TransportTopicCallbackDelegate mTopicCallback;
// Symbol-map callback (set by setSymbolMapFunc); null until it is first called.
private CallbackForwarderSymbolMap mCallbackForwarderSymbolMap;
private CallbackForwarderSymbolMap.SymbolMapFuncCallback mShimCallbackSymbolMap;
#endregion // Implementation details
}
}
Updated on 2023-03-31 at 15:29:34 +0100