Classes Files

examples/MamaListenCached.java

Namespaces

Name
com::wombat::mama::examples

Classes

  Name
class com::wombat::mama::examples::MamaListenCached

Source code

package com.wombat.mama.examples;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.logging.Logger;
import java.util.logging.Level;
import com.wombat.mama.MamaTimer;
import java.util.Map;
import java.util.Collections;
import java.util.HashMap;
import java.io.*;
import java.util.Random;

import com.wombat.mama.*;

public class MamaListenCached
{
    private static MamaTransport    transport           = null;
    private static String           myMiddleware        = "wmw";
    private static MamaQueue        myDefaultQueue      = null;
    private static MamaBridge       myBridge            = null;
    private static double           throttle            = -1;
    private static MamaDictionary   dictionary          = null;
    private static String           dictSource          = "WOMBAT";
    private static boolean          dumpDataDict        = false;
    private static boolean          buildDataDict       = true;    
    private static boolean          dictionaryComplete  = false;
    private static String           transportName       = "internal";
    private static boolean          requireInitial      = true;
    private static boolean          snapshot            = false;
    private static double           timeout             = 10.0;
    private static int              quietness           = 0;
    private static boolean          printUsingName      = false;
    private static boolean          printUsingFid       = false;
    private static int              numThreads          = 0;
    private static MamaQueueGroup   queueGroup          = null;
    private static boolean          qualityForAll       = true;
    private static final ArrayList  subscriptions       = new ArrayList();
    private static final ArrayList  subjectList         = new ArrayList();
    private static String           filename            = null;
    private static String           mySymbolNamespace   = null;
    private static final Logger     logger              = Logger.getLogger( "com.wombat.mama" );
    private static Level            myLogLevel;
    private static boolean          myGroupSubscription = false;
    private static MamaSource       mySource;   
    private static MamaSource       myDictSource;
    private static boolean          verifyAgainstFeed   = false;
    private static MamaTimer        myVerifyTimer       = null;
    private static double           myVerifyInterval    = 1.0;
    private static String           failedField         = null;
    private static boolean          display             = true;    
    
    public static void main (final String[] args)
    {
        parseCommandLine (args);

        try
        {
            if (subjectList.isEmpty())
            {
                readSubjectsFromFile();
            }

            initializeMama ();            
            buildDataDictionary ();
            dumpDataDictionary ();            
            
            subscribeToSubjects ();
            
            System.out.println( "Type CTRL-C to exit." );
            /*This will only block on JNI*/
            Mama.start (myBridge);
            /*Keep the main thread running if using pure Java MAMA*/
            Thread.currentThread().join();
        }
        catch (Exception e)
        {
            if (e.getCause() != null)
            {
                e.getCause ().printStackTrace ();
            }

            e.printStackTrace ();
            System.exit (1);
        }
        finally
        {
            shutdown ();
        }
    }

    private static void subscribeToSubjects()
    {
        int howMany = 0;

        queueGroup = new MamaQueueGroup (numThreads, myBridge);

        /*Subscribe to all symbols specified on the commanty line or from the
         * symbol file*/
        for (Iterator iterator = subjectList.iterator(); iterator.hasNext();)
        {
            MamaQueue queue;
            final String symbol = (String) iterator.next();
            
            MamaSubscription subscription = new MamaSubscription ();
            queue= queueGroup.getNextQueue ();
            /*Properties common to all subscription types*/
            subscription.setTimeout (timeout);
            subscription.setRetries (1);
            subscription.setRequiresInitial (requireInitial);
            SubscriptionCallback callback = new SubscriptionCallback(queue, symbol);    
            subscription.createSubscription (callback,
                                             queue,
                                             mySource,
                                             symbol,
                                             null);
           

            logger.fine ("Subscribed to: " + symbol);

            if (++howMany % 1000 == 0)
            {
                System.out.println ("Subscribed to " + howMany + " symbols.");
            }
        }
    }

    private static void dumpDataDictionary()
    {
        if (dumpDataDict)
        {
            for (int i = 0; i < dictionary.getSize(); i++)
            {
                MamaFieldDescriptor fd = dictionary.getFieldByIndex (i);
                print (""+fd.getFid(),5);
                System.out.print (": ");
                print (""+fd.getType(),3);
                System.out.println (": "+fd.getName());
            }
        }
        System.out.flush();
    }

