View Javadoc
1   package nl.tno.imb;
2   
3   import nl.tno.imb.TConnection;
4   
5   import java.io.InputStream;
6   import java.io.OutputStream;
7   import java.util.ArrayList;
8   import java.util.List;
9   import java.io.IOException;
10  import java.nio.charset.Charset;
11  
12  
13  /**This class implements the events that can be send and received over the IMB framework.<br>
14   * Handlers can be attached to events that get called when specified events are received.
15   * 
16   * @author hans.cornelissen@tno.nl
17   */
18  public class TEventEntry {
19      // other sub classes
20      TEventEntry(TConnection aConnection, int aID, String aEventName) {
21          this.connection = aConnection;
22          this.ID = aID;
23          this.feventName = aEventName;
24          this.fparent = null;
25          this.fisPublished = false;
26          this.fisSubscribed = false;
27      }
28      
29      static final int IC_INVALID_COMMAND = -1; // to signal corrupt command
30      // private static final int icHeartBeat = -4;
31      static final int IC_END_OF_SESSION = -5;
32      // private static final int icFlushQueue = -6;
33      static final int IC_UNIQUE_CLIENT_ID = -7;
34      //private static final int icTimeStamp = -8;
35      
36      static final int IC_EVENT = -15;
37  
38      // private static final int icEndClientSession = -21;
39      // private static final int icFlushClientQueue = -22;
40      // private static final int icConnectToGateway = -23;
41  
42      static final int IC_SET_CLIENT_INFO = -31;
43      static final int IC_SET_VARIABLE = -32;
44      static final int IC_ALL_VARIABLES = -33;
45      static final int IC_SET_STATE = -34;
46      static final int IC_SET_THROTTLE = -35;
47      // private static final int icsSetNoDelay = -36;
48      static final int IC_SET_VARIABLE_PREFIXED = -37;
49  
50      static final int IC_REQUEST_EVENT_NAMES = -41;
51      static final int IC_EVENT_NAMES = -42;
52      // private static final int icRequestSubscribers = -43;
53      // private static final int icRequestPublishers = -44;
54  
55      static final int IC_SUBSCRIBE = -45;
56      static final int IC_UNSUBSCRIBE = -46;
57      static final int IC_PUBLISH = -47;
58      static final int IC_UNPUBLISH = -48;
59      
60      static final int IC_SET_EVENT_ID_TRANSLATION = -49;
61  
62      // private static final int icStatusEvent = -52;
63      // private static final int icStatusClient = -53;
64      // private static final int icStatusEventPlus = -54;
65      // private static final int icStatusClientPlus = -55;
66      // private static final int icStatusHUB = -56;
67      // private static final int icStatusTimer = -57;
68  
69      // private static final int icHumanReadableHeader = -60;
70      // private static final int icSetMonitor = -61;
71      // private static final int icResetMonitor = -62;
72  
73      static final int IC_CREATE_TIMER = -73;
74  
75      // locator commands = UDP)
76      static final int IC_HUB_LOCATE = -81;
77      static final int IC_HUB_FOUND = -82;
78  
79      // private static final int icLogClear = -91;
80      // private static final int icLogRequest = -92;
81      // private static final int icLogContents = -93;
82  
83  
84      // TEventKind
85      // IMB version 1
86      /** event kind: change object */
87      public static final int EK_CHANGE_OBJECT_EVENT = 0;             
88      // IMB version 2
89      /** event kind: header of a stream */
90      public static final int EK_STREAM_HEADER = 1;                   
91      /** event kind: body of a stream */
92      public static final int EK_STREAM_BODY = 2;                     
93      /** event kind: end of a stream */
94      public static final int EK_STREAM_TAIL = 3;                     
95      /** event kind: buffer event  */
96      public static final int EK_BUFFER = 4;                          
97      /** event kind: normal event */
98      public static final int EK_NORMAL_EVENT = 5;                    
99      // IMB version 3
100     /** event kind:  change object including changed data */
101     public static final int EK_CHANGE_OBJECT_DATA_EVENT = 6;        
102     /** event kind:  a child event was created */
103     public static final int EK_CHILD_EVENT_ADD = 11;                
104     /** event kind:  a child event was removed */
105     public static final int EK_CHILD_EVENT_REMOVE = 12;             
106     /** event kind:  send a line to the log */
107     public static final int EK_LOG_WRITELN = 30;                    
108     /** event kind:  cancel/remove a running timer */
109     public static final int EK_TIMER_CANCEL = 40;                   
110     /** event kind:  reset a timer */
111     public static final int EK_TIMER_PREPARE = 41;                  
112     /** event kind:  start or continue a timer */
113     public static final int EK_TIMER_START = 42;                    
114     /** event kind:  stop a running timer */
115     public static final int EK_TIMER_STOP = 43;                     
116     /** event kind:  add client to the acknowledge list of a timer */
117     public static final int EK_TIMER_ACKNOWLEDGE_LIST_ADD = 45;     
118     /** event kind:  remove client from the acknowledge list of a timer*/
119     public static final int EK_TIMER_ACKNOWLEDGE_LIST_REMOVE = 46;  
120     /** event kind:  set the relative speed of the timer */
121     public static final int EK_TIMER_SET_SPEED = 47;                
122     /** event kind:  timer tick */
123     public static final int EK_TIMER_TICK = 48;                     
124     /** event kind:  acknowledge timer tick */
125     public static final int EK_TIMER_ACKNOWLEDGE = 49;              
126     /** event kind:  request status update of a timer */
127     public static final int EK_TIMER_STATUS_REQUEST = 50;           
128 
129     /** defines the type of log entry to send */
130     public enum TLogLevel {
131         llRemark,
132         llDump,
133         llNormal,
134         llStart,
135         llFinish,
136         llPush,
137         llPop,
138         llStamp,
139         llSummary,
140         llWarning,
141         llError
142     }
143 
144     /** no limit on the number of timer events to send */
145     public static final int TRC_INFINITE = Integer.MAX_VALUE;
146     /** the maximum size a stream body or stream tail data part may be*/
147     private static final int MAX_STREAM_BODY_BUFFER_SIZE = 16 * 1024;
148     
149     // private/internal
150     private static final int EVENT_KIND_MASK = 0x000000FF;
151     private static final int EVENT_FLAGS_MASK = 0x0000FF00;
152 
153     private class TStreamCacheEntry {
154         private int fstreamID;
155         private OutputStream fstream;
156         private String fname;
157 
158         public TStreamCacheEntry(int aStreamID, OutputStream aStream, String aStreamName) {
159             this.fstreamID = aStreamID;
160             this.fstream = aStream;
161             this.fname = aStreamName;
162         }
163     }
164 
165     private class TStreamCache {
166         private List<TStreamCacheEntry> fstreamCacheList = new ArrayList<TStreamCacheEntry>();
167 
168         public TStreamCacheEntry find(int aStreamID) {
169             for (int i = 0; i < this.fstreamCacheList.size(); i++) {
170                 TStreamCacheEntry sce = this.fstreamCacheList.get(i);
171                 if (sce.fstreamID == aStreamID)
172                     return sce;
173             }
174             return null;
175         }
176 
177         public void cache(int aStreamID, OutputStream aStream, String aStreamName) {
178             this.fstreamCacheList.add(new TStreamCacheEntry(aStreamID, aStream, aStreamName));
179         }
180 
181         public void remove(int aStreamID) {
182             int i = 0;
183             while ((i < this.fstreamCacheList.size()) && (this.fstreamCacheList.get(i).fstreamID != aStreamID))
184                 i++;
185             if (i < this.fstreamCacheList.size())
186                 this.fstreamCacheList.remove(i);
187         }
188     }
189 
190     private boolean fisPublished;
191     private boolean fisSubscribed;
192     String feventName; // scope=package
193     TEventEntry fparent;
194     private TStreamCache fstreamCache = new TStreamCache();
195 
196     private int timerBasicCmd(int aEventKind, String aTimerName) {
197         TByteBuffer Payload = new TByteBuffer();
198         Payload.prepare(aTimerName);
199         Payload.prepareApply();
200         Payload.qWrite(aTimerName);
201         return signalEvent(aEventKind, Payload.getBuffer());
202     }
203 
204     private int timerAcknowledgeCmd(int aEventKind, String aTimerName, String aClientName) {
205         TByteBuffer Payload = new TByteBuffer();
206         Payload.prepare(aTimerName);
207         Payload.prepare(aClientName);
208         Payload.prepareApply();
209         Payload.qWrite(aTimerName);
210         Payload.qWrite(aClientName);
211         return signalEvent(aEventKind, Payload.getBuffer());
212     }
213 
214     void subscribe() {
215         this.fisSubscribed = true;
216         // send command
217         TByteBuffer Payload = new TByteBuffer();
218         Payload.prepare(this.ID);
219         Payload.prepare(0); // EET
220         Payload.prepare(getEventName());
221         Payload.prepareApply();
222         Payload.qWrite(this.ID);
223         Payload.qWrite(0); // EET
224         Payload.qWrite(getEventName());
225         this.connection.writeCommand(IC_SUBSCRIBE, Payload.getBuffer());
226     }
227 
228     void publish() {
229         this.fisPublished = true;
230         // send command
231         TByteBuffer Payload = new TByteBuffer();
232         Payload.prepare(this.ID);
233         Payload.prepare(0); // EET
234         Payload.prepare(getEventName());
235         Payload.prepareApply();
236         Payload.qWrite(this.ID);
237         Payload.qWrite(0); // EET
238         Payload.qWrite(getEventName());
239         this.connection.writeCommand(IC_PUBLISH, Payload.getBuffer());
240     }
241 
242     boolean isEmpty() {
243         return !(this.fisSubscribed || this.fisPublished);
244     } 
245     
246     private boolean fSubscribers;
247     private boolean fPublishers;
248     public boolean subscribers() { return this.fSubscribers; }
249     public boolean publishers() { return this.fPublishers; }
250 
251     void unSubscribe(boolean aChangeLocalState)
252     {
253         if (aChangeLocalState)
254             this.fisSubscribed = false;
255         // send command
256         TByteBuffer Payload = new TByteBuffer();
257         Payload.prepare(getEventName());
258         Payload.prepareApply();
259         Payload.qWrite(getEventName());
260         this.connection.writeCommand(IC_UNSUBSCRIBE, Payload.getBuffer());
261     }
262 
263     void unPublish(boolean aChangeLocalState)
264     {
265         if (aChangeLocalState)
266             this.fisPublished = false;
267         // send command
268         TByteBuffer Payload = new TByteBuffer();
269         Payload.prepare(getEventName());
270         Payload.prepareApply();
271         Payload.qWrite(getEventName());
272         this.connection.writeCommand(IC_UNPUBLISH, Payload.getBuffer());
273     }
274 
275     // dispatcher for all events
276     void handleEvent(TByteBuffer aPayload)
277     {
278         int EventTick;
279         int EventKindInt;
280         EventTick = aPayload.readInt32();
281         EventKindInt = aPayload.readInt32();
282         int eventKind = EventKindInt & EVENT_KIND_MASK;
283         switch (eventKind) {
284         case EK_CHANGE_OBJECT_EVENT:
285             handleChangeObject(aPayload);
286             break;
287         case EK_CHANGE_OBJECT_DATA_EVENT:
288             handleChangeObjectData(aPayload);
289             break;
290         case EK_BUFFER:
291             handleBuffer(EventTick, aPayload);
292             break;
293         case EK_NORMAL_EVENT:
294             if (this.onNormalEvent != null)
295                 this.onNormalEvent.dispatch(this, aPayload);
296             break;
297         case EK_TIMER_TICK:
298             handleTimerTick(aPayload);
299             break;
300         case EK_TIMER_PREPARE:
301             handleTimerCmd(EK_TIMER_PREPARE, aPayload);
302             break;
303         case EK_TIMER_START:
304             handleTimerCmd(EK_TIMER_START, aPayload);
305             break;
306         case EK_TIMER_STOP:
307             handleTimerCmd(EK_TIMER_STOP, aPayload);
308             break;
309         case EK_STREAM_HEADER:
310             handleStreamEvent(EK_STREAM_HEADER, aPayload);
311             break;
312         case EK_STREAM_BODY:
313             handleStreamEvent(EK_STREAM_BODY, aPayload);
314             break;
315         case EK_STREAM_TAIL:
316             handleStreamEvent(EK_STREAM_TAIL, aPayload);
317             break;
318         case EK_CHILD_EVENT_ADD:
319             handleChildEvent(EK_CHILD_EVENT_ADD, aPayload);
320             break;
321         case EK_CHILD_EVENT_REMOVE:
322             handleChildEvent(EK_CHILD_EVENT_REMOVE, aPayload);
323             break;
324         default:
325             if (this.onOtherEvent != null)
326                 this.onOtherEvent.dispatch(this, EventTick, eventKind, aPayload);
327             break;
328         }
329 
330     }
331 
332     // dispatchers for specific events
333     private void handleChangeObject(TByteBuffer aPayload) {
334         if (this.onFocus != null) {
335             double X;
336             double Y;
337             X = aPayload.readDouble();
338             Y = aPayload.readDouble();
339             this.onFocus.dispatch(X, Y);
340         } else {
341             if (this.onChangeFederation != null) {
342                 aPayload.readInt32(); // read action, not used
343                 int NewFederationID = aPayload.readInt32();
344                 String NewFederation = aPayload.readString();
345                 this.onChangeFederation.dispatch(this.connection, NewFederationID, NewFederation);
346             } else {
347                 if (this.onChangeObject != null) {
348                     int Action = aPayload.readInt32();
349                     int ObjectID = aPayload.readInt32();
350                     String Attribute = aPayload.readString();
351                     this.onChangeObject.dispatch(Action, ObjectID, getShortEventName(), Attribute);
352                 }
353             }
354         }
355     }
356 
357     private void handleChangeObjectData(TByteBuffer aPayload) {
358         if (this.onChangeObjectData != null) {
359             int Action = aPayload.readInt32();
360             int ObjectID = aPayload.readInt32();
361             String Attribute = aPayload.readString();
362             TByteBuffer NewValues = aPayload.readByteBuffer();
363             TByteBuffer OldValues = aPayload.readByteBuffer();
364             this.onChangeObjectData.dispatch(this, Action, ObjectID, Attribute, NewValues, OldValues);
365         }
366     }
367 
368     private void handleBuffer(int aEventTick, TByteBuffer aPayload) {
369         if (this.onBuffer != null) {
370             int BufferID = aPayload.readInt32();
371             TByteBuffer Buffer = aPayload.readByteBuffer();
372             this.onBuffer.dispatch(this, aEventTick, BufferID, Buffer);
373         }
374     }
375 
376     private void handleTimerTick(TByteBuffer aPayload) {
377         if (this.onTimerTick != null) {
378             String TimerName = aPayload.readString();
379             int Tick = aPayload.readInt32();
380             long TickTime = aPayload.readInt64();
381             long StartTime = aPayload.readInt64();
382             this.onTimerTick.dispatch(this, TimerName, Tick, TickTime, StartTime);
383         }
384     }
385 
386     private void handleTimerCmd(int aEventKind, TByteBuffer aPayload) {
387         if (this.onTimerCmd != null) {
388             String TimerName = aPayload.readString();
389             this.onTimerCmd.dispatch(this, aEventKind, TimerName);
390         }
391     }
392 
393     private void handleChildEvent(int aEventKind, TByteBuffer aPayload) {
394         if (this.onChildEvent != null) {
395             String EventName = aPayload.readString();
396             this.onChildEvent.dispatch(this, aEventKind, EventName);
397         }
398     }
399 
400     private void handleStreamEvent(int aEventKind, TByteBuffer aPayload) {
401         int StreamID;
402         String StreamName;
403         OutputStream stream;
404         TStreamCacheEntry sce;
405         switch (aEventKind) {
406         case EK_STREAM_HEADER:
407             if (this.onStreamCreate != null) {
408                 StreamID = aPayload.readInt32();
409                 StreamName = aPayload.readString();
410                 stream = this.onStreamCreate.dispatch(this, StreamName);
411                 if (stream != null)
412                     this.fstreamCache.cache(StreamID, stream, StreamName);
413             }
414             break;
415         case EK_STREAM_BODY:
416             StreamID = aPayload.readInt32();
417             sce = this.fstreamCache.find(StreamID);
418             if ((sce != null) && (sce.fstream != null)) {
419                 try {
420                     sce.fstream.write(aPayload.getBuffer(), aPayload.getReadCursor(), aPayload.getReadAvailable());
421                 } catch (IOException e) {
422                     // TODO Auto-generated catch block
423                     e.printStackTrace();
424                 }
425             }
426             break;
427         case EK_STREAM_TAIL:
428             StreamID = aPayload.readInt32();
429             sce = this.fstreamCache.find(StreamID);
430             if ((sce != null) && (sce.fstream != null)) {
431                 try {
432                     sce.fstream.write(aPayload.getBuffer(), aPayload.getReadCursor(), aPayload.getReadAvailable());
433                     if (this.onStreamEnd != null)
434                         this.onStreamEnd.dispatch(this, sce.fstream, sce.fname);
435                     sce.fstream.close();
436                 } catch (IOException e) {
437                     // TODO Auto-generated catch block
438                     e.printStackTrace();
439                 }
440                 this.fstreamCache.remove(StreamID);
441             }
442             break;
443         }
444     }
445 
446     void handleSubAndPub(int aCommand) {
447         if (this.fparent == null && this.onSubAndPub != null)
448             this.onSubAndPub.dispatch(this, aCommand);
449         switch (aCommand)
450         {
451         case IC_SUBSCRIBE:
452             if (this.fparent != null && !isPublished())
453                 publish();
454             this.fSubscribers = true;
455             break;
456         case IC_PUBLISH:
457             if (this.fparent != null && !isSubscribed())
458                 subscribe();
459             this.fPublishers = true;
460             break;
461         case IC_UNSUBSCRIBE:
462             if (this.fparent != null && isPublished())
463                 unPublish(true);
464             this.fSubscribers = false;
465             break;
466         case IC_UNPUBLISH:
467             if (this.fparent != null && isSubscribed())
468                 unSubscribe(true);
469             this.fPublishers = false;
470             break;
471         }
472     }
473 
474     // public
475 
476     private final TConnection connection;
477     
478     /** The local ID related to this event */
479     public final int ID;
480 
481     /** Returns the fully qualified name of this event */
482     public String getEventName() {
483         return this.feventName;
484     }
485     
486     public String getShortEventName() {
487         String federationPrefix = this.connection.getFederation()+".";
488         if (this.feventName.startsWith(federationPrefix))
489             return this.feventName.substring(federationPrefix.length());
490         else
491             return this.feventName;
492     }
493 
494     /** Returns true if this event is published */
495     public boolean isPublished() {
496         return this.fisPublished;
497     }
498 
499     /** Returns true if this event is subscribed */
500     public boolean isSubscribed() {
501         return this.fisSubscribed;
502     }
503 
504     public void copyHandlersFrom(TEventEntry aEventEntry) {
505         this.onChangeObject       = aEventEntry.onChangeObject;
506         this.onFocus              = aEventEntry.onFocus;
507         this.onNormalEvent        = aEventEntry.onNormalEvent;
508         this.onBuffer             = aEventEntry.onBuffer;
509         this.onStreamCreate       = aEventEntry.onStreamCreate;
510         this.onStreamEnd          = aEventEntry.onStreamEnd;
511         this.onChangeFederation   = aEventEntry.onChangeFederation;
512         this.onTimerTick          = aEventEntry.onTimerTick;
513         this.onTimerCmd           = aEventEntry.onTimerCmd;
514         this.onChangeObjectData   = aEventEntry.onChangeObjectData;
515         this.onOtherEvent         = aEventEntry.onOtherEvent;
516         this.onSubAndPub          = aEventEntry.onSubAndPub;
517     }
518 
519     // IMB 1
520     /** Override dispatch to implement a change object event handler */
521     public interface TOnChangeObject {
522         void dispatch(int aAction, int aObjectID, String aObjectName, String aAttribute);
523     }
524 
525     /** Handler to be called on receive of a change object event */
526     public TOnChangeObject onChangeObject = null;
527 
528     /** Override dispatch to implement a focus event handler */
529     public interface TOnFocus {
530         public void dispatch(double x, double y);
531     }
532 
533     /** Handler to be called on receive of a focus event */
534     public TOnFocus onFocus = null;
535 
536     // IMB 2
537     /** Override dispatch to implement a normal event handler */
538     public interface TOnNormalEvent {
539         public void dispatch(TEventEntry aEvent, TByteBuffer aPayload);
540     }
541 
542     /** Handler to be called on receive of a normal event */
543     public TOnNormalEvent onNormalEvent = null;
544 
545     /** Override dispatch to implement a buffer event handler */
546     public interface TOnBuffer {
547         public void dispatch(TEventEntry aEvent, int aTick, int aBufferID, TByteBuffer aBuffer);
548     }
549 
550     /** Handler to be called on receive of a buffer event */
551     public TOnBuffer onBuffer = null;
552 
553     /** Override dispatch to implement a handler of received streams, creating the local stream */
554     public interface TOnStreamCreate {
555         public OutputStream dispatch(TEventEntry aEvent, String aStreamName);
556     }
557 
558     /** Handler to be called on receive of a stream header event */
559     public TOnStreamCreate onStreamCreate = null;
560 
561     /** Override dispatch to implement a handler of received streams, action on end of stream */
562     public interface TOnStreamEnd {
563         public void dispatch(TEventEntry aEvent, /* ref */OutputStream aStream, String aStreamName);
564     }
565 
566     /** Handler to be called on receive of a stream tail event */
567     public TOnStreamEnd onStreamEnd = null;
568 
569     /** Override dispatch to implement a federation change handler */
570     public interface TOnChangeFederation {
571         public void dispatch(TConnection aConnection, int aNewFederationID, String aNewFederation);
572     }
573 
574     /** Handler to be called on receive of a federation change event */
575     public TOnChangeFederation onChangeFederation = null;
576 
577     // IMB 3
578     /** Override dispatch to implement a timer tick handler */
579     public interface TOnTimerTick {
580         public void dispatch(TEventEntry aEvent, String aTimerName, int aTick, long aTickTime, long aStartTime);
581     }
582 
583     /** Handler to be called on receive of a timer tick event */
584     public TOnTimerTick onTimerTick = null;
585 
586     /** Override dispatch to implement a timer command handler for commands reset/start/stop */
587     public interface TOnTimerCmd {
588         public void dispatch(TEventEntry aEvent, int aEventKind, String aTimerName);
589     }
590 
591     /** Handler to be called on receive of a timer command reset/start/stop */
592     public TOnTimerCmd onTimerCmd = null;
593 
594     /** Override dispatch to implement a handler for hub child event creation events */
595     public interface TOnChildEvent {
596         public void dispatch(TEventEntry aEvent, int aEventKind, String aEventName);
597     }
598 
599     /** Handler to be called on receive of a child add/remove event */
600     public TOnChildEvent onChildEvent = null;
601 
602     /** Override dispatch to implement a change object data event handler */
603     public interface TOnChangeObjectData {
604         public void dispatch(TEventEntry aEvent, int aAction, int aObjectID, String aAttribute, TByteBuffer aNewValues, TByteBuffer aOldValues);
605     }
606 
607     /** Handler to be called on receive of a change object with data event */
608     public TOnChangeObjectData onChangeObjectData = null;
609 
610     // TODO: description
611     public interface TOnSubAndPubEvent {
612         public void dispatch(TEventEntry aEvent, int aCommand);
613     }
614 
615     // TODO: description
616     public TOnSubAndPubEvent onSubAndPub = null;
617 
618     /** Override dispatch to implement a event handler for non-standard events */
619     public interface TOnOtherEvent {
620         public void dispatch(TEventEntry aEvent, int aTick, int aEventKind, TByteBuffer aPayload);
621     }
622 
623     /** Handler to be called on receive of an unhandled event */
624     public TOnOtherEvent onOtherEvent = null;
625 
626     // signals (send events)
627     
628     /**Send an event to the framework
629      * @param aEventKind
630      * @param aEventPayload
631      * @return status of the request (TConnection.ICE_* constants)
632      */
633     public int signalEvent(int aEventKind, byte[] aEventPayload) {
634         TByteBuffer Payload = new TByteBuffer();
635         if (!isPublished() && this.connection.autoPublish)
636             publish();
637         if (isPublished()) {
638             Payload.prepare(this.ID);
639             Payload.prepare((int) 0); // tick
640             Payload.prepare(aEventKind);
641             Payload.prepare(aEventPayload);
642             Payload.prepareApply();
643             Payload.qWrite(this.ID);
644             Payload.qWrite((int) (0)); // tick
645             Payload.qWrite(aEventKind);
646             Payload.qWrite(aEventPayload);
647             return this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
648         } else
649             return TConnection.ICE_EVENT_NOT_PUBLISHED;
650     }
651 
652     /**Send a buffer event to the framework
653      * @param aBufferID self chosen ID to separate streams of buffer events
654      * @param aBuffer
655      * @return status of the request (TConnection.ICE_* constants)
656      */
657     public int signalBuffer(int aBufferID, byte[] aBuffer) {
658         return signalBuffer(aBufferID, aBuffer, 0);
659     }
660 
661     /**Send a buffer event to the framework
662      * @param aBufferID self chosen ID to separate streams of buffer events
663      * @param aBuffer
664      * @param aEventFlags flags for special processing within the hub; not fully implemented, use 0 
665      * @return status of the request (TConnection.ICE_* constants)
666      */
667     public int signalBuffer(int aBufferID, byte[] aBuffer, int aEventFlags) {
668         TByteBuffer Payload = new TByteBuffer();
669         if (!isPublished() && this.connection.autoPublish)
670             publish();
671         if (isPublished()) {
672             Payload.prepare(this.ID);
673             Payload.prepare((int) 0); // tick
674             Payload.prepare(EK_BUFFER | (aEventFlags & EVENT_FLAGS_MASK));
675             Payload.prepare(aBufferID);
676             Payload.prepare(aBuffer.length);
677             Payload.prepare(aBuffer);
678             Payload.prepareApply();
679             Payload.qWrite(this.ID);
680             Payload.qWrite((int) (0)); // tick
681             Payload.qWrite(EK_BUFFER | (aEventFlags & EVENT_FLAGS_MASK));
682             Payload.qWrite(aBufferID);
683             Payload.qWrite(aBuffer.length);
684             Payload.qWrite(aBuffer);
685             return this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
686         } else
687             return TConnection.ICE_EVENT_NOT_PUBLISHED;
688     }
689 
690     private int readBytesFromStream(TByteBuffer aBuffer, InputStream aStream) {
691         try {
692             // TODO: cleanup code, in java stream read returns -1 when eos ?
693             int Count = 0;
694             int NumBytesRead = 1; // sentinel
695             while (aBuffer.getwriteAvailable() > 0 && NumBytesRead > 0) {
696                 NumBytesRead = aStream.read(aBuffer.getBuffer(), aBuffer.getWriteCursor(), aBuffer.getwriteAvailable());
697                 if (NumBytesRead > 0)
698                 {
699                     aBuffer.written(NumBytesRead);
700                     Count += NumBytesRead;
701                 }
702             }
703             return Count;
704         } catch (IOException ex) {
705             return 0; // signal stream read error
706         }
707     }
708 
709     /**Send a stream to the framework
710      * @param aStreamName
711      * @param aStream
712      * @return status of the request (TConnection.ICE_* constants)
713      */
714     public int signalStream(String aStreamName, InputStream aStream) {
715         TByteBuffer Payload = new TByteBuffer();
716         int ReadSize;
717         int BodyIndex;
718         int EventKindIndex;
719         if (!isPublished() && this.connection.autoPublish)
720             publish();
721         if (isPublished()) {
722             // ekStreamHeader, includes stream name, no stream data
723             byte[] StreamNameUTF8 = aStreamName.getBytes(Charset.forName("UTF-8"));
724             // TODO: generate semi-unique stream id from connection URI and stream name
725             int StreamID = StreamNameUTF8.hashCode() + this.connection.hashCode(); 
726             Payload.prepare(this.ID);
727             Payload.prepare((int) 0); // tick
728             Payload.prepare(EK_STREAM_HEADER); // event kind
729             Payload.prepare(StreamID);
730             Payload.prepare(aStreamName);
731             Payload.prepareApply();
732             Payload.qWrite(this.ID);
733             Payload.qWrite((int) 0); // tick
734             EventKindIndex = Payload.getWriteCursor();
735             Payload.qWrite(EK_STREAM_HEADER); // event kind
736             Payload.qWrite(StreamID);
737             BodyIndex = Payload.getWriteCursor();
738             Payload.qWrite(aStreamName);
739             int res = this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
740             if (res > 0) {
741                 // ekStreamBody, only buffer size chunks of data
742                 // prepare payload to same value but aStreamName stripped
743                 // fix-up event kind
744                 Payload.writeStart(EventKindIndex);
745                 Payload.qWrite(EK_STREAM_BODY);
746                 Payload.writeStart(BodyIndex);
747                 // prepare room for body data
748                 Payload.prepareStart();
749                 Payload.prepareSize(MAX_STREAM_BODY_BUFFER_SIZE);
750                 Payload.prepareApply();
751                 // write pointer in ByteBuffer is still at beginning of stream read buffer!
752                 // but buffer is already created on correct length
753                 do {
754                     ReadSize = readBytesFromStream(Payload, aStream);
755                     // ReadSize = aStream.Read(Payload.Buffer, BodyIndex, Connection.MaxStreamBodyBuffer);
756                     if (ReadSize == MAX_STREAM_BODY_BUFFER_SIZE)
757                         res = this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
758                     // reset write position
759                     Payload.writeStart(BodyIndex);
760                 } while ((ReadSize == MAX_STREAM_BODY_BUFFER_SIZE) && (res > 0));
761                 if (res > 0) {
762                     // clip ByteBuffer to bytes read from stream
763                     // write pointer in ByteBuffer is still at beginning of stream read buffer!
764                     Payload.prepareStart();
765                     Payload.prepareSize(ReadSize);
766                     Payload.prepareApplyAndTrim();
767                     // fixup event kind
768                     Payload.writeStart(EventKindIndex);
769                     Payload.qWrite(EK_STREAM_TAIL);
770                     res = this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
771                 }
772             }
773             return res;
774         } else
775             return TConnection.ICE_EVENT_NOT_PUBLISHED;
776     }
777 
778     /** signal an object change: a new object is created */
779     public static final int ACTION_NEW = 0;
780     /** signal an object change: an object is deleted */
781     public static final int ACTION_DELETE = 1;
782     /** signal an object change: an existing object has changed */
783     public static final int ACTION_CHANGE = 2;
784 
785     /**Send a change object event to the framework
786      * @param aAction see ACTION_* constants
787      * @param aObjectID ID of the object that has changed
788      * @param aAttribute optional name of the attribute that has changed
789      * @return status of the request (TConnection.ICE_* constants)
790      */
791     public int signalChangeObject(int aAction, int aObjectID, String aAttribute) {
792         TByteBuffer Payload = new TByteBuffer();
793         if (!isPublished() && this.connection.autoPublish)
794             publish();
795         if (isPublished()) {
796             Payload.prepare(this.ID);
797             Payload.prepare((int) 0); // tick
798             Payload.prepare(EK_CHANGE_OBJECT_EVENT);
799             Payload.prepare(aAction);
800             Payload.prepare(aObjectID);
801             Payload.prepare(aAttribute);
802             Payload.prepareApply();
803             Payload.qWrite(this.ID);
804             Payload.qWrite((int) (0)); // tick
805             Payload.qWrite(EK_CHANGE_OBJECT_EVENT);
806             Payload.qWrite(aAction);
807             Payload.qWrite(aObjectID);
808             Payload.qWrite(aAttribute);
809             return this.connection.writeCommand(IC_EVENT, Payload.getBuffer());
810         } else
811             return TConnection.ICE_EVENT_NOT_PUBLISHED;
812     }
813 
814     // timers
815     /**Create a timer on the connected HUB
816      * @param aTimerName unique name of the timer within this event
817      * @param aStartTimeUTCorRelFT 0 means now<br>larger than 0 means in absolute system time (UTC)<br> less than 0 means system timer relative to now 
818      * @param aResolutionms the resolution of a timer tick (step) in milliseconds
819      * @param aSpeedFactor 1 means same speed as real time, 0 means the timer runs in simulation time
820      * @return status of the request (TConnection.ICE_* constants)
821      */
822     public int timerCreate(String aTimerName, long aStartTimeUTCorRelFT, int aResolutionms, double aSpeedFactor) {
823         return timerCreate(aTimerName, aStartTimeUTCorRelFT, aResolutionms, aSpeedFactor, TRC_INFINITE);
824     }
825 
826     /**Create a timer on the connected HUB
827      * @param aTimerName unique name of the timer within this event
828      * @param aStartTimeUTCorRelFT 0 means now<br>larger than 0 means in absolute system time (UTC)<br> less than 0 means system timer relative to now
829      * @param aResolutionms the resolution of a timer tick (step) in milliseconds
830      * @param aSpeedFactor 1 means same speed as real time, 0 means the timer runs in simulation time
831      * @param aRepeatCount number of timer the timer must send a timer tick (TRC_INFINITE for infinite)
832      * @return status of the request (TConnection.ICE_* constants)
833      */
834     public int timerCreate(String aTimerName, long aStartTimeUTCorRelFT, int aResolutionms, double aSpeedFactor,
835             int aRepeatCount) {
836         TByteBuffer Payload = new TByteBuffer();
837         if (!isPublished() && this.connection.autoPublish)
838             publish();
839         if (isPublished()) {
840             Payload.prepare(this.ID);
841             Payload.prepare(aTimerName);
842             Payload.prepare(aStartTimeUTCorRelFT);
843             Payload.prepare(aResolutionms);
844             Payload.prepare(aSpeedFactor);
845             Payload.prepare(aRepeatCount);
846             Payload.prepareApply();
847             Payload.qWrite(this.ID);
848             Payload.qWrite(aTimerName);
849             Payload.qWrite(aStartTimeUTCorRelFT);
850             Payload.qWrite(aResolutionms);
851             Payload.qWrite(aSpeedFactor);
852             Payload.qWrite(aRepeatCount);
853             return this.connection.writeCommand(IC_CREATE_TIMER, Payload.getBuffer());
854         } else
855             return TConnection.ICE_EVENT_NOT_PUBLISHED;
856     }
857 
858     /**Cancel a running timer; the timer is destroyed.
859      * @param aTimerName
860      * @return status of the request (TConnection.ICE_* constants)
861      */
862     public int timerCancel(String aTimerName) {
863         return timerBasicCmd(EK_TIMER_CANCEL, aTimerName);
864     }
865 
866     /**Prepare a timer; the timer is stopped and reset to an initial state
867      * @param aTimerName
868      * @return status of the request (TConnection.ICE_* constants)
869      */
870     public int timerPrepare(String aTimerName) {
871         return timerBasicCmd(EK_TIMER_PREPARE, aTimerName);
872     }
873 
874     /**Start or continue the timer
875      * @param aTimerName
876      * @return status of the request (TConnection.ICE_* constants)
877      */
878     public int timerStart(String aTimerName) {
879         return timerBasicCmd(EK_TIMER_START, aTimerName);
880     }
881 
882     /**Stop or pause the timer
883      * @param aTimerName
884      * @return status of the request (TConnection.ICE_* constants)
885      */
886     public int timerStop(String aTimerName) {
887         return timerBasicCmd(EK_TIMER_STOP, aTimerName);
888     }
889 
890     /**Set the relative running speed of the timer
891      * @param aTimerName
892      * @param aSpeedFactor 1 means the timer is running in real time, 0 means the timer runs in simulation time
893      * @return status of the request (TConnection.ICE_* constants)
894      */
895     public int timerSetSpeed(String aTimerName, double aSpeedFactor) {
896         TByteBuffer Payload = new TByteBuffer();
897         Payload.prepare(aTimerName);
898         Payload.prepare(aSpeedFactor);
899         Payload.prepareApply();
900         Payload.qWrite(aTimerName);
901         Payload.qWrite(aSpeedFactor);
902         return signalEvent(EK_TIMER_SET_SPEED, Payload.getBuffer());
903     }
904 
905     /**Add a client name to the acknowledge list of a timer.<br> 
906      * All entries in this list must send an acknowledge on each timer tick for the timer to advance.
907      * @param aTimerName
908      * @param aClientName
909      * @return status of the request (TConnection.ICE_* constants)
910      */
911     public int timerAcknowledgeAdd(String aTimerName, String aClientName) {
912         return timerAcknowledgeCmd(EK_TIMER_ACKNOWLEDGE_LIST_ADD, aTimerName, aClientName);
913     }
914 
915     /**Remove a client name from the acknowledge list of a timer.<br> 
916      * All entries in this list must send an acknowledge on each timer tick for the timer to advance.
917      * @param aTimerName
918      * @param aClientName
919      * @return status of the request (TConnection.ICE_* constants)
920      */
921     public int timerAcknowledgeRemove(String aTimerName, String aClientName) {
922         return timerAcknowledgeCmd(EK_TIMER_ACKNOWLEDGE_LIST_REMOVE, aTimerName, aClientName);
923     }
924 
925     /**Acknowledge a timer tick.<br>
926      * All clients on the timer acknowledge list must send an acknowledge on each timer tick for the timer to advance.
927      * @param aTimerName
928      * @param aClientName
929      * @param aProposedTimeStep clients can specify the next step that they wish. The overall lowest next step is used if the timer is not running in real time
930      * @return status of the request (TConnection.ICE_* constants)
931      */
932     public int timerAcknowledge(String aTimerName, String aClientName, int aProposedTimeStep) {
933         TByteBuffer Payload = new TByteBuffer();
934         Payload.prepare(aClientName);
935         Payload.prepare(aTimerName);
936         Payload.prepare(aProposedTimeStep);
937         Payload.prepareApply();
938         Payload.qWrite(aClientName);
939         Payload.qWrite(aTimerName);
940         Payload.qWrite(aProposedTimeStep);
941         return signalEvent(EK_TIMER_ACKNOWLEDGE, Payload.getBuffer());
942     }
943 
944     // log
945     /**Send a line to the central framework log
946      * @param aLine text to enter into the log
947      * @param aLevel severity of the entry to log. See TLogLevel for values. 
948      * @return status of the request (TConnection.ICE_* constants)
949      */
950     public int logWriteLn(String aLine, TLogLevel aLevel) {
951         TByteBuffer Payload = new TByteBuffer();
952         if (!isPublished() && this.connection.autoPublish)
953             publish();
954         if (isPublished()) {
955             Payload.prepare((int) 0); // client id filled in by hub
956             Payload.prepare(aLine);
957             Payload.prepare(aLevel.ordinal());
958             Payload.prepareApply();
959             Payload.qWrite((int) 0); // client id filled in by hub
960             Payload.qWrite(aLine);
961             Payload.qWrite(aLevel.ordinal());
962             return signalEvent(EK_LOG_WRITELN, Payload.getBuffer());
963         } else
964             return TConnection.ICE_EVENT_NOT_PUBLISHED;
965     }
966 }