1 package nl.tno.imb;
2
3 import nl.tno.imb.TEventEntry;
4
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.util.Arrays;
8 import java.io.IOException;
9 import java.net.Socket;
10 import java.net.SocketException;
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public class TConnection {
31
32
33
34
35
36
37
38
39
    /**
     * Creates a connection and immediately tries to open it, with the reading thread enabled.
     * Delegates to the six-argument constructor with {@code aStartReadingThread = true}.
     *
     * @param aRemoteHost host name or address to connect to
     * @param aRemotePort TCP port to connect to
     * @param aOwnerName owner name sent to the remote side once connected
     * @param aOwnerID owner ID sent to the remote side once connected
     * @param aFederation federation name used to prefix event names
     */
    public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation) {
        this(aRemoteHost, aRemotePort, aOwnerName, aOwnerID, aFederation, true);
    }
43
44
45
46
47
48
49
50
51
    /**
     * Creates a connection, stores federation and owner information and immediately tries to open
     * the socket. The result of {@code open} is not reported by the constructor; use
     * {@link #isConnected()} afterwards to find out whether the connection was established.
     *
     * @param aRemoteHost host name or address to connect to
     * @param aRemotePort TCP port to connect to
     * @param aOwnerName owner name sent to the remote side once connected
     * @param aOwnerID owner ID sent to the remote side once connected
     * @param aFederation federation name used to prefix event names
     * @param aStartReadingThread when true a background thread is started that reads and dispatches
     *            incoming commands
     */
    public TConnection(String aRemoteHost, int aRemotePort, String aOwnerName, int aOwnerID, String aFederation,
            boolean aStartReadingThread) {
        this.ffederation = aFederation;
        this.fownerName = aOwnerName;
        this.fownerID = aOwnerID;
        // NOTE(review): open() is protected and thus overridable; it runs before a subclass constructor body.
        open(aRemoteHost, aRemotePort, aStartReadingThread);
    }
59
60
61 protected void finalize() {
62 close();
63 }
64
65
66
67
68 private class TEventTranslation {
69 public final static int INVALID_TRANSLATED_EVENT_ID = -1;
70
71 private int[] feventTranslation;
72
73 public TEventTranslation() {
74 this.feventTranslation = new int[32];
75
76 for (int i = 0; i < this.feventTranslation.length; i++)
77 this.feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID;
78 }
79
80 public int getTranslateEventID(int aRxEventID) {
81 if ((0 <= aRxEventID) && (aRxEventID < this.feventTranslation.length))
82 return this.feventTranslation[aRxEventID];
83 else
84 return INVALID_TRANSLATED_EVENT_ID;
85 }
86
87 public void setEventTranslation(int aRxEventID, int aTxEventID) {
88 if (aRxEventID >= 0) {
89
90
91 while (aRxEventID >= this.feventTranslation.length) {
92 int FormerSize = this.feventTranslation.length;
93
94 this.feventTranslation = Arrays.copyOf(this.feventTranslation, this.feventTranslation.length * 2);
95
96 for (int i = FormerSize; i < this.feventTranslation.length; i++)
97 this.feventTranslation[i] = INVALID_TRANSLATED_EVENT_ID;
98 }
99 this.feventTranslation[aRxEventID] = aTxEventID;
100 }
101 }
102 }
103
104
105 private class TEventEntryList {
106
107 TEventEntryList(int aInitialSize) {
108 this.FCount = 0;
109 this.fevents = new TEventEntry[aInitialSize];
110 }
111
112 private TEventEntry[] fevents;
113 private int FCount = 0;
114
115 public TEventEntry getEventEntry(int aEventID) {
116 if (0 <= aEventID && aEventID < this.FCount)
117 return this.fevents[aEventID];
118 else
119 return null;
120 }
121
122 public String getEventName(int aEventID) {
123 if (0 <= aEventID && aEventID < this.FCount) {
124 if (this.fevents[aEventID] != null)
125 return this.fevents[aEventID].getEventName();
126 else
127 return null;
128 } else
129 return "";
130 }
131
132 public TEventEntry addEvent(TConnection aConnection, String aEventName) {
133 this.FCount++;
134 if (this.FCount>this.fevents.length)
135 this.fevents = Arrays.copyOf(this.fevents, this.fevents.length * 2);
136 this.fevents[this.FCount - 1] = new TEventEntry(aConnection, this.FCount - 1, aEventName);
137
138 return this.fevents[this.FCount - 1];
139 }
140
141 public TEventEntry getEventEntryOnName(String aEventName) {
142 int i = this.FCount - 1;
143 while (i >= 0 && !getEventName(i).equals(aEventName))
144 i--;
145 if (i >= 0)
146 return this.fevents[i];
147 else
148 return null;
149 }
150 }
151
    /** Suffix that marks an event name as a filter; used when looking for a parent event. */
    public static final String EVENT_FILTER_POST_FIX = "*";

    /** Trailing part of the variable name that carries a model status. */
    private static final String MODEL_Status_VAR_NAME = "ModelStatus";

    /** Separator between the model part and {@link #MODEL_Status_VAR_NAME} in a status variable name. */
    private static final String MODEL_STATUS_VAR_SEP_CHAR = "|";


    /** Byte sequence written at the start of every command and searched for when reading one. */
    public static final byte[] MAGIC_BYTES = new byte[] {
            0x2F, 0x47, 0x61, 0x71, (byte) 0x95, (byte) 0xAD, (byte) 0xC5, (byte) 0xFB
    };


    /** Check value written directly after a non-empty payload. */
    private static final int MAGIC_STRING_CHECK_INT32 = 0x10F13467;

    /**
     * Byte form of the payload check compared against the bytes that follow a received payload;
     * these are the bytes of {@link #MAGIC_STRING_CHECK_INT32} with the least significant byte first.
     */
    public static final byte[] MAGIC_STRING_CHECK = new byte[] {
            0x67, 0x34, (byte) 0xF1, 0x10
    };