    private static void buildDataDictionary()
        throws InterruptedException
    {
        MamaDictionaryCallback dictionaryCallback = createDictionaryCallback();
        
        MamaSubscription subscription = new MamaSubscription ();
        
        if (buildDataDict)
        {
            synchronized (dictionaryCallback)
            {
                /*The dictionary is obtained through a specialized form of
                * subscription.*/            

                dictionary = subscription.createDictionarySubscription (
                        dictionaryCallback,
                        myDefaultQueue,
                        myDictSource,
                        10.0,
                        2);
                    
                /*Mama.start() will only block for JNI*/
                Mama.start (myBridge);
                if (!dictionaryComplete) dictionaryCallback.wait( 30000 );
                if (!dictionaryComplete)
                {
                    System.err.println( "Timed out waiting for dictionary." );
                    System.exit( 0 );
                }
            }             
        }
    }

    private static MamaDictionaryCallback createDictionaryCallback()
    {
        return new MamaDictionaryCallback()
        {
            public void onTimeout()
            {
                System.err.println( "Timed out waiting for dictionary" );
                System.exit(1);
            }

            public void onError (final String s)
            {
                System.err.println ("Error getting dictionary: " + s);
                System.exit (1);
            }

            public synchronized void onComplete()
            {
                dictionaryComplete = true;
                Mama.stop(myBridge);
                notifyAll();
            }
        };
    }

    private static void shutdown()
    {
        for (Iterator iterator = subscriptions.iterator(); iterator.hasNext();)
        {
            final MamaSubscription subscription = (MamaSubscription) iterator.next();
            subscription.destroy();
        }
        if (transport != null)
        {
            transport.destroy();
        }
        Mama.close();
    }

    private static void initializeMama()
    {
        try
        {
            myBridge = Mama.loadBridge (myMiddleware);
            /*Always the first API method called. Initialized internal API
             * state*/
            Mama.open ();
            myDefaultQueue = Mama.getDefaultQueue (myBridge);
        }
        catch (Exception e)
        {
            e.printStackTrace ();
            System.out.println ("Failed to initialize MAMA");
            System.exit (1);
        }
        
        transport = new MamaTransport ();
        /*The name specified here is the name identifying properties in the
         * mama.properties file*/
        transport.create (transportName, myBridge);
       
        /*Receive notification of transport level events*/
        transport.addTransportListener( new MamaTransportListener()
        {
            public void onConnect(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT CONNECTED!");
            }
            public void onDestroy(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT DESTROYED!");
            }
            public void onAccept(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT ACCEPTED!");
            }
            public void onAcceptReconnect(short cause, final Object platformInfo)
            {
                System.out.println ("ACCEPTED RECONNECT!");
            }

            public void onPublisherDisconnect(short cause, final Object platformInfo)
            {
                System.out.println ("PUBLISHER DISCONNECTED!");
            }

            public void onNamingServiceConnect(short cause, final Object platformInfo)
            {
                System.out.println ("NAMING SERVICE CONNECTED!");
            }

            public void onNamingServiceDisconnect(short cause, final Object platformInfo)
            {
                System.out.println ("NAMING SERVICE DISCONNECTED!");
            }

            public void onDisconnect(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT DISCONNECTED!");
            }

            public void onReconnect(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT RECONNECTED!");
            }

            public void onQuality(short cause, final Object platformInfo)
            {
                System.out.println ("TRANSPORT QUALITY!");
                short quality = transport.getQuality();
                System.out.println ("Transport quality is now " +
                                    MamaQuality.toString(quality) +
                                    ", cause " + MamaDQCause.toString (cause) +
                                    ", platformInfo: " + platformInfo);
            }
        } );

        if (!qualityForAll)
        {
            transport.setInvokeQualityForAllSubscs (false);
        }

        /*MamaSource for all subscriptions*/
        mySource     = new MamaSource ();
        mySource.setTransport (transport);
        mySource.setSymbolNamespace (mySymbolNamespace);

        /*MamaSource for dictionary subscription*/
        myDictSource = new MamaSource ();
        myDictSource.setTransport (transport);
        myDictSource.setSymbolNamespace (dictSource);

        if (throttle != -1)
        {
            transport.setOutboundThrottle
                (MamaThrottleInstance.DEFAULT_THROTTLE,throttle);
        }
    }

