1 package nl.tno.imb;
2
3 import nl.tno.imb.TEventEntry;
4
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.util.Arrays;
8 import java.io.IOException;
9 import java.net.Socket;
10 import java.net.SocketException;
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public class TConnection {
31
32
33
34
35
36
37
38
39
/**
 * Creates a connection and opens it immediately with the background reading thread enabled.
 * Delegates to the six-argument constructor, passing {@code true} for {@code aStartReadingThread}.
 *
 * @param aRemoteHost host to connect the socket to
 * @param aRemotePort port to connect the socket to
 * @param aOwnerName  owner name reported to the remote side
 * @param aOwnerID    owner id reported to the remote side
 * @param aFederation federation used to prefix event and variable names
 */
public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation) {
    this(aRemoteHost, aRemotePort, aOwnerName, aOwnerID, aFederation, true);
}
43
44
45
46
47
48
49
50
51
/**
 * Creates a connection, stores the owner and federation settings and opens the socket.
 * The result of {@code open} is not checked here; use {@link #isConnected()} afterwards.
 *
 * @param aRemoteHost         host to connect the socket to
 * @param aRemotePort         port to connect the socket to
 * @param aOwnerName          owner name reported to the remote side
 * @param aOwnerID            owner id reported to the remote side
 * @param aFederation         federation used to prefix event and variable names
 * @param aStartReadingThread when true a background thread is started that reads incoming commands
 */
public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation,
        boolean aStartReadingThread) {
    // Identity must be in place before open() because open() sends the owner info.
    this.fownerID = aOwnerID;
    this.fownerName = aOwnerName;
    this.ffederation = aFederation;
    open(aRemoteHost, aRemotePort, aStartReadingThread);
}
59
60
/**
 * Closes the connection when the object is finalized.
 * NOTE(review): finalizers are deprecated in current Java; callers should call close() explicitly.
 */
protected void finalize() {
    close();
}
64
65
66
67
68 private class TEventTranslation {
69 public final static int INVALID_TRANSLATED_EVENT_ID = -1;
70
71 private int[] feventTranslation;
72
73 public TEventTranslation() {
74 feventTranslation = new int[32];
75
76 for (int i = 0; i < feventTranslation.length; i++)
77 feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID;
78 }
79
80 public int getTranslateEventID(int aRxEventID) {
81 if ((0 <= aRxEventID) && (aRxEventID < feventTranslation.length))
82 return feventTranslation[aRxEventID];
83 else
84 return INVALID_TRANSLATED_EVENT_ID;
85 }
86
87 public void setEventTranslation(int aRxEventID, int aTxEventID) {
88 if (aRxEventID >= 0) {
89
90
91 while (aRxEventID >= feventTranslation.length) {
92 int FormerSize = feventTranslation.length;
93
94 feventTranslation = Arrays.copyOf(feventTranslation, feventTranslation.length * 2);
95
96 for (int i = FormerSize; i < feventTranslation.length; i++)
97 feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID;
98 }
99 feventTranslation[aRxEventID] = aTxEventID;
100 }
101 }
102 }
103
104
105 private class TEventEntryList {
106
107 TEventEntryList(int aInitialSize) {
108 FCount = 0;
109 fevents = new TEventEntry[aInitialSize];
110 }
111
112 private TEventEntry[] fevents;
113 private int FCount = 0;
114
115 public TEventEntry getEventEntry(int aEventID) {
116 if (0 <= aEventID && aEventID < FCount)
117 return fevents[aEventID];
118 else
119 return null;
120 }
121
122 public String getEventName(int aEventID) {
123 if (0 <= aEventID && aEventID < FCount) {
124 if (fevents[aEventID] != null)
125 return fevents[aEventID].getEventName();
126 else
127 return null;
128 } else
129 return "";
130 }
131
132 public TEventEntry addEvent(TConnection aConnection, String aEventName) {
133 FCount++;
134 if (FCount>fevents.length)
135 fevents = Arrays.copyOf(fevents, fevents.length * 2);
136 fevents[FCount - 1] = new TEventEntry(aConnection, FCount - 1, aEventName);
137
138 return fevents[FCount - 1];
139 }
140
141 public TEventEntry getEventEntryOnName(String aEventName) {
142 int i = FCount - 1;
143 while (i >= 0 && !getEventName(i).equals(aEventName))
144 i--;
145 if (i >= 0)
146 return fevents[i];
147 else
148 return null;
149 }
150 }
151
/** Suffix that marks an event name as a filter; see findEventParentL. */
public static final String EVENT_FILTER_POST_FIX = "*";

/** Last part of a variable name that carries a model status; see handleCommandVariable. */
private static final String MODEL_Status_VAR_NAME = "ModelStatus";

/** Separator placed in front of MODEL_Status_VAR_NAME inside a status variable name. */
private static final String MODEL_STATUS_VAR_SEP_CHAR = "|";


/** Byte sequence written at the start of every command and searched for when reading one. */
public static final byte[] MAGIC_BYTES = new byte[] {
        0x2F, 0x47, 0x61, 0x71, (byte) 0x95, (byte) 0xAD, (byte) 0xC5, (byte) 0xFB
};


/** Check value written after a non-empty payload (int form, used by writeCommand). */
private static final int MAGIC_STRING_CHECK_INT32 = 0x10F13467;

/** The same check value as bytes, least significant byte first (compared by readCommand). */
public static final byte[] MAGIC_STRING_CHECK = new byte[] {
        0x67, 0x34, (byte) 0xF1, 0x10
};
169
170
/** Socket created in open(); set back to null by close(). */
private Socket fsocket = null;

/** Output stream of fsocket; commands are written here by writeCommand. */
private OutputStream foutputStream = null;

/** Input stream of fsocket; commands are read from here. */
private InputStream finputStream = null;

/** Host passed to the last open() call. */
private String fremoteHost = "";

/** Port passed to the last open() call. */
private int fremotePort = 0;

/** Thread running readCommands(), created by open() when a reading thread is requested. */
private Thread freadingThread = null;

/** Translation of received event ids, filled by IC_SET_EVENT_ID_TRANSLATION commands. */
private TEventTranslation feventTranslation = new TEventTranslation();