169
170
    /** Socket to the remote side; null while not connected. */
    private Socket fsocket = null;

    /** Output stream of {@link #fsocket}; commands are written to it. */
    private OutputStream foutputStream = null;

    /** Input stream of {@link #fsocket}; commands are read from it. */
    private InputStream finputStream = null;

    /** Host given to the last call of open. */
    private String fremoteHost = "";

    /** Port given to the last call of open. */
    private int fremotePort = 0;

    /** Background thread running the command reading loop, when one was started. */
    private Thread freadingThread = null;

    /** Translation of received event IDs to local event IDs. */
    private TEventTranslation feventTranslation = new TEventTranslation();

    /** All event entries of this connection; also used as the monitor guarding access to the list. */
    private TEventEntryList feventEntryList = new TEventEntryList(8);

    /** Federation used to prefix event names. */
    private String ffederation = DEFAULT_FEDERATION;

    private static final String FOCUS_EVENT_NAME = "Focus";
    /** Lazily resolved event used for focus signalling and focus subscription. */
    private TEventEntry ffocusEvent = null;
    private static final String CHANGE_FEDERATION_EVENT_NAME = "META_CurrentSession";
    /** Lazily resolved event used for federation change signalling and subscription. */
    private TEventEntry fchangeFederationEvent = null;
    /** Lazily resolved event used by logWriteLn. */
    private TEventEntry flogEvent = null;




    /** Unique client ID as received in an IC_UNIQUE_CLIENT_ID command; 0 until received. */
    private int funiqueClientID = 0;
    /** Client handle as received in an IC_UNIQUE_CLIENT_ID command; 0 until received. */
    private int fclientHandle = 0;
    /** Owner ID sent to the remote side by setOwner. */
    private int fownerID = 0;
    /** Owner name sent to the remote side by setOwner. */
    private String fownerName = "";
202
203 private TEventEntry eventIDToEventL(int aEventID) {
204 synchronized (this.feventEntryList) {
205 return this.feventEntryList.getEventEntry(aEventID);
206 }
207 }
208
209 private TEventEntry addEvent(String aEventName) {
210 int EventID = 0;
211 TEventEntry Event;
212 while (EventID < this.feventEntryList.FCount && !this.feventEntryList.getEventEntry(EventID).isEmpty())
213 EventID += 1;
214 if (EventID < this.feventEntryList.FCount) {
215 Event = this.feventEntryList.getEventEntry(EventID);
216 Event.feventName = aEventName;
217 Event.fparent = null;
218 } else
219 Event = this.feventEntryList.addEvent(this, aEventName);
220 return Event;
221 }
222
223 private TEventEntry addEventL(String aEventName) {
224 TEventEntry Event;
225 synchronized (this.feventEntryList) {
226 Event = addEvent(aEventName);
227 }
228 return Event;
229 }
230
231 private TEventEntry findOrAddEventL(String aEventName) {
232 synchronized (this.feventEntryList) {
233 TEventEntry Event = this.feventEntryList.getEventEntryOnName(aEventName);
234 if (Event == null) {
235 int EventID = 0;
236 while (EventID < this.feventEntryList.FCount && !this.feventEntryList.getEventEntry(EventID).isEmpty())
237 EventID += 1;
238 if (EventID < this.feventEntryList.FCount) {
239 Event = this.feventEntryList.getEventEntry(EventID);
240 Event.feventName = aEventName;
241 } else
242 Event = this.feventEntryList.addEvent(this, aEventName);
243 }
244 return Event;
245 }
246 }
247
    /**
     * Looks up an event entry by exact name while holding the event list monitor.
     *
     * @param aEventName full event name
     * @return the entry, or null when no entry has this name
     */
    private TEventEntry findEventL(String aEventName) {
        synchronized (this.feventEntryList) {
            return this.feventEntryList.getEventEntryOnName(aEventName);
        }
    }
253
254 private TEventEntry findEventParentL(String aEventName) {
255 String ParentEventName;
256 String EventName;
257 ParentEventName = "";
258 TEventEntry Event = null;
259 synchronized (this.feventEntryList) {
260 for (int EventID = 0; EventID<this.feventEntryList.FCount; EventID++)
261 {
262 EventName = this.feventEntryList.getEventName(EventID);
263 if (EventName.endsWith(EVENT_FILTER_POST_FIX))
264 {
265 EventName = EventName.substring(0, EventName.length()-2);
266 if (aEventName.startsWith(EventName))
267 {
268 if (ParentEventName.length()<EventName.length())
269 {
270 Event = this.feventEntryList.getEventEntry(EventID);
271 ParentEventName = EventName;
272 }
273 }
274 }
275 }
276 return Event;
277 }
278 }
279
280 private TEventEntry findEventAutoPublishL(String aEventName) {
281 TEventEntry Event = findEventL(aEventName);
282 if (Event == null && this.autoPublish)
283 Event = publish(aEventName, false);
284 return Event;
285 }
286
287 private int readBytesFromNetStream(TByteBuffer aBuffer) {
288 try {
289 int Count = 0;
290 int NumBytesRead = -1;
291 while (aBuffer.getwriteAvailable() > 0 && NumBytesRead != 0) {
292 NumBytesRead = this.finputStream.read(aBuffer.getBuffer(), aBuffer.getWriteCursor(), aBuffer.getwriteAvailable());
293 aBuffer.written(NumBytesRead);
294 Count += NumBytesRead;
295 }
296 return Count;
297 } catch (IOException ex) {
298 return 0;
299 }
300 }
301
302
303
304
305 private int readCommand(TByteBuffer aFixedCommandPart, TByteBuffer aPayload, TByteBuffer aPayloadCheck)
306 throws IOException {
307 int NumBytesRead = this.finputStream.read(aFixedCommandPart.getBuffer(), 0, aFixedCommandPart.getLength());
308 if (NumBytesRead > 0) {
309 while (!aFixedCommandPart.compare(MAGIC_BYTES, 0)) {
310 int rbr = this.finputStream.read();
311
312 if (rbr != -1)
313 aFixedCommandPart.shiftLeftOneByte((byte) rbr);
314 else
315 return TEventEntry.IC_END_OF_SESSION;
316 }
317
318 int aCommand = aFixedCommandPart.peekInt32(MAGIC_BYTES.length);
319 int PayloadSize = aFixedCommandPart.peekInt32(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32);
320 if (PayloadSize <= MAX_PAYLOAD_SIZE) {
321 aPayload.clear(PayloadSize);
322 if (PayloadSize > 0) {
323 int Len = readBytesFromNetStream(aPayload);
324 if (Len == aPayload.getLength()) {
325 NumBytesRead = this.finputStream.read(aPayloadCheck.getBuffer(), 0, aPayloadCheck.getLength());
326 if (NumBytesRead == TByteBuffer.SIZE_OF_INT32 && aPayloadCheck.compare(MAGIC_STRING_CHECK, 0))
327 return aCommand;
328 else
329 return TEventEntry.IC_INVALID_COMMAND;
330 } else
331
332 return TEventEntry.IC_INVALID_COMMAND;
333 } else
334 return aCommand;
335 } else
336 return TEventEntry.IC_INVALID_COMMAND;
337 } else
338 return TEventEntry.IC_END_OF_SESSION;
339 }
340
341
342 private Integer fwiteCommandLock = new Integer(0);
343
344
345
346
347
348
349 protected int writeCommand(int aCommand, byte[] aPayload)
350 {
351 synchronized (this.fwiteCommandLock) {
352 TByteBuffer Buffer = new TByteBuffer();
353
354 Buffer.prepare(MAGIC_BYTES);
355 Buffer.prepare(aCommand);
356
357 if ((aPayload != null) && (aPayload.length > 0)) {
358 Buffer.prepare(aPayload.length);
359 Buffer.prepare(aPayload);
360 Buffer.prepare(MAGIC_STRING_CHECK_INT32);
361 } else
362 Buffer.prepare((int) 0);
363 Buffer.prepareApply();
364 Buffer.qWrite(MAGIC_BYTES);
365 Buffer.qWrite(aCommand);
366 if ((aPayload != null) && (aPayload.length > 0)) {
367 Buffer.qWrite(aPayload.length);
368 Buffer.qWrite(aPayload);
369 Buffer.qWrite(MAGIC_STRING_CHECK_INT32);
370 } else
371 Buffer.qWrite((int) 0);
372
373 if (isConnected()) {
374 try {
375 this.foutputStream.write(Buffer.getBuffer(), 0, Buffer.getLength());
376 return Buffer.getLength();
377 } catch (Exception ex) {
378 close();
379 return ICE_CONNECTION_CLOSED;
380 }
381 } else {
382 return ICE_CONNECTION_CLOSED;
383 }
384 }
385 }
386
    /**
     * Prefixes a name with the current federation.
     *
     * @param aName name to prefix
     * @return the result of {@code prefixFederation(aName, true)}
     */
    protected String prefixFederation(String aName) {
        return prefixFederation(aName, true);
    }