    private static void readSubjectsFromFile() throws IOException
    {
        InputStream input;
        if (filename != null)
        {
            input = new FileInputStream (filename);
        }
        else
        {
            input = System.in;
            System.out.println ("Enter one symbol per line and terminate with a .");
            System.out.print ("SUBJECT>");
        }

        final BufferedReader reader =
            new BufferedReader (new InputStreamReader (input));

        String symbol;
        while (null != (symbol = reader.readLine()))
        {
            if (!symbol.equals(""))
            {
                if (symbol.equals( "." ))
                {
                    break;
                }
                subjectList.add (symbol);
            }

            if (input == System.in)
            {
                System.out.print ("SUBJECT>");
            }
        }

        if (subjectList.isEmpty())
        {
            System.err.println ("No subjects specified");
            System.exit (1);
        }
        System.out.flush();
    }

    private static void print (final String what, final int width)
    {
        if(quietness < 1)
        {
            int whatLength = 0;
            if (what!=null)
                whatLength = what.length();

            final int spaces = width - whatLength;
            System.out.print (what);

            for (int i = 0; i < spaces; i++)
                System.out.print(" ");
        }
        System.out.flush();
    }

    private static void parseCommandLine (final String[] args)
    {
        for(int i = 0; i < args.length;)
        {
            if (args[i].equals ("-source") || args[i].equals("-S"))
            {
                mySymbolNamespace = args[i +1];
                i += 2;
            }
            else if (args[i].equals ("-d"))
            {
                dictSource = args[i + 1];
                i += 2;
            }
            else if (args[i].equals ("-D"))
            {
                dumpDataDict = true;
                i++;
            }
            else if (args[i].equals ("-B"))
            {
                buildDataDict = false;
                i++;
            }
            else if (args[i].equals ("-I"))
            {
                requireInitial = false;
                i++;
            }
            else if (args[i].equals ("-s"))
            {
                subjectList.add (args[i + 1]);
                i += 2;
            }
            else if (args[i].equals ("-f"))
            {
                filename = args[i + 1];
                i += 2;
            }
            else if (args[i].equals ("-1"))
            {
                snapshot = true;
                i++;
            }
            else if (args[i].equals ("-r"))
            {
                throttle = Double.parseDouble (args[i + 1]);
                i += 2;
            }
            else if (args[i].equals ("-t"))
            {
                timeout = Double.parseDouble (args[i + 1]);
                i += 2;
            }
            else if (args[i].equals ("-tport"))
            {
                transportName = args[i + 1];
                i += 2;
            }
            else if (args[i].equals ("-threads"))
            {
                numThreads = Integer.parseInt (args[i+1]);
                i += 2;
            }
            else if (args[i].equals ("-A"))
            {
                qualityForAll = false;
                i++;
            }
            else if (args[i].equals ("-printUsingName"))
            {
                printUsingName = true;
                i++;
            }
            else if (args[i].equals ("-printUsingFid"))
            {
                printUsingFid = true;
                i++;
            }
            else if (args[i].equals ("-q"))
            {
                quietness++;
                i++;
            }
            else if (args[i].equals ("-g"))
            {
                myGroupSubscription =  true;
                i++;
            }
            else if (args[i].equals ("-m"))
            {
                myMiddleware = args[i + 1];
                i += 2;
            }
            else if (args[i].equals ("-v"))
            {
                myLogLevel = myLogLevel == null
                            ? Level.FINE    :
                            myLogLevel == Level.FINE    
                            ? Level.FINER   : 
                            Level.FINEST;

                Mama.enableLogging (myLogLevel);
                i++;
            }
            else if (args[i].equals ("-verify"))
            {
                verifyAgainstFeed = true;                
                i++;
            }
            else if (args[i].equals ("-verifyInterval"))
            {
                myVerifyInterval = Double.parseDouble (args[i + 1]);
                i += 2;
            }           
        }
    }

    /*Class for processing all event callbacks for all subscriptions*/
    private static class SubscriptionCallback implements MamaSubscriptionCallback
    {
        MamaFieldCache  myFieldCache = new MamaFieldCache();
        MamaTimer       myVerifyTimer;  
        int             myRandomNo;             
        MamaQueue       myQueue;      