/** Local event entries; also used as the monitor by the methods whose name ends in L. */
private TEventEntryList feventEntryList = new TEventEntryList(8);

/** Federation used by prefixFederation to prefix names. */
private String ffederation = DEFAULT_FEDERATION;

private static final String FOCUS_EVENT_NAME = "Focus";
/** Cached event used by subscribeOnFocus / signalFocus. */
private TEventEntry ffocusEvent = null;
private static final String CHANGE_FEDERATION_EVENT_NAME = "META_CurrentSession";
/** Cached event used by subscribeOnFederationChange / signalChangeFederation. */
private TEventEntry fchangeFederationEvent = null;
/** Cached event used by logWriteLn. */
private TEventEntry flogEvent = null;




/** Set from the payload of an IC_UNIQUE_CLIENT_ID command; 0 until then. */
private int funiqueClientID = 0;
/** Second value of the IC_UNIQUE_CLIENT_ID payload; 0 until then. */
private int fclientHandle = 0;
/** Owner id sent by setOwner(). */
private int fownerID = 0;
/** Owner name sent by setOwner(). */
private String fownerName = "";
202
/**
 * Returns the event entry for a local event id while holding the event list monitor.
 *
 * @param aEventID local event id
 * @return the entry, or null when the id is not in the list
 */
private TEventEntry eventIDToEventL(int aEventID) {
    synchronized (feventEntryList) {
        return feventEntryList.getEventEntry(aEventID);
    }
}
208
209 private TEventEntry addEvent(String aEventName) {
210 int EventID = 0;
211 TEventEntry Event;
212 while (EventID < feventEntryList.FCount && !feventEntryList.getEventEntry(EventID).isEmpty())
213 EventID += 1;
214 if (EventID < feventEntryList.FCount) {
215 Event = feventEntryList.getEventEntry(EventID);
216 Event.feventName = aEventName;
217 Event.fparent = null;
218 } else
219 Event = feventEntryList.addEvent(this, aEventName);
220 return Event;
221 }
222
/**
 * Same as addEvent, but executed while holding the event list monitor.
 *
 * @param aEventName name for the event
 * @return the recycled or newly appended entry
 */
private TEventEntry addEventL(String aEventName) {
    TEventEntry Event;
    synchronized (feventEntryList) {
        Event = addEvent(aEventName);
    }
    return Event;
}
230
231 private TEventEntry findOrAddEventL(String aEventName) {
232 synchronized (feventEntryList) {
233 TEventEntry Event = feventEntryList.getEventEntryOnName(aEventName);
234 if (Event == null) {
235 int EventID = 0;
236 while (EventID < feventEntryList.FCount && !feventEntryList.getEventEntry(EventID).isEmpty())
237 EventID += 1;
238 if (EventID < feventEntryList.FCount) {
239 Event = feventEntryList.getEventEntry(EventID);
240 Event.feventName = aEventName;
241 } else
242 Event = feventEntryList.addEvent(this, aEventName);
243 }
244 return Event;
245 }
246 }
247
/**
 * Looks up an entry by exact name while holding the event list monitor.
 *
 * @param aEventName full event name
 * @return the entry, or null when no entry has that name
 */
private TEventEntry findEventL(String aEventName) {
    synchronized (feventEntryList) {
        return feventEntryList.getEventEntryOnName(aEventName);
    }
}
253
254 private TEventEntry findEventParentL(String aEventName) {
255 String ParentEventName;
256 String EventName;
257 ParentEventName = "";
258 TEventEntry Event = null;
259 synchronized (feventEntryList) {
260 for (int EventID = 0; EventID<feventEntryList.FCount; EventID++)
261 {
262 EventName = feventEntryList.getEventName(EventID);
263 if (EventName.endsWith(EVENT_FILTER_POST_FIX))
264 {
265 EventName = EventName.substring(0, EventName.length()-2);
266 if (aEventName.startsWith(EventName))
267 {
268 if (ParentEventName.length()<EventName.length())
269 {
270 Event = feventEntryList.getEventEntry(EventID);
271 ParentEventName = EventName;
272 }
273 }
274 }
275 }
276 return Event;
277 }
278 }
279
280 private TEventEntry findEventAutoPublishL(String aEventName) {
281 TEventEntry Event = findEventL(aEventName);
282 if (Event == null && autoPublish)
283 Event = publish(aEventName, false);
284 return Event;
285 }
286
287 private int readBytesFromNetStream(TByteBuffer aBuffer) {
288 try {
289 int Count = 0;
290 int NumBytesRead = -1;
291 while (aBuffer.getwriteAvailable() > 0 && NumBytesRead != 0) {
292 NumBytesRead = finputStream.read(aBuffer.getBuffer(), aBuffer.getWriteCursor(), aBuffer.getwriteAvailable());
293 aBuffer.written(NumBytesRead);
294 Count += NumBytesRead;
295 }
296 return Count;
297 } catch (IOException ex) {
298 return 0;
299 }
300 }
301
302
303
304
305 private int readCommand(TByteBuffer aFixedCommandPart, TByteBuffer aPayload, TByteBuffer aPayloadCheck)
306 throws IOException {
307 int NumBytesRead = finputStream.read(aFixedCommandPart.getBuffer(), 0, aFixedCommandPart.getLength());
308 if (NumBytesRead > 0) {
309 while (!aFixedCommandPart.compare(MAGIC_BYTES, 0)) {
310 int rbr = finputStream.read();
311
312 if (rbr != -1)
313 aFixedCommandPart.shiftLeftOneByte((byte) rbr);
314 else
315 return TEventEntry.IC_END_OF_SESSION;
316 }
317
318 int aCommand = aFixedCommandPart.peekInt32(MAGIC_BYTES.length);
319 int PayloadSize = aFixedCommandPart.peekInt32(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32);
320 if (PayloadSize <= MAX_PAYLOAD_SIZE) {
321 aPayload.clear(PayloadSize);
322 if (PayloadSize > 0) {
323 int Len = readBytesFromNetStream(aPayload);
324 if (Len == aPayload.getLength()) {
325 NumBytesRead = finputStream.read(aPayloadCheck.getBuffer(), 0, aPayloadCheck.getLength());
326 if (NumBytesRead == TByteBuffer.SIZE_OF_INT32 && aPayloadCheck.compare(MAGIC_STRING_CHECK, 0))
327 return aCommand;
328 else
329 return TEventEntry.IC_INVALID_COMMAND;
330 } else
331
332 return TEventEntry.IC_INVALID_COMMAND;
333 } else
334 return aCommand;
335 } else
336 return TEventEntry.IC_INVALID_COMMAND;
337 } else
338 return TEventEntry.IC_END_OF_SESSION;
339 }
340
341
342 private Integer fwiteCommandLock = new Integer(0);
343
344
345
346
347
348
349 protected int writeCommand(int aCommand, byte[] aPayload)
350 {
351 synchronized (fwiteCommandLock) {
352 TByteBuffer Buffer = new TByteBuffer();
353
354 Buffer.prepare(MAGIC_BYTES);
355 Buffer.prepare(aCommand);
356
357 if ((aPayload != null) && (aPayload.length > 0)) {
358 Buffer.prepare(aPayload.length);
359 Buffer.prepare(aPayload);
360 Buffer.prepare(MAGIC_STRING_CHECK_INT32);
361 } else
362 Buffer.prepare((int) 0);
363 Buffer.prepareApply();
364 Buffer.qWrite(MAGIC_BYTES);
365 Buffer.qWrite(aCommand);
366 if ((aPayload != null) && (aPayload.length > 0)) {
367 Buffer.qWrite(aPayload.length);
368 Buffer.qWrite(aPayload);
369 Buffer.qWrite(MAGIC_STRING_CHECK_INT32);
370 } else
371 Buffer.qWrite((int) 0);
372
373 if (isConnected()) {
374 try {
375 foutputStream.write(Buffer.getBuffer(), 0, Buffer.getLength());
376 return Buffer.getLength();
377 } catch (Exception ex) {
378 close();
379 return ICE_CONNECTION_CLOSED;
380 }
381 } else {
382 return ICE_CONNECTION_CLOSED;
383 }
384 }
385 }
386
/**
 * Prefixes a name with the current federation; shorthand for prefixFederation(aName, true).
 *
 * @param aName name to prefix
 * @return the prefixed name, or aName unchanged when the federation is empty
 */