390
391 protected String prefixFederation(String aName, boolean aUseFederationPrefix) {
392 if (!this.ffederation.equals("") && aUseFederationPrefix)
393 return this.ffederation + "." + aName;
394 else
395 return aName;
396 }
397
398
399
400
401
    /**
     * Dispatches a received command to the handler for its command code. Codes without a case of
     * their own end up in {@code handleCommandOther}.
     *
     * @param aCommand command code as returned by readCommand
     * @param aPayload payload of the command
     */
    private void handleCommand(int aCommand, TByteBuffer aPayload) {
        switch (aCommand) {
        case TEventEntry.IC_EVENT:
            handleCommandEvent(aPayload);
            break;
        case TEventEntry.IC_SET_VARIABLE:
            handleCommandVariable(aPayload);
            break;
        case TEventEntry.IC_SET_EVENT_ID_TRANSLATION:
            // payload: two int32 values (received ID, local ID); a missing value falls back to "invalid"
            this.feventTranslation.setEventTranslation(aPayload.peekInt32(0, TEventTranslation.INVALID_TRANSLATED_EVENT_ID),
                    aPayload.peekInt32(TByteBuffer.SIZE_OF_INT32, TEventTranslation.INVALID_TRANSLATED_EVENT_ID));
            break;
        case TEventEntry.IC_UNIQUE_CLIENT_ID:
            // payload: unique client ID followed by client handle
            this.funiqueClientID = aPayload.readInt32();
            this.fclientHandle = aPayload.readInt32();
            break;
        case TEventEntry.IC_EVENT_NAMES:
            handleCommandEventNames(aPayload);
            break;
        case TEventEntry.IC_END_OF_SESSION:
            close();
            break;
        case TEventEntry.IC_SUBSCRIBE:
        case TEventEntry.IC_PUBLISH:
        case TEventEntry.IC_UNSUBSCRIBE:
        case TEventEntry.IC_UNPUBLISH:
            handleSubAndPub(aCommand, aPayload);
            break;
        default:
            handleCommandOther(aCommand, aPayload);
            break;
        }
    }
443
444 private void handleCommandEvent(TByteBuffer aPayload) {
445 int TxEventID = this.feventTranslation.getTranslateEventID(aPayload.readInt32());
446 if (TxEventID != TEventTranslation.INVALID_TRANSLATED_EVENT_ID)
447 eventIDToEventL(TxEventID).handleEvent(aPayload);
448 }
449
450 private void handleCommandVariable(TByteBuffer aPayload) {
451 if (this.onVariable != null || this.onStatusUpdate != null) {
452 String VarName = aPayload.readString();
453
454
455 if (VarName.toUpperCase().endsWith(MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME.toUpperCase())) {
456 VarName = VarName.substring(0, VarName.length() - (MODEL_STATUS_VAR_SEP_CHAR.length() + MODEL_Status_VAR_NAME.length()));
457 String ModelName = VarName.substring(8, VarName.length());
458 String ModelUniqueClientID = VarName.substring(0, 8);
459 aPayload.readInt32();
460 int Status = aPayload.readInt32(-1);
461 int Progress = aPayload.readInt32(-1);
462 if (this.onStatusUpdate != null)
463 this.onStatusUpdate.dispatch(this, ModelUniqueClientID, ModelName, Progress, Status);
464 } else {
465 TByteBuffer VarValue = aPayload.readByteBuffer();
466 TByteBuffer PrevValue = new TByteBuffer();
467 if (this.onVariable != null)
468 this.onVariable.dispatch(this, VarName, VarValue.getBuffer(), PrevValue.getBuffer());
469 }
470 }
471 }
472
473 private void handleCommandEventNames(TByteBuffer aPayload) {
474 if (this.onEventNames != null) {
475 int ec = aPayload.readInt32();
476 TEventNameEntry[] EventNames = new TEventNameEntry[ec];
477 for (int en = 0; en < EventNames.length; en++) {
478 EventNames[en] = new TEventNameEntry();
479 EventNames[en].eventName = aPayload.readString();
480 EventNames[en].publishers = aPayload.readInt32();
481 EventNames[en].subscribers = aPayload.readInt32();
482 EventNames[en].timers = aPayload.readInt32();
483 }
484 this.onEventNames.dispatch(this, EventNames);
485 }
486 }
487
    /**
     * Handles a received subscribe, publish, unsubscribe or unpublish notification.
     * <p>
     * For subscribe and publish: when the event is not known locally but a matching filter event
     * exists, a child entry is created that takes over the handlers of that filter event; when the
     * event is known and not empty, the sub-and-pub handler is called. For unsubscribe and
     * unpublish the sub-and-pub handler is always called. In all cases a known entry is informed
     * through its own {@code handleSubAndPub}.
     *
     * @param aCommand one of the four sub/pub command codes
     * @param aPayload payload of the command
     */
    private void handleSubAndPub(int aCommand, TByteBuffer aPayload) {
        String EventName;
        TEventEntry EE;
        TEventEntry EP;
        switch (aCommand) {
        case TEventEntry.IC_SUBSCRIBE:
        case TEventEntry.IC_PUBLISH:
            // two int32 values precede the event name; they are not used here
            aPayload.readInt32();
            aPayload.readInt32();
            EventName = aPayload.readString();
            EE = findEventL(EventName);
            if (EE == null)
            {
                // unknown event: only of interest when one of our filter events covers it
                EP = findEventParentL(EventName);
                if (EP != null)
                {
                    EE = addEventL(EventName);
                    EE.fparent = EP;
                    EE.copyHandlersFrom(EP);
                }
            }
            else
            {
                if ((this.onSubAndPub !=null) && !EE.isEmpty())
                    this.onSubAndPub.dispatch(this, aCommand, EventName);

            }
            if (EE != null)
                EE.handleSubAndPub(aCommand);
            break;
        case TEventEntry.IC_UNSUBSCRIBE:
        case TEventEntry.IC_UNPUBLISH:
            // here the payload starts directly with the event name
            EventName = aPayload.readString();
            if (this.onSubAndPub !=null)
                this.onSubAndPub.dispatch(this, aCommand, EventName);
            EE = findEventL(EventName);
            if (EE != null)
                EE.handleSubAndPub(aCommand);
            break;
        }
    }