        public SubscriptionCallback (MamaQueue queue, String symbol)
        {
            Random generator = new Random();
            myRandomNo = generator.nextInt (10);
            myQueue = queue;
            if (verifyAgainstFeed)
            {                
                if (myVerifyInterval > 0)
                {
                    try
                    {                
                        myVerifyTimer = new MamaTimer();                    
                        myVerifyTimer.create (myQueue, new VerifyCallback(this,symbol), 
                        myVerifyInterval+1);
                    }                 
                    catch (Exception e)
                    {
                        e.printStackTrace( );
                        System.err.println( "Error creating timer: " + e );
                        System.exit(1);  
                    }
                }                
            }
        }
        
        public void onMsg (final MamaSubscription subscription, final MamaMsg msg)
        {            
            try
            {
                switch (MamaMsgType.typeForMsg (msg))
                {
                    case MamaMsgType.TYPE_DELETE:
                    case MamaMsgType.TYPE_EXPIRE:
                        subscription.destroy ();
                        subscriptions.remove (subscription);
                        return;
                    case MamaMsgType.TYPE_SNAPSHOT:
                        return;
                }

                switch (MamaMsgStatus.statusForMsg (msg))
                {
                    case MamaMsgStatus.STATUS_BAD_SYMBOL:
                    case MamaMsgStatus.STATUS_EXPIRED:
                    case MamaMsgStatus.STATUS_TIMEOUT:
                        subscription.destroy();
                        subscriptions.remove (subscription);
                        return;
                }
            }
            catch (Exception ex) 
            {
                ex.printStackTrace ();
                System.exit (0);
            }          
            
            //Apply the message to the FieldCache
            myFieldCache.apply(msg,dictionary,null);
            displayAllFields (subscription, msg);
        }

        /*Class for processing fields within a message - for the message
         * iterator*/
        private class FieldIterator implements MamaMsgFieldIterator
        {
            private boolean fieldHasName = true;
            public void onField (MamaMsg        msg,
                                 MamaMsgField   field,
                                 MamaDictionary dictionary,
                                 Object         closure)
            {    
                String fieldName = null;
                try
                {
                    if (fieldHasName)
                    {
                        fieldName = field.getName ();
                    }
                }
                catch (Exception e)
                {
                    fieldHasName = false;
                }
                    
                print (" \t ", 0);
                print (fieldName,20);
                print (" | ", 0);
                print ("" + field.getFid(),4);
                print (" | ", 0);
                print ("" + field.getTypeName(),10);
                print (" | ", 0);

                if (printUsingName == true)
                {
                    if (buildDataDict)
                    {
                        printUsingName (field);
                    }
                    else 
                    {
                        System.out.println ("CANNOT ACCESS FIELDS BY NAME");
                        System.exit(1);                            
                    }
                }
                else
                {                                               
                    printUsingFid (field);
                }
                print (" \n ", 0);
            }