protected String prefixFederation(String aName) {
    return prefixFederation(aName, true);
}
390
391 protected String prefixFederation(String aName, boolean aUseFederationPrefix) {
392 if (!ffederation.equals("") && aUseFederationPrefix)
393 return ffederation + "." + aName;
394 else
395 return aName;
396 }
397
398
399
400
401
402 private void handleCommand(int aCommand, TByteBuffer aPayload) {
403 switch (aCommand) {
404 case TEventEntry.IC_EVENT:
405 handleCommandEvent(aPayload);
406 break;
407 case TEventEntry.IC_SET_VARIABLE:
408 handleCommandVariable(aPayload);
409 break;
410 case TEventEntry.IC_SET_EVENT_ID_TRANSLATION:
411 feventTranslation.setEventTranslation(aPayload.peekInt32(0, TEventTranslation.INVALID_TRANSLATED_EVENT_ID),
412 aPayload.peekInt32(TByteBuffer.SIZE_OF_INT32, TEventTranslation.INVALID_TRANSLATED_EVENT_ID));
413 break;
414 case TEventEntry.IC_UNIQUE_CLIENT_ID:
415 funiqueClientID = aPayload.readInt32();
416 fclientHandle = aPayload.readInt32();
417 break;
418
419
420
421
422
423
424
425
426 case TEventEntry.IC_EVENT_NAMES:
427 handleCommandEventNames(aPayload);
428 break;
429 case TEventEntry.IC_END_OF_SESSION:
430 close();
431 break;
432 case TEventEntry.IC_SUBSCRIBE:
433 case TEventEntry.IC_PUBLISH:
434 case TEventEntry.IC_UNSUBSCRIBE:
435 case TEventEntry.IC_UNPUBLISH:
436 handleSubAndPub(aCommand, aPayload);
437 break;
438 default:
439 handleCommandOther(aCommand, aPayload);
440 break;
441 }
442 }
443
444 private void handleCommandEvent(TByteBuffer aPayload) {
445 int TxEventID = feventTranslation.getTranslateEventID(aPayload.readInt32());
446 if (TxEventID != TEventTranslation.INVALID_TRANSLATED_EVENT_ID)
447 eventIDToEventL(TxEventID).handleEvent(aPayload);
448 }
449
450 private void handleCommandVariable(TByteBuffer aPayload) {
451 if (onVariable != null || onStatusUpdate != null) {
452 String VarName = aPayload.readString();
453
454
455 if (VarName.toUpperCase().endsWith(MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME.toUpperCase())) {
456 VarName = VarName.substring(0, VarName.length() - (MODEL_STATUS_VAR_SEP_CHAR.length() + MODEL_Status_VAR_NAME.length()));
457 String ModelName = VarName.substring(8, VarName.length());
458 String ModelUniqueClientID = VarName.substring(0, 8);
459 aPayload.readInt32();
460 int Status = aPayload.readInt32(-1);
461 int Progress = aPayload.readInt32(-1);
462 if (onStatusUpdate != null)
463 onStatusUpdate.dispatch(this, ModelUniqueClientID, ModelName, Progress, Status);
464 } else {
465 TByteBuffer VarValue = aPayload.readByteBuffer();
466 TByteBuffer PrevValue = new TByteBuffer();
467 if (onVariable != null)
468 onVariable.dispatch(this, VarName, VarValue.getBuffer(), PrevValue.getBuffer());
469 }
470 }
471 }
472
473 private void handleCommandEventNames(TByteBuffer aPayload) {
474 if (onEventNames != null) {
475 int ec = aPayload.readInt32();
476 TEventNameEntry[] EventNames = new TEventNameEntry[ec];
477 for (int en = 0; en < EventNames.length; en++) {
478 EventNames[en] = new TEventNameEntry();
479 EventNames[en].eventName = aPayload.readString();
480 EventNames[en].publishers = aPayload.readInt32();
481 EventNames[en].subscribers = aPayload.readInt32();
482 EventNames[en].timers = aPayload.readInt32();
483 }
484 onEventNames.dispatch(this, EventNames);
485 }
486 }
487
488 private void handleSubAndPub(int aCommand, TByteBuffer aPayload) {
489 String EventName;
490 TEventEntry EE;
491 TEventEntry EP;
492 switch (aCommand) {
493 case TEventEntry.IC_SUBSCRIBE:
494 case TEventEntry.IC_PUBLISH:
495 aPayload.readInt32();
496 aPayload.readInt32();
497 EventName = aPayload.readString();
498 EE = findEventL(EventName);
499 if (EE == null)
500 {
501
502 EP = findEventParentL(EventName);
503 if (EP != null)
504 {
505 EE = addEventL(EventName);
506 EE.fparent = EP;
507 EE.copyHandlersFrom(EP);
508 }
509 }
510 else
511 {
512 if ((onSubAndPub !=null) && !EE.isEmpty())
513 onSubAndPub.dispatch(this, aCommand, EventName);
514
515 }
516 if (EE != null)
517 EE.handleSubAndPub(aCommand);
518 break;
519 case TEventEntry.IC_UNSUBSCRIBE:
520 case TEventEntry.IC_UNPUBLISH:
521 EventName = aPayload.readString();
522 if (onSubAndPub !=null)
523 onSubAndPub.dispatch(this, aCommand, EventName);
524 EE = findEventL(EventName);
525 if (EE != null)
526 EE.handleSubAndPub(aCommand);
527 break;
528 }
529 }
530
/**
 * Hook for commands that handleCommand has no case for. Does nothing here; subclasses can
 * override it.
 *
 * @param aCommand command id
 * @param aPayload payload of the command
 */