530
    /**
     * Hook for commands that have no handler of their own; does nothing here. Subclasses can
     * override it to process additional command codes.
     *
     * @param aCommand command code
     * @param aPayload payload of the command
     */
    protected void handleCommandOther(int aCommand, TByteBuffer aPayload) {

    }
534
    /**
     * Sends an IC_UNIQUE_CLIENT_ID command whose payload consists of two zero int32 values.
     * The values that come back are stored by handleCommand.
     *
     * @return the result of writeCommand
     */
    private int requestUniqueClientID() {
        TByteBuffer Payload = new TByteBuffer();
        // prepare() reserves room, qWrite() stores the values after prepareApply()
        Payload.prepare((int) 0);
        Payload.prepare((int) 0);
        Payload.prepareApply();
        Payload.qWrite((int) 0);
        Payload.qWrite((int) 0);
        return writeCommand(TEventEntry.IC_UNIQUE_CLIENT_ID, Payload.getBuffer());
    }
544
545 private int setOwner() {
546 if (isConnected()) {
547 TByteBuffer Payload = new TByteBuffer();
548 Payload.prepare(this.fownerID);
549 Payload.prepare(this.fownerName);
550 Payload.prepareApply();
551 Payload.qWrite(this.fownerID);
552 Payload.qWrite(this.fownerName);
553 return writeCommand(TEventEntry.IC_SET_CLIENT_INFO, Payload.getBuffer());
554 } else
555 return ICE_CONNECTION_CLOSED;
556 }
557
558 private void readCommands() {
559
560 int Command = TEventEntry.IC_INVALID_COMMAND;
561
562
563 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
564 + TByteBuffer.SIZE_OF_INT32);
565 TByteBuffer Payload = new TByteBuffer();
566 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
567 do {
568 try {
569 try {
570 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
571 if (Command != TEventEntry.IC_INVALID_COMMAND)
572 handleCommand(Command, Payload);
573 } catch (ThreadDeath ex) {
574 Command = TEventEntry.IC_END_OF_SESSION;
575 }
576 } catch (Exception ex) {
577 if (isConnected())
578 {
579 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
580 ex.printStackTrace();
581 }
582 }
583 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected());
584 }
585
586
    /**
     * States a connection can report. Every constant carries a numeric code in {@link #value};
     * for the gateway states this code (100..102) differs from the constant's ordinal.
     */
    protected enum TConnectionState {
        icsUninitialized(0),
        icsInitialized(1),
        icsClient(2),
        icsHub(3),
        icsEnded(4),

        // gateway related states use a separate numeric range
        icsGateway(100),
        icsGatewayClient(101),
        icsGatewayServer(102);

        /** Numeric code of the state. */
        public final int value;

        TConnectionState(int aValue) {
            this.value = aValue;
        }
    }
605
606 protected void setState(TConnectionState aState) {
607 TByteBuffer Payload = new TByteBuffer();
608 Payload.prepare(aState.ordinal());
609 Payload.prepareApply();
610 Payload.qWrite(aState.ordinal());
611 writeCommand(TEventEntry.IC_SET_STATE, Payload.getBuffer());
612 }
613
    /**
     * Opens the connection with the reading thread enabled.
     *
     * @param aHost host name or address to connect to
     * @param aPort TCP port to connect to
     * @return the result of {@code open(aHost, aPort, true)}
     */
    protected boolean open(String aHost, int aPort) {
        return open(aHost, aPort, true);
    }
617
618 protected boolean open(String aHost, int aPort, boolean aStartReadingThread) {
619 close();
620 try {
621 this.fremoteHost = aHost;
622 this.fremotePort = aPort;
623 this.fsocket = new Socket(this.fremoteHost, this.fremotePort);
624 if (this.fsocket.isConnected()) {
625 this.foutputStream = this.fsocket.getOutputStream();
626 this.finputStream = this.fsocket.getInputStream();
627
628
629 if (aStartReadingThread) {
630 this.freadingThread = new Thread(new Runnable() {
631 public void run() {
632 readCommands();
633 }
634 });
635 this.freadingThread.setName("imb command reader");
636 this.freadingThread.start();
637 }
638 if (this.imb2Compatible)
639 requestUniqueClientID();
640
641 setOwner();
642
643 if (this.onVariable != null || this.onStatusUpdate != null)
644 writeCommand(TEventEntry.IC_ALL_VARIABLES, null);
645 }
646 return this.fsocket.isConnected();
647 } catch (Exception ex) {
648 return false;
649 }
650 }
651
652
653
654
655
    /** One entry of an event names answer, as passed to the event names handler. */
    public class TEventNameEntry {
        /** Name of the event. */
        public String eventName;
        /** Publishers value received for the event. */
        public int publishers;
        /** Subscribers value received for the event. */
        public int subscribers;
        /** Timers value received for the event. */
        public int timers;
    }