            /*Access the data from the field objects*/
            private void printUsingFid (MamaMsgField field)
            {                
                short fieldType = field.getType ();
                switch (fieldType)
                {
                    case MamaFieldDescriptor.BOOL:
                        MamaFieldCacheBool mfcb = (MamaFieldCacheBool) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcb.get(), 20);
                        break;
                    case MamaFieldDescriptor.CHAR:
                        MamaFieldCacheChar mfcc = (MamaFieldCacheChar) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcc.get(), 20);
                        break;
                    case MamaFieldDescriptor.I8:
                        MamaFieldCacheInt8 mfci8 = (MamaFieldCacheInt8) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfci8.get(), 20);
                        break;
                    case MamaFieldDescriptor.U8:
                        MamaFieldCacheUint8 mfcui8 = (MamaFieldCacheUint8) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcui8.get(), 20);
                        break;
                    case MamaFieldDescriptor.I16:
                        MamaFieldCacheInt16 mfci16 = (MamaFieldCacheInt16) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfci16.get(), 20);
                        break;
                    case MamaFieldDescriptor.U16:
                        MamaFieldCacheUint16 mfcui16 = (MamaFieldCacheUint16) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcui16.get(), 20);
                        break;
                    case MamaFieldDescriptor.I32:
                        MamaFieldCacheInt32 mfci32 = (MamaFieldCacheInt32) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfci32.get(), 20);
                        break;
                    case MamaFieldDescriptor.U32:
                        MamaFieldCacheUint32 mfcui32 = (MamaFieldCacheUint32) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcui32.get(), 20);
                        break;
                    case MamaFieldDescriptor.I64:
                        MamaFieldCacheInt64 mfci64 = (MamaFieldCacheInt64) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfci64.get(), 20);
                        break;
                    case MamaFieldDescriptor.U64:
                        MamaFieldCacheUint64 mfcui64 = (MamaFieldCacheUint64) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcui64.get(), 20);
                        break;
                    case MamaFieldDescriptor.F32:
                        MamaFieldCacheFloat32 mfcf32 = (MamaFieldCacheFloat32) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcf32.get(), 20);
                        break;
                    case MamaFieldDescriptor.F64:
                        MamaFieldCacheFloat64 mfcf64 = (MamaFieldCacheFloat64) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcf64.get(), 20);
                        break;
                    case MamaFieldDescriptor.STRING:
                        MamaFieldCacheString mfcs = (MamaFieldCacheString) 
                                            myFieldCache.find (field.getFid());
                        print (mfcs.get(), 20);
                        break;
                    case MamaFieldDescriptor.TIME:
                        MamaFieldCacheDateTime mfcdt = (MamaFieldCacheDateTime) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcdt.get(), 20);
                        break;
                    case MamaFieldDescriptor.PRICE:
                        MamaFieldCachePrice mfcp = (MamaFieldCachePrice) 
                                            myFieldCache.find (field.getFid());
                        print ("" + mfcp.get(), 20);
                        break;
                    default:
                        print ("Unknown type: " + fieldType, 20);
                }
            }

            /*Access the data from the message object - random access*/
            private void printUsingName (MamaMsgField field)
            {                
                short fieldType = field.getType();
                switch (fieldType)
                {
                    case MamaFieldDescriptor.BOOL:
                        MamaFieldCacheBool mfcb = (MamaFieldCacheBool) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcb.get(), 20);
                        break;
                    case MamaFieldDescriptor.CHAR:
                        MamaFieldCacheChar mfcc = (MamaFieldCacheChar) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcc.get(), 20);
                        break;
                    case MamaFieldDescriptor.I8:
                        MamaFieldCacheInt8 mfci8 = (MamaFieldCacheInt8) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfci8.get(), 20);
                        break;
                    case MamaFieldDescriptor.U8:
                        MamaFieldCacheUint8 mfcui8 = (MamaFieldCacheUint8) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcui8.get(), 20);
                        break;
                    case MamaFieldDescriptor.I16:
                        MamaFieldCacheInt16 mfci16 = (MamaFieldCacheInt16) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfci16.get(), 20);
                        break;
                    case MamaFieldDescriptor.U16:
                        MamaFieldCacheUint16 mfcui16 = (MamaFieldCacheUint16) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcui16.get(), 20);
                        break;
                    case MamaFieldDescriptor.I32:
                        MamaFieldCacheInt32 mfci32 = (MamaFieldCacheInt32) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfci32.get(), 20);
                        break;
                    case MamaFieldDescriptor.U32:
                        MamaFieldCacheUint32 mfcui32 = (MamaFieldCacheUint32) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcui32.get(), 20);
                        break;
                    case MamaFieldDescriptor.I64:
                        MamaFieldCacheInt64 mfci64 = (MamaFieldCacheInt64) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfci64.get(), 20);
                        break;
                    case MamaFieldDescriptor.U64:
                        MamaFieldCacheUint64 mfcui64 = (MamaFieldCacheUint64) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcui64.get(), 20);
                        break;
                    case MamaFieldDescriptor.F32:
                        MamaFieldCacheFloat32 mfcf32 = (MamaFieldCacheFloat32) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcf32.get(), 20);
                        break;
                    case MamaFieldDescriptor.F64:
                        MamaFieldCacheFloat64 mfcf64 = (MamaFieldCacheFloat64) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcf64.get(), 20);
                        break;
                    case MamaFieldDescriptor.STRING:
                        MamaFieldCacheString mfcs = (MamaFieldCacheString) 
                                            myFieldCache.find (field.getName());
                        print (mfcs.get(), 20);
                        break;
                    case MamaFieldDescriptor.TIME:
                        MamaFieldCacheDateTime mfcdt = (MamaFieldCacheDateTime) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcdt.get(), 20);
                        break;
                    case MamaFieldDescriptor.PRICE:
                        MamaFieldCachePrice mfcp = (MamaFieldCachePrice) 
                                            myFieldCache.find (field.getName());
                        print ("" + mfcp.get(), 20);
                        break;
                    default:
                        print ("Unknown type: " + fieldType, 20);
                }              
            }
        }

        /* Class for processing fields and asserting the value of the cache 
         * against the feed*/
        private class FieldAsserter implements MamaMsgFieldIterator
        {
            public void onField (MamaMsg        msg,
                                 MamaMsgField   field,
                                 MamaDictionary dictionary,
                                 Object         closure)
            { 
                try
                {
                    assertField (field);
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }                
            }
            
            /* Assert the data from the snapshot message against the cache*/
            private void assertField (MamaMsgField field)
            {
                short fieldType = field.getType ();
                if (field.getFid () == MamaReservedFields.MsgType.getId() ||
                    field.getFid () == MamaReservedFields.AppMsgType.getId())
                {
                    return;
                }
                
                switch (fieldType)
                {
                    case MamaFieldDescriptor.BOOL:
                         MamaFieldCacheBool mfcb = (MamaFieldCacheBool) 
                                            myFieldCache.find (field.getFid());
                         if (mfcb.get() != field.getBoolean())   
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.CHAR:
                         MamaFieldCacheChar mfcc = (MamaFieldCacheChar) 
                                            myFieldCache.find (field.getFid());
                         if (mfcc.get() != field.getChar())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.I8:
                         MamaFieldCacheInt8 mfci8 = (MamaFieldCacheInt8) 
                                            myFieldCache.find (field.getFid());
                         if (mfci8.get() != field.getI8())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.U8:
                         MamaFieldCacheUint8 mfcui8 = (MamaFieldCacheUint8) 
                                            myFieldCache.find (field.getFid());
                         if (mfcui8.get() != field.getU8())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.I16:
                         MamaFieldCacheInt16 mfci16 = (MamaFieldCacheInt16) 
                                            myFieldCache.find (field.getFid());
                         if (mfci16.get() != field.getI16())             
                            failedField = failedField + " " + field.getName();
                         break;
                    case MamaFieldDescriptor.U16:
                         MamaFieldCacheUint16 mfcui16 = (MamaFieldCacheUint16) 
                                            myFieldCache.find (field.getFid());
                         if (mfcui16.get() != field.getU16())
                           failedField = failedField + " " + field.getName();
                         break;
                    case MamaFieldDescriptor.I32:
                         MamaFieldCacheInt32 mfci32 = (MamaFieldCacheInt32) 
                                            myFieldCache.find (field.getFid());
                         if (mfci32.get() != field.getI32())
                            failedField = failedField + " " + field.getName();                         
                        break;
                    case MamaFieldDescriptor.U32:
                         MamaFieldCacheUint32 mfcui32 = (MamaFieldCacheUint32) 
                                            myFieldCache.find (field.getFid()); 
                 
                         if (mfcui32.get() != field.getU32())
                         failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.I64:
                         MamaFieldCacheInt64 mfci64 = (MamaFieldCacheInt64) 
                                            myFieldCache.find (field.getFid());
                         if (mfci64.get() != field.getI64())
                             failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.U64:
                         MamaFieldCacheUint64 mfcui64 = (MamaFieldCacheUint64) 
                                            myFieldCache.find (field.getFid());
                         if (mfcui64.get() != field.getU64())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.F32:
                         MamaFieldCacheFloat32 mfcf32 = (MamaFieldCacheFloat32) 
                                            myFieldCache.find (field.getFid());
                         if (mfcf32.get() != field.getF32())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.F64:
                         MamaFieldCacheFloat64 mfcf64 = (MamaFieldCacheFloat64) 
                                            myFieldCache.find (field.getFid());
                         if (mfcf64.get() != field.getF64())
                            failedField = failedField + " " + field.getName();                            
                         break;
                    case MamaFieldDescriptor.STRING:
                         MamaFieldCacheString mfcs = (MamaFieldCacheString) 
                                            myFieldCache.find (field.getFid());
                         if (!mfcs.get().equals (field.getString()))
                            failedField = failedField + " " + field.getName();
                         break;
                    case MamaFieldDescriptor.TIME:
                         MamaFieldCacheDateTime mfcdt = (MamaFieldCacheDateTime)
                                            myFieldCache.find (field.getFid());
                         if (!mfcdt.get().equals (field.getDateTime()))
                            failedField = failedField + " " + field.getName();                        
                         break;
                    case MamaFieldDescriptor.PRICE:
                         MamaFieldCachePrice mfcp = (MamaFieldCachePrice) 
                                            myFieldCache.find (field.getFid());
                         if (!mfcp.get().equals (field.getPrice()))
                            failedField = failedField + " " + field.getName();                            
                        break;
                    default:
                        print ("Unknown type: " + fieldType, 20);    
                }                        
            }
        }        
      
        private synchronized void displayAllFields(
                final MamaSubscription subscription,
                final MamaMsg          msg )
        {   
             if (quietness < 1)
             {
                  System.out.println (subscription.getSymbol () +
                         " Type: " + MamaMsgType.stringForType (msg) +
                         " Status: " + MamaMsgStatus.stringForStatus (msg));       
             }
             
             if (quietness < 2)
             {
                if (printUsingFid || printUsingName)
                {
                    msg.iterateFields (new FieldIterator(), dictionary, "Closure");    
                }
                
                Iterator fieldCacheIterator = myFieldCache.getIterator ();
                while (fieldCacheIterator.hasNext())
                {
                    MamaFieldCacheField myField = (MamaFieldCacheField)
                                                    fieldCacheIterator.next();
                    MamaFieldDescriptor myDesc  = myField.getDescriptor();
                    print (" \t ", 0);
                    print (myDesc.getName(),20);
                    print (" | ", 0);
                    print ("" + myDesc.getFid(),4);
                    print (" | ", 0);
                    print ("" + myDesc.getTypeName(myDesc.getType()),10);
                    print (" | ", 0);
                    print ("" + myField.getAsString(), 20);
                    print (" \n ", 0);                    
                }
             }
        }

        /*Invoked once the subscrption request has been dispatched from the
         * throttle queue.*/
        public void onCreate (MamaSubscription subscription)
        {
            subscriptions.add (subscription);  
        }

        /*Invoked if any errors are encountered during subscription processing*/
        public void onError(MamaSubscription subscription,
                            short            mamaStatus,
                            int              tibrvStatus, 
                            String           subject, 
                            Exception        e)
        {
            System.err.println ("Symbol=[" + subscription.getSymbol() + "] : " +
                                "An error occurred creating subscription: " +
                                MamaStatus.stringForStatus (mamaStatus));

        }

        /*Invoked if the quality status for the subscription changes*/
        public void onQuality (MamaSubscription subscription, short quality,
                               short cause, final Object platformInfo)
        {
            System.err.println( subscription.getSymbol () + 
                                ": quality is now " +
                                MamaQuality.toString (quality) +
                                ", cause " + cause + 
                                ", platformInfo: " + platformInfo);
        }
        public void onGap (MamaSubscription subscription)
        {
            System.err.println (subscription.getSymbol () + ": gap detected ");
        }
            
        public void onRecapRequest (MamaSubscription subscription)
        {
            System.err.println (subscription.getSymbol () + ": recap requested ");
        }

        public void onDestroy(MamaSubscription mamaSub)
        {
            System.out.println("SUBSCRIPTION DESTROYED!");
        }
    }
    
    /* Callback used to verify the data in the fieldcache against the feed*/
    private static class VerifyCallback implements MamaTimerCallback
    {
        SubscriptionCallback myCallback;
        String mySymbol;
        
        public VerifyCallback (SubscriptionCallback callback,String symbol)
        {
            myCallback = callback;
            mySymbol   = symbol;
        }
        
        public synchronized void onTimer (MamaTimer timer)
        {
            MamaSubscription subscription = new MamaSubscription ();
                           
            //Properties common to all subscription types
            subscription.setTimeout (timeout);
            subscription.setRetries (1);                                
            subscription.createSnapshotSubscription (
                                                    myCallback,
                                                    queueGroup.getNextQueue (),
                                                    mySource,
                                                    mySymbol,
                                                    null);                     
        }

        public void onDestroy(MamaTimer mamaTimer)
        {
            System.out.println("TIMER DESTROYED!");
        }
    }
}

Updated on 2023-03-31 at 15:29:45 +0100