protected void handleCommandOther(int aCommand, TByteBuffer aPayload) {

}
534
/**
 * Sends an IC_UNIQUE_CLIENT_ID command whose payload is two int32 zeros. The answer is
 * handled in handleCommand, which stores the unique client id and client handle.
 *
 * @return result of writeCommand (bytes written or ICE_CONNECTION_CLOSED)
 */
private int requestUniqueClientID() {
    TByteBuffer Payload = new TByteBuffer();
    // TByteBuffer is filled in two passes: prepare..., prepareApply, then matching qWrite...
    Payload.prepare((int) 0);
    Payload.prepare((int) 0);
    Payload.prepareApply();
    Payload.qWrite((int) 0);
    Payload.qWrite((int) 0);
    return writeCommand(TEventEntry.IC_UNIQUE_CLIENT_ID, Payload.getBuffer());
}
544
545 private int setOwner() {
546 if (isConnected()) {
547 TByteBuffer Payload = new TByteBuffer();
548 Payload.prepare(fownerID);
549 Payload.prepare(fownerName);
550 Payload.prepareApply();
551 Payload.qWrite(fownerID);
552 Payload.qWrite(fownerName);
553 return writeCommand(TEventEntry.IC_SET_CLIENT_INFO, Payload.getBuffer());
554 } else
555 return ICE_CONNECTION_CLOSED;
556 }
557
558 private void readCommands() {
559
560 int Command = TEventEntry.IC_INVALID_COMMAND;
561
562
563 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
564 + TByteBuffer.SIZE_OF_INT32);
565 TByteBuffer Payload = new TByteBuffer();
566 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
567 do {
568 try {
569 try {
570 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
571 if (Command != TEventEntry.IC_INVALID_COMMAND)
572 handleCommand(Command, Payload);
573 } catch (ThreadDeath ex) {
574 Command = TEventEntry.IC_END_OF_SESSION;
575 }
576 } catch (Exception ex) {
577 if (isConnected())
578 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
579 }
580 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected());
581 }
582
583
/**
 * Connection states that can be reported with setState. Each constant carries an explicit
 * numeric {@code value}; for the gateway states it differs from the enum ordinal.
 */
protected enum TConnectionState {
    icsUninitialized(0),
    icsInitialized(1),
    icsClient(2),
    icsHub(3),
    icsEnded(4),


    // gateway states are numbered from 100
    icsGateway(100),
    icsGatewayClient(101),
    icsGatewayServer(102);

    /** Numeric value associated with the state. */
    public final int value;

    TConnectionState(int aValue) {
        value = aValue;
    }
}
602
603 protected void setState(TConnectionState aState) {
604 TByteBuffer Payload = new TByteBuffer();
605 Payload.prepare(aState.ordinal());
606 Payload.prepareApply();
607 Payload.qWrite(aState.ordinal());
608 writeCommand(TEventEntry.IC_SET_STATE, Payload.getBuffer());
609 }
610
/**
 * Opens the connection and starts the reading thread; shorthand for open(aHost, aPort, true).
 *
 * @param aHost host to connect to
 * @param aPort port to connect to
 * @return true when the socket is connected
 */
protected boolean open(String aHost, int aPort) {
    return open(aHost, aPort, true);
}
614
615 protected boolean open(String aHost, int aPort, boolean aStartReadingThread) {
616 close();
617 try {
618 fremoteHost = aHost;
619 fremotePort = aPort;
620 fsocket = new Socket(fremoteHost, fremotePort);
621 if (fsocket.isConnected()) {
622 foutputStream = fsocket.getOutputStream();
623 finputStream = fsocket.getInputStream();
624
625
626 if (aStartReadingThread) {
627 freadingThread = new Thread(new Runnable() {
628 public void run() {
629 readCommands();
630 }
631 });
632 freadingThread.setName("imb command reader");
633 freadingThread.start();
634 }
635 if (imb2Compatible)
636 requestUniqueClientID();
637
638 setOwner();
639
640 if (onVariable != null || onStatusUpdate != null)
641 writeCommand(TEventEntry.IC_ALL_VARIABLES, null);
642 }
643 return fsocket.isConnected();
644 } catch (Exception ex) {
645 return false;
646 }
647 }
648
649
650
651
652
/** One entry of an event name list, filled by handleCommandEventNames. */
public class TEventNameEntry {
    /** Name of the event. */
    public String eventName;
    /** Publisher count read from the payload. */
    public int publishers;
    /** Subscriber count read from the payload. */
    public int subscribers;
    /** Timer count read from the payload. */
    public int timers;
}
659
/**
 * Kind of prefix requested for a variable name in IC_SET_VARIABLE_PREFIXED commands.
 * The ordinal is what setVariableValue writes to the payload, so the order must not change.
 */