662
    /**
     * Kind of prefix requested for a prefixed variable. The ordinal of the constant is what is
     * written into the IC_SET_VARIABLE_PREFIXED payload, so the order of the constants matters.
     */
    public enum TVarPrefix {
        vpUniqueClientID,
        vpClientHandle
    }
667
668
669
    /** Largest payload size (in bytes) accepted when reading a command: 10 MiB. */
    public static final int MAX_PAYLOAD_SIZE = 10 * 1024 * 1024;

    /** Default host name of the hub. */
    public static final String DEFAULT_HUB = "localhost";

    /** Default port of the hub. */
    public static final int DEFAULT_PORT = 4000;

    /** Federation used until another one is set. */
    public static final String DEFAULT_FEDERATION = "TNOdemo";


    /** Result code: the command could not be sent because the connection is closed. */
    public static final int ICE_CONNECTION_CLOSED = -1;

    /** Result code: the event to signal on is not known and was not published. */
    public static final int ICE_EVENT_NOT_PUBLISHED = -2;
682
683
684
    /** @return the federation currently used to prefix event names */
    public String getFederation() {
        return this.ffederation;
    }
688
689
690
691
692
693 public void setFederation(String aFederation) {
694 String OldFederation = this.ffederation;
695 TEventEntry Event;
696 if (isConnected() && (OldFederation != "")) {
697
698 for (int i = 0; i < this.feventEntryList.FCount; i++) {
699 String EventName = this.feventEntryList.getEventName(i);
700 if (!EventName.equals("") && EventName.startsWith(OldFederation + ".")) {
701 Event = this.feventEntryList.getEventEntry(i);
702 if (Event.isSubscribed())
703 Event.unSubscribe(false);
704 if (Event.isPublished())
705 Event.unPublish(false);
706 }
707 }
708 }
709 this.ffederation = aFederation;
710 if (isConnected() && (OldFederation != "")) {
711
712 for (int i = 0; i < this.feventEntryList.FCount; i++) {
713 String EventName = this.feventEntryList.getEventName(i);
714 if (!EventName.equals("") && EventName.startsWith(OldFederation + ".")) {
715 Event = this.feventEntryList.getEventEntry(i);
716 Event.feventName = this.ffederation + Event.feventName.substring(0, OldFederation.length());
717 if (Event.isSubscribed())
718 Event.subscribe();
719 if (Event.isPublished())
720 Event.publish();
721 }
722 }
723 }
724 }
725
726
    /** When true, signalling on an unknown event publishes that event first. */
    public boolean autoPublish = true;


    /**
     * When true, a unique client ID is requested when opening the connection and status variable
     * names are built with that ID in front.
     */
    public boolean imb2Compatible = true;
731
732
733
    /** @return the host given to the last call of open */
    public String getRemoteHost() {
        return this.fremoteHost;
    }
737
738
    /** @return the port given to the last call of open */
    public int getRemotePort() {
        return this.fremotePort;
    }
742
743
744
745
746
747 public boolean getNoDelay() throws SocketException {
748 if (isConnected())
749 return this.fsocket.getTcpNoDelay();
750 else
751 return false;
752 }
753
754
755
756
757
    /**
     * Sets the TCP_NODELAY option of the socket; does nothing when not connected.
     *
     * @param aValue new value for the option
     * @throws SocketException when the socket option cannot be set
     */
    public void setNoDelay(boolean aValue) throws SocketException {
        if (isConnected())
            this.fsocket.setTcpNoDelay(aValue);
    }
762
763
764
765
766
767 public boolean getLinger() throws SocketException {
768 if (isConnected())
769 return this.fsocket.getSoLinger() != -1;
770 else
771 return false;
772 }
773
774
775
776
777
    /**
     * Enables or disables SO_LINGER on the socket with a linger time of 2 seconds; does nothing
     * when not connected.
     *
     * @param aValue true to enable lingering
     * @throws SocketException when the socket option cannot be set
     */
    public void setLinger(boolean aValue) throws SocketException {
        if (isConnected())
            this.fsocket.setSoLinger(aValue, 2);
    }
782
783
784 public boolean isConnected() {
785 return (this.fsocket != null) && this.fsocket.isConnected();
786 }
787
788
789 public void close() {
790 if ((this.fsocket != null) && this.fsocket.isConnected()) {
791 if (this.onDisconnect != null)
792 this.onDisconnect.dispatch(this);
793 writeCommand(TEventEntry.IC_END_OF_SESSION, null);
794 try {
795 this.foutputStream.close();
796 this.foutputStream = null;
797 this.finputStream.close();
798 this.finputStream = null;
799 this.fsocket.close();
800 this.fsocket = null;
801 this.freadingThread = null;
802 } catch (IOException e) {
803
804 e.printStackTrace();
805 }
806 }
807 }
808
809
    /** Handler type for the disconnect notification. */
    public interface TOnDisconnect {
        /** @param aConnection the connection that is being closed */
        public void dispatch(TConnection aConnection);
    }


    /** Handler called by close() before the end-of-session command is sent; may be null. */
    public TOnDisconnect onDisconnect = null;
816
817
818
819
    /**
     * Sends an IC_SET_THROTTLE command with the given value as payload.
     *
     * @param aThrottle throttle value to send
     */
    public void setThrottle(int aThrottle) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aThrottle);
        Payload.prepareApply();
        Payload.qWrite(aThrottle);
        writeCommand(TEventEntry.IC_SET_THROTTLE, Payload.getBuffer());
    }