public enum TVarPrefix {
    vpUniqueClientID,
    vpClientHandle
}
664
665
666
/** Largest payload size readCommand accepts (10 MiB); larger commands are treated as invalid. */
public static final int MAX_PAYLOAD_SIZE = 10 * 1024 * 1024;

/** Default hub host name; not used inside this class. */
public static final String DEFAULT_HUB = "localhost";

/** Default hub port; not used inside this class. */
public static final int DEFAULT_PORT = 4000;

/** Initial value of the federation. */
public static final String DEFAULT_FEDERATION = "TNOdemo";


/** Returned by write operations when the connection is closed or the write failed. */
public static final int ICE_CONNECTION_CLOSED = -1;

/** Returned by the signal methods when the event does not exist and was not auto-published. */
public static final int ICE_EVENT_NOT_PUBLISHED = -2;
679
680
681
/**
 * @return the federation currently used to prefix names
 */
public String getFederation() {
    return ffederation;
}
685
686
687
688
689
690 public void setFederation(String aFederation) {
691 String OldFederation = ffederation;
692 TEventEntry Event;
693 if (isConnected() && (OldFederation != "")) {
694
695 for (int i = 0; i < feventEntryList.FCount; i++) {
696 String EventName = feventEntryList.getEventName(i);
697 if (!EventName.equals("") && EventName.startsWith(OldFederation + ".")) {
698 Event = feventEntryList.getEventEntry(i);
699 if (Event.isSubscribed())
700 Event.unSubscribe(false);
701 if (Event.isPublished())
702 Event.unPublish(false);
703 }
704 }
705 }
706 ffederation = aFederation;
707 if (isConnected() && (OldFederation != "")) {
708
709 for (int i = 0; i < feventEntryList.FCount; i++) {
710 String EventName = feventEntryList.getEventName(i);
711 if (!EventName.equals("") && EventName.startsWith(OldFederation + ".")) {
712 Event = feventEntryList.getEventEntry(i);
713 Event.feventName = ffederation + Event.feventName.substring(0, OldFederation.length());
714 if (Event.isSubscribed())
715 Event.subscribe();
716 if (Event.isPublished())
717 Event.publish();
718 }
719 }
720 }
721 }
722
723
/** When true, signalling an unknown event publishes it first (see findEventAutoPublishL). */
public boolean autoPublish = true;


/**
 * When true, open() requests a unique client id and the status methods build the variable
 * name themselves instead of sending a prefixed-variable command.
 */
public boolean imb2Compatible = true;
728
729
730
/**
 * @return the host passed to the last open() call
 */
public String getRemoteHost() {
    return fremoteHost;
}
734
735
/**
 * @return the port passed to the last open() call
 */
public int getRemotePort() {
    return fremotePort;
}
739
740
741
742
743
744 public boolean getNoDelay() throws SocketException {
745 if (isConnected())
746 return fsocket.getTcpNoDelay();
747 else
748 return false;
749 }
750
751
752
753
754
/**
 * Sets the socket's TCP_NODELAY option; does nothing when not connected.
 *
 * @param aValue new TCP_NODELAY setting
 * @throws SocketException when the socket option cannot be set
 */
public void setNoDelay(boolean aValue) throws SocketException {
    if (isConnected())
        fsocket.setTcpNoDelay(aValue);
}
759
760
761
762
763
764 public boolean getLinger() throws SocketException {
765 if (isConnected())
766 return fsocket.getSoLinger() != -1;
767 else
768 return false;
769 }
770
771
772
773
774
/**
 * Enables or disables SO_LINGER with a linger time of 2 seconds; does nothing when not
 * connected.
 *
 * @param aValue true to enable lingering
 * @throws SocketException when the socket option cannot be set
 */
public void setLinger(boolean aValue) throws SocketException {
    if (isConnected())
        fsocket.setSoLinger(aValue, 2);
}
779
780
781 public boolean isConnected() {
782 return (fsocket != null) && fsocket.isConnected();
783 }
784
785
786 public void close() {
787 if ((fsocket != null) && fsocket.isConnected()) {
788 if (onDisconnect != null)
789 onDisconnect.dispatch(this);
790 writeCommand(TEventEntry.IC_END_OF_SESSION, null);
791 try {
792 foutputStream.close();
793 foutputStream = null;
794 finputStream.close();
795 finputStream = null;
796 fsocket.close();
797 fsocket = null;
798 freadingThread = null;
799 } catch (IOException e) {
800
801 e.printStackTrace();
802 }
803 }
804 }
805
806
/** Callback invoked by close() before the end-of-session command is sent. */
public interface TOnDisconnect {
    /**
     * @param aConnection the connection that is being closed
     */
    public void dispatch(TConnection aConnection);
}


/** Handler called when the connection is closed; may be null. */
public TOnDisconnect onDisconnect = null;
813
814
815
816
/**
 * Sends an IC_SET_THROTTLE command with the given value as int32 payload.
 *
 * @param aThrottle throttle value to send (meaning is defined by the remote side)
 */
public void setThrottle(int aThrottle) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aThrottle);
    Payload.prepareApply();
    Payload.qWrite(aThrottle);
    writeCommand(TEventEntry.IC_SET_THROTTLE, Payload.getBuffer());
}
824
825
826
827
828
829 public void readCommandsNonBlocking() throws IOException {
830 if (finputStream.available() != 0) {
831 int Command = TEventEntry.IC_INVALID_COMMAND;
832
833
834 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
835 + TByteBuffer.SIZE_OF_INT32);
836 TByteBuffer Payload = new TByteBuffer();
837 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
838 do {
839 try {
840 try {
841 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
842 if (Command != TEventEntry.IC_INVALID_COMMAND)
843 handleCommand(Command, Payload);
844 } catch (ThreadDeath ex) {
845 Command = TEventEntry.IC_END_OF_SESSION;
846 }
847 } catch (Exception ex) {
848 if (isConnected())
849 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
850 }
851 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected() && (finputStream.available() != 0));
852 }
853 }
854
855
856
857
858
859
860 public void readCommandsNonThreaded(int aTimeOut) throws SocketException {
861 fsocket.setSoTimeout(aTimeOut);
862 int Command = TEventEntry.IC_INVALID_COMMAND;
863
864
865 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
866 + TByteBuffer.SIZE_OF_INT32);
867 TByteBuffer Payload = new TByteBuffer();
868 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
869 do {
870 try {
871 try {
872 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
873 if (Command != TEventEntry.IC_INVALID_COMMAND)
874 handleCommand(Command, Payload);
875 } catch (ThreadDeath ex) {
876 Command = TEventEntry.IC_END_OF_SESSION;
877 }
878 } catch (Exception ex) {
879 if (isConnected())
880 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
881 }
882 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected());
883 }
884
885
886
/**
 * @return the owner id of this connection
 */