827
828
829
830
831
832 public void readCommandsNonBlocking() throws IOException {
833 if (this.finputStream.available() != 0) {
834 int Command = TEventEntry.IC_INVALID_COMMAND;
835
836
837 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
838 + TByteBuffer.SIZE_OF_INT32);
839 TByteBuffer Payload = new TByteBuffer();
840 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
841 do {
842 try {
843 try {
844 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
845 if (Command != TEventEntry.IC_INVALID_COMMAND)
846 handleCommand(Command, Payload);
847 } catch (ThreadDeath ex) {
848 Command = TEventEntry.IC_END_OF_SESSION;
849 }
850 } catch (Exception ex) {
851 if (isConnected())
852 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
853 }
854 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected() && (this.finputStream.available() != 0));
855 }
856 }
857
858
859
860
861
862
863 public void readCommandsNonThreaded(int aTimeOut) throws SocketException {
864 this.fsocket.setSoTimeout(aTimeOut);
865 int Command = TEventEntry.IC_INVALID_COMMAND;
866
867
868 TByteBuffer FixedCommandPart = new TByteBuffer(MAGIC_BYTES.length + TByteBuffer.SIZE_OF_INT32
869 + TByteBuffer.SIZE_OF_INT32);
870 TByteBuffer Payload = new TByteBuffer();
871 TByteBuffer PayloadCheck = new TByteBuffer(TByteBuffer.SIZE_OF_INT32);
872 do {
873 try {
874 try {
875 Command = readCommand(FixedCommandPart, Payload, PayloadCheck);
876 if (Command != TEventEntry.IC_INVALID_COMMAND)
877 handleCommand(Command, Payload);
878 } catch (ThreadDeath ex) {
879 Command = TEventEntry.IC_END_OF_SESSION;
880 }
881 } catch (Exception ex) {
882 if (isConnected())
883 System.out.println("## Exception in ReadCommands loop: " + ex.getMessage());
884 }
885 } while ((Command != TEventEntry.IC_END_OF_SESSION) && isConnected());
886 }
887
888
889
    /** @return the owner ID of this connection */
    public int getOwnerID() {
        return this.fownerID;
    }
893
894
895
896
897 public void setOwnerID(int aValue) {
898 if (this.fownerID != aValue) {
899 this.fownerID = aValue;
900 setOwner();
901 }
902 }
903
904
    /** @return the owner name of this connection */
    public String getOwnerName() {
        return this.fownerName;
    }
908
909
910
911
912 public void setOwnerName(String aValue) {
913 if (this.fownerName != aValue) {
914 this.fownerName = aValue;
915 setOwner();
916 }
917 }
918
919
    /** @return the unique client ID received from the remote side, or 0 when none was received yet */
    public int getUniqueClientID() {
        return this.funiqueClientID;
    }
923
924
    /** @return the client handle received from the remote side, or 0 when none was received yet */
    public int getClientHandle() {
        return this.fclientHandle;
    }
928
929
930
931
932
933
    /**
     * Subscribes to an event, prefixing the name with the federation.
     *
     * @param aEventName event name without federation prefix
     * @return the event entry
     */
    public TEventEntry subscribe(String aEventName) {
        return subscribe(aEventName, true);
    }
937
938
939
940
941
942
943 public TEventEntry subscribe(String aEventName, boolean aUseFederationPrefix) {
944 TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix));
945 if (!Event.isSubscribed())
946 Event.subscribe();
947 return Event;
948 }
949
950
951
952
953
    /**
     * Publishes an event, prefixing the name with the federation.
     *
     * @param aEventName event name without federation prefix
     * @return the event entry
     */
    public TEventEntry publish(String aEventName) {
        return publish(aEventName, true);
    }
957
958
959
960
961
962
963 public TEventEntry publish(String aEventName, boolean aUseFederationPrefix) {
964 TEventEntry Event = findOrAddEventL(prefixFederation(aEventName, aUseFederationPrefix));
965 if (!Event.isPublished())
966 Event.publish();
967 return Event;
968 }
969
970
971
972
    /**
     * Unsubscribes from an event, prefixing the name with the federation.
     *
     * @param aEventName event name without federation prefix
     */
    public void unSubscribe(String aEventName) {
        unSubscribe(aEventName, true);
    }
976
977
978
979
980
981 public void unSubscribe(String aEventName, boolean aUseFederationPrefix) {
982 TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix));
983 if (Event != null && Event.isSubscribed())
984 Event.unSubscribe(true);
985 }
986
987
988
989
    /**
     * Unpublishes an event, prefixing the name with the federation.
     *
     * @param aEventName event name without federation prefix
     */
    public void unPublish(String aEventName) {
        unPublish(aEventName, true);
    }
993
994
995
996
997
998 public void unPublish(String aEventName, boolean aUseFederationPrefix) {
999 TEventEntry Event = findEventL(prefixFederation(aEventName, aUseFederationPrefix));
1000 if (Event != null && Event.isPublished())
1001 Event.unPublish(true);
1002 }
1003
1004
1005
1006
1007
1008
1009
1010
    /**
     * Signals an event with a payload, prefixing the event name with the federation.
     *
     * @param aEventName event name without federation prefix
     * @param aEventKind kind of event
     * @param aEventPayload payload of the event
     * @return the result of the four-argument {@code signalEvent}
     */
    public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload) {
        return signalEvent(aEventName, aEventKind, aEventPayload, true);
    }
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023 public int signalEvent(String aEventName, int aEventKind, TByteBuffer aEventPayload, boolean aUseFederationPrefix) {
1024 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1025 if (Event != null)
1026 return Event.signalEvent(aEventKind, aEventPayload.getBuffer());
1027 else
1028 return ICE_EVENT_NOT_PUBLISHED;
1029 }
1030
1031
1032
1033
1034
1035
1036
1037
    /**
     * Signals a buffer on an event with event flags 0, prefixing the event name with the
     * federation.
     *
     * @param aEventName event name without federation prefix
     * @param aBufferID ID of the buffer
     * @param aBuffer buffer contents
     * @return the result of the five-argument {@code signalBuffer}
     */
    public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer) {
        return signalBuffer(aEventName, aBufferID, aBuffer, 0, true);
    }
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051 public int signalBuffer(String aEventName, int aBufferID, byte[] aBuffer, int aEventFlags,
1052 boolean aUseFederationPrefix) {
1053 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1054 if (Event != null)
1055 return Event.signalBuffer(aBufferID, aBuffer, aEventFlags);
1056 else
1057 return ICE_EVENT_NOT_PUBLISHED;
1058 }
1059
1060
1061
1062
1063
1064
1065
1066
1067
    /**
     * Signals a change-object event, prefixing the event name with the federation.
     *
     * @param aEventName event name without federation prefix
     * @param aAction action performed on the object
     * @param aObjectID ID of the object
     * @param aAttribute attribute the action applies to
     * @return the result of the five-argument {@code signalChangeObject}
     */
    public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute) {
        return signalChangeObject(aEventName, aAction, aObjectID, aAttribute, true);
    }
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081 public int signalChangeObject(String aEventName, int aAction, int aObjectID, String aAttribute,
1082 boolean aUseFederationPrefix) {
1083 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1084 if (Event != null)
1085 return Event.signalChangeObject(aAction, aObjectID, aAttribute);
1086 else
1087 return ICE_EVENT_NOT_PUBLISHED;
1088 }
1089
1090
1091
1092
1093
1094
1095
    /**
     * Signals a stream on an event, prefixing the event name with the federation.
     *
     * @param aEventName event name without federation prefix
     * @param aStreamName name of the stream
     * @param aStream stream to send
     * @return the result of the four-argument {@code signalStream}
     */
    public int signalStream(String aEventName, String aStreamName, InputStream aStream) {
        return signalStream(aEventName, aStreamName, aStream, true);
    }