public int getOwnerID() {
    return fownerID;
}
890
891
892
893
894 public void setOwnerID(int aValue) {
895 if (fownerID != aValue) {
896 fownerID = aValue;
897 setOwner();
898 }
899 }
900
901
/**
 * @return the owner name of this connection
 */
public String getOwnerName() {
    return fownerName;
}
905
906
907
908
909 public void setOwnerName(String aValue) {
910 if (fownerName != aValue) {
911 fownerName = aValue;
912 setOwner();
913 }
914 }
915
916
/**
 * @return the unique client id received with IC_UNIQUE_CLIENT_ID; 0 when none was received yet
 */
public int getUniqueClientID() {
    return funiqueClientID;
}
920
921
/**
 * @return the client handle received with IC_UNIQUE_CLIENT_ID; 0 when none was received yet
 */
public int getClientHandle() {
    return fclientHandle;
}
925
926
927
928
929
930
/**
 * Subscribes to an event in the current federation; shorthand for subscribe(aEventName, true).
 *
 * @param aEventName event name without federation prefix
 * @return the event entry
 */
public TEventEntry subscribe(String aEventName) {
    return subscribe(aEventName, true);
}
934
935
936
937
938
939
940 public TEventEntry subscribe(String aEventName, boolean aUseFederationPrefix) {
941 TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix));
942 if (!Event.isSubscribed())
943 Event.subscribe();
944 return Event;
945 }
946
947
948
949
950
/**
 * Publishes an event in the current federation; shorthand for publish(aEventName, true).
 *
 * @param aEventName event name without federation prefix
 * @return the event entry
 */
public TEventEntry publish(String aEventName) {
    return publish(aEventName, true);
}
954
955
956
957
958
959
960 public TEventEntry publish(String aEventName, boolean aUseFederationPrefix) {
961 TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix));
962 if (!Event.isPublished())
963 Event.publish();
964 return Event;
965 }
966
967
968
969
/**
 * Unsubscribes from an event in the current federation; shorthand for
 * unSubscribe(aEventName, true).
 *
 * @param aEventName event name without federation prefix
 */
public void unSubscribe(String aEventName) {
    unSubscribe(aEventName, true);
}
973
974
975
976
977
978 public void unSubscribe(String aEventName, boolean aUseFederationPrefix) {
979 TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix));
980 if (Event != null && Event.isSubscribed())
981 Event.unSubscribe(true);
982 }
983
984
985
986
/**
 * Unpublishes an event in the current federation; shorthand for unPublish(aEventName, true).
 *
 * @param aEventName event name without federation prefix
 */
public void unPublish(String aEventName) {
    unPublish(aEventName, true);
}
990
991
992
993
994
995 public void unPublish(String aEventName, boolean aUseFederationPrefix) {
996 TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix));
997 if (Event != null && Event.isPublished())
998 Event.unPublish(true);
999 }
1000
1001
1002
1003
1004
1005
1006
1007
/**
 * Signals an event in the current federation; shorthand for
 * signalEvent(aEventName, aEventKind, aEventPayload, true).
 *
 * @param aEventName    event name without federation prefix
 * @param aEventKind    event kind passed on to the event entry
 * @param aEventPayload payload to send
 * @return result of the event entry's signalEvent, or ICE_EVENT_NOT_PUBLISHED
 */
public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload) {
    return signalEvent(aEventName, aEventKind, aEventPayload, true);
}
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020 public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload, boolean aUseFederationPrefix) {
1021 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1022 if (Event != null)
1023 return Event.signalEvent(aEventKind, aEventPayload.getBuffer());
1024 else
1025 return ICE_EVENT_NOT_PUBLISHED;
1026 }
1027
1028
1029
1030
1031
1032
1033
1034
/**
 * Signals a buffer on an event in the current federation; shorthand for
 * signalBuffer(aEventName, aBufferID, aBuffer, 0, true).
 *
 * @param aEventName event name without federation prefix
 * @param aBufferID  id of the buffer
 * @param aBuffer    bytes to send
 * @return result of the event entry's signalBuffer, or ICE_EVENT_NOT_PUBLISHED
 */
public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer) {
    return signalBuffer(aEventName, aBufferID, aBuffer, 0, true);
}
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048 public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer, int aEventFlags,
1049 boolean aUseFederationPrefix) {
1050 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1051 if (Event != null)
1052 return Event.signalBuffer(aBufferID, aBuffer, aEventFlags);
1053 else
1054 return ICE_EVENT_NOT_PUBLISHED;
1055 }
1056
1057
1058
1059
1060
1061
1062
1063
1064
/**
 * Signals a change-object event in the current federation; shorthand for
 * signalChangeObject(aEventName, aAction, aObjectID, aAttribute, true).
 *
 * @param aEventName event name without federation prefix
 * @param aAction    action code passed on to the event entry
 * @param aObjectID  id of the changed object
 * @param aAttribute attribute that changed
 * @return result of the event entry's signalChangeObject, or ICE_EVENT_NOT_PUBLISHED
 */
public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute) {
    return signalChangeObject(aEventName, aAction, aObjectID, aAttribute, true);
}
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078 public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute,
1079 boolean aUseFederationPrefix) {
1080 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1081 if (Event != null)
1082 return Event.signalChangeObject(aAction, aObjectID, aAttribute);
1083 else
1084 return ICE_EVENT_NOT_PUBLISHED;
1085 }
1086
1087
1088
1089
1090
1091
1092
/**
 * Signals a stream on an event in the current federation; shorthand for
 * signalStream(aEventName, aStreamName, aStream, true).
 *
 * @param aEventName  event name without federation prefix
 * @param aStreamName name of the stream
 * @param aStream     stream to send
 * @return result of the event entry's signalStream, or ICE_EVENT_NOT_PUBLISHED
 */
public int signalStream(String aEventName, String aStreamName, InputStream aStream) {
    return signalStream(aEventName, aStreamName, aStream, true);
}
1096
1097
1098
1099
1100
1101
1102
1103
1104 public int signalStream(String aEventName, String aStreamName, InputStream aStream, boolean aUseFederationPrefix) {
1105 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1106 if (Event != null)
1107 return Event.signalStream(aStreamName, aStream);
1108 else
1109 return ICE_EVENT_NOT_PUBLISHED;
1110 }
1111
1112
1113
/** Callback for received variables that are not status variables. */
public interface TOnVariable {
    /**
     * @param aConnection connection that received the variable
     * @param aVarName    name of the variable
     * @param aVarValue   received value
     * @param aPrevValue  previous value; handleCommandVariable always passes an empty buffer
     */
    public void dispatch(TConnection aConnection, String aVarName, byte[] aVarValue, byte[] aPrevValue);
}



/** Handler for received variables; set through setOnVariable. */
private TOnVariable onVariable = null;
1121
1122
1123
/**
 * Installs the variable handler and requests all variables from the remote side.
 *
 * @param aValue handler to install, may be null
 */
public void setOnVariable(TOnVariable aValue)
{
    onVariable = aValue;
    requestAllVariables();
}
1129
1130
/** Sends an IC_ALL_VARIABLES command without payload. */
protected void requestAllVariables() {
    writeCommand(TEventEntry.IC_ALL_VARIABLES, null);
}
1134
1135
1136
1137
1138
/**
 * Sends an IC_SET_VARIABLE command with a string value.
 * Payload: variable name followed by the value.
 *
 * @param aVarName  name of the variable, used as given (no federation prefix is added here)
 * @param aVarValue value to set
 */
public void setVariableValue(String aVarName, String aVarValue) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aVarName);
    Payload.prepare(aVarValue);
    Payload.prepareApply();
    Payload.qWrite(aVarName);
    Payload.qWrite(aVarValue);
    writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer());
}
1148
1149
1150
1151
1152
/**
 * Sends an IC_SET_VARIABLE command with a byte buffer value.
 * Payload: variable name followed by the buffer.
 *
 * @param aVarName  name of the variable, used as given (no federation prefix is added here)
 * @param aVarValue value to set
 */
public void setVariableValue(String aVarName, TByteBuffer aVarValue) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aVarName);
    Payload.prepare(aVarValue);
    Payload.prepareApply();
    Payload.qWrite(aVarName);
    Payload.qWrite(aVarValue);
    writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer());
}
1162
1163
1164
1165
1166
1167
/**
 * Sends an IC_SET_VARIABLE_PREFIXED command with a string value.
 * Payload: ordinal of the prefix kind, variable name, value.
 *
 * @param aVarName   name of the variable without prefix
 * @param aVarValue  value to set
 * @param aVarPrefix kind of prefix requested for the name
 */
public void setVariableValue(String aVarName, String aVarValue, TVarPrefix aVarPrefix) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aVarPrefix.ordinal());
    Payload.prepare(aVarName);
    Payload.prepare(aVarValue);
    Payload.prepareApply();
    Payload.qWrite(aVarPrefix.ordinal());
    Payload.qWrite(aVarName);
    Payload.qWrite(aVarValue);
    writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer());
}
1179
1180
1181
1182
1183
1184
/**
 * Sends an IC_SET_VARIABLE_PREFIXED command with a byte buffer value.
 * Payload: ordinal of the prefix kind, variable name, buffer.
 *
 * @param aVarName   name of the variable without prefix
 * @param aVarValue  value to set
 * @param aVarPrefix kind of prefix requested for the name
 */
public void setVariableValue(String aVarName, TByteBuffer aVarValue, TVarPrefix aVarPrefix) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aVarPrefix.ordinal());
    Payload.prepare(aVarName);
    Payload.prepare(aVarValue);
    Payload.prepareApply();
    Payload.qWrite(aVarPrefix.ordinal());
    Payload.qWrite(aVarName);
    Payload.qWrite(aVarValue);
    writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer());
}
1196
1197
/** Callback for received model status variables. */
public interface TOnStatusUpdate {
    /**
     * @param aConnection          connection that received the status
     * @param aModelUniqueClientID first 8 characters of the variable name
     * @param aModelName           model name taken from the variable name
     * @param aProgress            progress value from the payload (-1 when missing)
     * @param aStatus              status value from the payload (-1 when missing)
     */
    public void dispatch(TConnection aConnection, String aModelUniqueClientID, String aModelName, int aProgress, int aStatus);
}



/** Handler for received status variables; set through setOnStatusUpdate. */
private TOnStatusUpdate onStatusUpdate = null;
1205
1206
1207
/**
 * Installs the status handler and requests all variables from the remote side.
 *
 * @param aValue handler to install, may be null
 */
public void setOnStatusUpdate(TOnStatusUpdate aValue)
{
    onStatusUpdate = aValue;
    requestAllVariables();
}
1213
1214
1215
/** Status value: ready. Presumably intended for the aStatus argument of updateStatus. */
public final static int STATUS_READY = 0;

/** Status value: calculating. */
public final static int STATUS_CALCULATING = 1;