1099
1100
1101
1102
1103
1104
1105
1106
1107 public int signalStream(String aEventName, String aStreamName, InputStream aStream, boolean aUseFederationPrefix) {
1108 TEventEntry Event = findEventAutoPublishL(prefixFederation(aEventName, aUseFederationPrefix));
1109 if (Event != null)
1110 return Event.signalStream(aStreamName, aStream);
1111 else
1112 return ICE_EVENT_NOT_PUBLISHED;
1113 }
1114
1115
1116
    /** Handler type for received variables. */
    public interface TOnVariable {
        /**
         * @param aConnection connection the variable was received on
         * @param aVarName name of the variable
         * @param aVarValue received value
         * @param aPrevValue previous value; the caller in this class always passes an empty buffer
         */
        public void dispatch(TConnection aConnection, String aVarName, byte[] aVarValue, byte[] aPrevValue);
    }



    /** Handler for received variables; null when none is installed. */
    private TOnVariable onVariable = null;
1124
1125
1126
    /**
     * Installs the handler for received variables and requests all variables from the remote side.
     *
     * @param aValue handler to install
     */
    public void setOnVariable(TOnVariable aValue)
    {
        this.onVariable = aValue;
        requestAllVariables();
    }
1132
1133
    /** Sends an IC_ALL_VARIABLES command without payload. */
    protected void requestAllVariables() {
        writeCommand(TEventEntry.IC_ALL_VARIABLES, null);
    }
1137
1138
1139
1140
1141
    /**
     * Sends a variable with a string value (IC_SET_VARIABLE).
     *
     * @param aVarName name of the variable
     * @param aVarValue value of the variable
     */
    public void setVariableValue(String aVarName, String aVarValue) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aVarName);
        Payload.prepare(aVarValue);
        Payload.prepareApply();
        Payload.qWrite(aVarName);
        Payload.qWrite(aVarValue);
        writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer());
    }
1151
1152
1153
1154
1155
    /**
     * Sends a variable with a byte buffer value (IC_SET_VARIABLE).
     *
     * @param aVarName name of the variable
     * @param aVarValue value of the variable
     */
    public void setVariableValue(String aVarName, TByteBuffer aVarValue) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aVarName);
        Payload.prepare(aVarValue);
        Payload.prepareApply();
        Payload.qWrite(aVarName);
        Payload.qWrite(aVarValue);
        writeCommand(TEventEntry.IC_SET_VARIABLE, Payload.getBuffer());
    }
1165
1166
1167
1168
1169
1170
    /**
     * Sends a prefixed variable with a string value (IC_SET_VARIABLE_PREFIXED). The ordinal of the
     * prefix kind is written in front of name and value.
     *
     * @param aVarName name of the variable
     * @param aVarValue value of the variable
     * @param aVarPrefix kind of prefix requested
     */
    public void setVariableValue(String aVarName, String aVarValue, TVarPrefix aVarPrefix) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aVarPrefix.ordinal());
        Payload.prepare(aVarName);
        Payload.prepare(aVarValue);
        Payload.prepareApply();
        Payload.qWrite(aVarPrefix.ordinal());
        Payload.qWrite(aVarName);
        Payload.qWrite(aVarValue);
        writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer());
    }
1182
1183
1184
1185
1186
1187
    /**
     * Sends a prefixed variable with a byte buffer value (IC_SET_VARIABLE_PREFIXED). The ordinal of
     * the prefix kind is written in front of name and value.
     *
     * @param aVarName name of the variable
     * @param aVarValue value of the variable
     * @param aVarPrefix kind of prefix requested
     */
    public void setVariableValue(String aVarName, TByteBuffer aVarValue, TVarPrefix aVarPrefix) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aVarPrefix.ordinal());
        Payload.prepare(aVarName);
        Payload.prepare(aVarValue);
        Payload.prepareApply();
        Payload.qWrite(aVarPrefix.ordinal());
        Payload.qWrite(aVarName);
        Payload.qWrite(aVarValue);
        writeCommand(TEventEntry.IC_SET_VARIABLE_PREFIXED, Payload.getBuffer());
    }
1199
1200
    /** Handler type for received model status updates. */
    public interface TOnStatusUpdate {
        /**
         * @param aConnection connection the update was received on
         * @param aModelUniqueClientID first 8 characters of the status variable name
         * @param aModelName model part of the status variable name
         * @param aProgress received progress, -1 when it could not be read
         * @param aStatus received status, -1 when it could not be read
         */
        public void dispatch(TConnection aConnection, String aModelUniqueClientID, String aModelName, int aProgress, int aStatus);
    }



    /** Handler for received status updates; null when none is installed. */
    private TOnStatusUpdate onStatusUpdate = null;
1208
1209
1210
    /**
     * Installs the handler for status updates and requests all variables from the remote side.
     *
     * @param aValue handler to install
     */
    public void setOnStatusUpdate(TOnStatusUpdate aValue)
    {
        this.onStatusUpdate = aValue;
        requestAllVariables();
    }
1216
1217
1218
    /** Status value: ready. */
    public final static int STATUS_READY = 0;

    /** Status value: calculating. */
    public final static int STATUS_CALCULATING = 1;

    /** Status value: busy. */
    public final static int STATUS_BUSY = 2;