/** Status value: busy. */
public final static int STATUS_BUSY = 2;
1221
1222
1223
1224
1225
1226
1227 public void updateStatus(int aProgress, int aStatus) throws InterruptedException {
1228 TByteBuffer Payload = new TByteBuffer();
1229 Payload.prepare(aStatus);
1230 Payload.prepare(aProgress);
1231 Payload.prepareApply();
1232 Payload.qWrite(aStatus);
1233 Payload.qWrite(aProgress);
1234 if (imb2Compatible) {
1235
1236 if (funiqueClientID == 0) {
1237 int SpinCount = 10;
1238 while (funiqueClientID == 0 && SpinCount > 0) {
1239 Thread.sleep(500);
1240 SpinCount--;
1241 }
1242 }
1243
1244 setVariableValue(Integer.toHexString(funiqueClientID) + prefixFederation(fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR
1245 + MODEL_Status_VAR_NAME, Payload);
1246 } else
1247 setVariableValue(prefixFederation(fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, Payload,
1248 TVarPrefix.vpUniqueClientID);
1249 }
1250
1251
1252 public void removeStatus() {
1253 if (imb2Compatible)
1254
1255 setVariableValue(Integer.toHexString(funiqueClientID) + prefixFederation(fownerName) + MODEL_STATUS_VAR_SEP_CHAR
1256 + MODEL_Status_VAR_NAME, "");
1257 else
1258 setVariableValue(prefixFederation(fownerName) + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, "",
1259 TVarPrefix.vpUniqueClientID);
1260 }
1261
1262
1263
1264
1265
/**
 * Subscribes to the "Focus" event (once) and installs the focus handler on it.
 *
 * @param aOnFocus handler to install on the focus event
 */
public void subscribeOnFocus(TEventEntry.TOnFocus aOnFocus) {
    if (ffocusEvent == null)
        ffocusEvent = subscribe(FOCUS_EVENT_NAME);
    ffocusEvent.onFocus = aOnFocus;
}
1271
1272
1273
1274
1275
1276
1277 public int signalFocus(double aX, double aY) {
1278 if (ffocusEvent == null)
1279 ffocusEvent = findEventAutoPublishL(prefixFederation(FOCUS_EVENT_NAME));
1280 if (ffocusEvent != null) {
1281 TByteBuffer Payload = new TByteBuffer();
1282 Payload.prepare(aX);
1283 Payload.prepare(aY);
1284 Payload.prepareApply();
1285 Payload.qWrite(aX);
1286 Payload.qWrite(aY);
1287 return ffocusEvent.signalEvent(TEventEntry.EK_CHANGE_OBJECT_EVENT, Payload.getBuffer());
1288 } else
1289 return ICE_EVENT_NOT_PUBLISHED;
1290 }
1291
1292
1293
1294
1295
/**
 * Subscribes to the "META_CurrentSession" event (once) and installs the federation change
 * handler on it.
 *
 * @param aOnChangeFederation handler to install on the federation change event
 */
public void subscribeOnFederationChange(TEventEntry.TOnChangeFederation aOnChangeFederation) {
    if (fchangeFederationEvent == null)
        fchangeFederationEvent = subscribe(CHANGE_FEDERATION_EVENT_NAME);
    fchangeFederationEvent.onChangeFederation = aOnChangeFederation;
}
1301
1302
1303
1304
1305
1306
1307 public int signalChangeFederation(int aNewFederationID, String aNewFederation) {
1308 if (fchangeFederationEvent == null)
1309 fchangeFederationEvent = findEventAutoPublishL(prefixFederation(CHANGE_FEDERATION_EVENT_NAME));
1310 if (fchangeFederationEvent != null)
1311 return fchangeFederationEvent.signalChangeObject(TEventEntry.ACTION_CHANGE, aNewFederationID, aNewFederation);
1312 else
1313 return ICE_EVENT_NOT_PUBLISHED;
1314 }
1315
1316
1317
1318
1319
1320
1321
1322
1323 public int logWriteLn(String aLogEventName, String aLine, TEventEntry.TLogLevel aLevel) {
1324 if (flogEvent == null)
1325 flogEvent = findEventAutoPublishL(prefixFederation(aLogEventName));
1326 if (flogEvent != null)
1327 return flogEvent.logWriteLn(aLine, aLevel);
1328 else
1329 return ICE_EVENT_NOT_PUBLISHED;
1330 }
1331
1332
1333
1334
1335
/** Callback for the answer to requestEventname. */
public interface TOnEventnames {
    /**
     * @param aConnection connection that received the list
     * @param aEventNames decoded event name entries
     */
    public void dispatch(TConnection aConnection, TEventNameEntry[] aEventNames);
}


/** Handler for received event name lists; when null such commands are ignored. */
public TOnEventnames onEventNames = null;
1342
1343
/** Callback for subscribe/publish/unsubscribe/unpublish notifications from the remote side. */
public interface TOnSubAndPub {
    /**
     * @param aConnection connection that received the notification
     * @param aCommand    one of IC_SUBSCRIBE, IC_PUBLISH, IC_UNSUBSCRIBE, IC_UNPUBLISH
     * @param aEventName  name of the event concerned
     */
    public void dispatch(TConnection aConnection, int aCommand, String aEventName);
}


/** Handler for subscribe/publish notifications; may be null. */
public TOnSubAndPub onSubAndPub = null;
1350
1351
1352
// Bit flags; presumably combined into the aEventFilters argument of requestEventname.

/** Flag bit 1: publishers. */
public static final int EF_PUBLISHERS = 1;

/** Flag bit 2: subscribers. */
public static final int EF_SUBSCRIBERS = 2;

/** Flag bit 4: timers. */
public static final int EF_TIMERS = 4;
1358
1359
1360
1361
1362
1363
/**
 * Sends an IC_REQUEST_EVENT_NAMES command. The answer is delivered through onEventNames.
 * Payload: name filter followed by the filter flags.
 *
 * @param aEventNameFilter filter on event names, sent as given
 * @param aEventFilters    filter flags, sent as given (see the EF_* constants)
 * @return result of writeCommand (bytes written or ICE_CONNECTION_CLOSED)
 */
public int requestEventname(String aEventNameFilter, int aEventFilters) {
    TByteBuffer Payload = new TByteBuffer();
    Payload.prepare(aEventNameFilter);
    Payload.prepare(aEventFilters);
    Payload.prepareApply();
    Payload.qWrite(aEventNameFilter);
    Payload.qWrite(aEventFilters);
    return writeCommand(TEventEntry.IC_REQUEST_EVENT_NAMES, Payload.getBuffer());
}
1373
1374
1375 @Override
1376 public final String toString()
1377 {
1378 return "TConnection [remoteHost=" + this.fremoteHost + ", remotePort=" + this.fremotePort + ", federation="
1379 + this.ffederation + ", isConnected()=" + this.isConnected() + "]";
1380 }
1381 }