1224
1225
1226
1227
1228
1229
1230 public void updateStatus(int aProgress, int aStatus) throws InterruptedException {
1231 TByteBuffer Payload = new TByteBuffer();
1232 Payload.prepare(aStatus);
1233 Payload.prepare(aProgress);
1234 Payload.prepareApply();
1235 Payload.qWrite(aStatus);
1236 Payload.qWrite(aProgress);
1237 if (this.imb2Compatible) {
1238
1239 if (this.funiqueClientID == 0) {
1240 int SpinCount = 10;
1241 while (this.funiqueClientID == 0 && SpinCount > 0) {
1242 Thread.sleep(500);
1243 SpinCount--;
1244 }
1245 }
1246
1247 setVariableValue(Integer.toHexString(this.funiqueClientID) + prefixFederation(this.fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR
1248 + MODEL_Status_VAR_NAME, Payload);
1249 } else
1250 setVariableValue(prefixFederation(this.fownerName).toUpperCase() + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, Payload,
1251 TVarPrefix.vpUniqueClientID);
1252 }
1253
1254
1255 public void removeStatus() {
1256 if (this.imb2Compatible)
1257
1258 setVariableValue(Integer.toHexString(this.funiqueClientID) + prefixFederation(this.fownerName) + MODEL_STATUS_VAR_SEP_CHAR
1259 + MODEL_Status_VAR_NAME, "");
1260 else
1261 setVariableValue(prefixFederation(this.fownerName) + MODEL_STATUS_VAR_SEP_CHAR + MODEL_Status_VAR_NAME, "",
1262 TVarPrefix.vpUniqueClientID);
1263 }
1264
1265
1266
1267
1268
    /**
     * Installs a focus handler. The focus event is subscribed to on first use and cached.
     *
     * @param aOnFocus handler to install on the focus event
     */
    public void subscribeOnFocus(TEventEntry.TOnFocus aOnFocus) {
        if (this.ffocusEvent == null)
            this.ffocusEvent = subscribe(FOCUS_EVENT_NAME);
        this.ffocusEvent.onFocus = aOnFocus;
    }
1274
1275
1276
1277
1278
1279
1280 public int signalFocus(double aX, double aY) {
1281 if (this.ffocusEvent == null)
1282 this.ffocusEvent = findEventAutoPublishL(prefixFederation(FOCUS_EVENT_NAME));
1283 if (this.ffocusEvent != null) {
1284 TByteBuffer Payload = new TByteBuffer();
1285 Payload.prepare(aX);
1286 Payload.prepare(aY);
1287 Payload.prepareApply();
1288 Payload.qWrite(aX);
1289 Payload.qWrite(aY);
1290 return this.ffocusEvent.signalEvent(TEventEntry.EK_CHANGE_OBJECT_EVENT, Payload.getBuffer());
1291 } else
1292 return ICE_EVENT_NOT_PUBLISHED;
1293 }
1294
1295
1296
1297
1298
    /**
     * Installs a federation change handler. The federation change event is subscribed to on first
     * use and cached.
     *
     * @param aOnChangeFederation handler to install on the federation change event
     */
    public void subscribeOnFederationChange(TEventEntry.TOnChangeFederation aOnChangeFederation) {
        if (this.fchangeFederationEvent == null)
            this.fchangeFederationEvent = subscribe(CHANGE_FEDERATION_EVENT_NAME);
        this.fchangeFederationEvent.onChangeFederation = aOnChangeFederation;
    }
1304
1305
1306
1307
1308
1309
1310 public int signalChangeFederation(int aNewFederationID, String aNewFederation) {
1311 if (this.fchangeFederationEvent == null)
1312 this.fchangeFederationEvent = findEventAutoPublishL(prefixFederation(CHANGE_FEDERATION_EVENT_NAME));
1313 if (this.fchangeFederationEvent != null)
1314 return this.fchangeFederationEvent.signalChangeObject(TEventEntry.ACTION_CHANGE, aNewFederationID, aNewFederation);
1315 else
1316 return ICE_EVENT_NOT_PUBLISHED;
1317 }
1318
1319
1320
1321
1322
1323
1324
1325
    /**
     * Writes a line to the log event. The log event is looked up (and published when auto
     * publishing is on) on first use and cached.
     *
     * @param aLogEventName name of the log event, without federation prefix
     * @param aLine line to log
     * @param aLevel log level of the line
     * @return the result of the event's {@code logWriteLn}, or {@link #ICE_EVENT_NOT_PUBLISHED}
     *         when the log event is not available
     */
    public int logWriteLn(String aLogEventName, String aLine, TEventEntry.TLogLevel aLevel) {
        // NOTE(review): once cached, later calls with a different aLogEventName still use the
        // first event - confirm this is intended.
        if (this.flogEvent == null)
            this.flogEvent = findEventAutoPublishL(prefixFederation(aLogEventName));
        if (this.flogEvent != null)
            return this.flogEvent.logWriteLn(aLine, aLevel);
        else
            return ICE_EVENT_NOT_PUBLISHED;
    }
1334
1335
1336
1337
1338
    /** Handler type for a received list of event names. */
    public interface TOnEventnames {
        /**
         * @param aConnection connection the list was received on
         * @param aEventNames received entries
         */
        public void dispatch(TConnection aConnection, TEventNameEntry[] aEventNames);
    }


    /** Handler for received event name lists; without it such lists are not decoded. */
    public TOnEventnames onEventNames = null;
1345
1346
    /** Handler type for received subscribe, publish, unsubscribe and unpublish notifications. */
    public interface TOnSubAndPub {
        /**
         * @param aConnection connection the notification was received on
         * @param aCommand command code of the notification
         * @param aEventName name of the event concerned
         */
        public void dispatch(TConnection aConnection, int aCommand, String aEventName);
    }


    /** Handler for sub/pub notifications; may be null. */
    public TOnSubAndPub onSubAndPub = null;
1353
1354
1355
    // Flag values for the aEventFilters argument of requestEventname; the values are distinct
    // bits and so can be combined.
    /** Event filter flag: publishers. */
    public static final int EF_PUBLISHERS = 1;

    /** Event filter flag: subscribers. */
    public static final int EF_SUBSCRIBERS = 2;

    /** Event filter flag: timers. */
    public static final int EF_TIMERS = 4;
1361
1362
1363
1364
1365
1366
    /**
     * Sends an IC_REQUEST_EVENT_NAMES command. The answer is delivered to the event names handler.
     *
     * @param aEventNameFilter filter on event names
     * @param aEventFilters combination of the EF_ flags
     * @return the result of writeCommand
     */
    public int requestEventname(String aEventNameFilter, int aEventFilters) {
        TByteBuffer Payload = new TByteBuffer();
        Payload.prepare(aEventNameFilter);
        Payload.prepare(aEventFilters);
        Payload.prepareApply();
        Payload.qWrite(aEventNameFilter);
        Payload.qWrite(aEventFilters);
        return writeCommand(TEventEntry.IC_REQUEST_EVENT_NAMES, Payload.getBuffer());
    }
1376
1377
1378 @Override
1379 public final String toString()
1380 {
1381 return "TConnection [remoteHost=" + this.fremoteHost + ", remotePort=" + this.fremotePort + ", federation="
1382 + this.ffederation + ", isConnected()=" + this.isConnected() + "]";
1383 }
1384 }