1 package org.opentrafficsim.remotecontrol;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.concurrent.atomic.AtomicInteger;
19
20 import javax.naming.NamingException;
21 import javax.swing.JFrame;
22 import javax.swing.JPanel;
23 import javax.swing.JScrollPane;
24 import javax.xml.bind.JAXBException;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.ValueRuntimeException;
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.cli.Checkable;
32 import org.djutils.cli.CliUtil;
33 import org.djutils.decoderdumper.HexDumper;
34 import org.djutils.event.EventInterface;
35 import org.djutils.event.EventListenerInterface;
36 import org.djutils.event.EventTypeInterface;
37 import org.djutils.immutablecollections.ImmutableMap;
38 import org.djutils.logger.CategoryLogger;
39 import org.djutils.logger.LogCategory;
40 import org.djutils.serialization.SerializationException;
41 import org.opentrafficsim.base.parameters.ParameterException;
42 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
43 import org.opentrafficsim.core.dsol.AbstractOTSModel;
44 import org.opentrafficsim.core.dsol.OTSAnimator;
45 import org.opentrafficsim.core.dsol.OTSModelInterface;
46 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
47 import org.opentrafficsim.core.geometry.DirectedPoint;
48 import org.opentrafficsim.core.geometry.OTSGeometryException;
49 import org.opentrafficsim.core.gtu.GTU;
50 import org.opentrafficsim.core.gtu.GTUException;
51 import org.opentrafficsim.core.gtu.GTUType;
52 import org.opentrafficsim.core.network.Network;
53 import org.opentrafficsim.core.network.NetworkException;
54 import org.opentrafficsim.core.network.OTSNetwork;
55 import org.opentrafficsim.core.object.InvisibleObjectInterface;
56 import org.opentrafficsim.draw.core.OTSDrawingException;
57 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
58 import org.opentrafficsim.road.network.OTSRoadNetwork;
59 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
60 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
61 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
62 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
63 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
64 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
65 import org.opentrafficsim.swing.gui.OTSSwingApplication;
66 import org.opentrafficsim.trafficcontrol.TrafficControlException;
67 import org.opentrafficsim.trafficcontrol.TrafficController;
68 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
69 import org.pmw.tinylog.Level;
70 import org.sim0mq.Sim0MQException;
71 import org.sim0mq.message.Sim0MQMessage;
72 import org.xml.sax.SAXException;
73 import org.zeromq.SocketType;
74 import org.zeromq.ZContext;
75 import org.zeromq.ZMQ;
76
77 import nl.tudelft.simulation.dsol.SimRuntimeException;
78 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
79 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeAnimator;
80 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
81 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
82 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
83 import nl.tudelft.simulation.jstats.streams.StreamInterface;
84 import picocli.CommandLine.Command;
85 import picocli.CommandLine.Option;
86
87
88
89
90
91
92
93
94
95
96
97
98 public class Sim0MQControlledOTS implements EventListenerInterface
99 {
100
101 private static final long serialVersionUID = 20200317L;
102
103
104 private Sim0MQOTSModel model = null;
105
106
107 private final ZContext zContext;
108
109
110 private final int port;
111
112
113 private final MasterCommunication masterCommunication = new MasterCommunication();
114
115
116
117
118
119
120 public Sim0MQControlledOTS(final ZContext zContext, final int port)
121 {
122 this.zContext = zContext;
123 this.port = port;
124 this.masterCommunication.start();
125 }
126
127
128
129
130 class MasterCommunication extends Thread
131 {
132 @Override
133 public void run()
134 {
135 System.err.println("MasterCommunication thread id is " + Thread.currentThread().getId());
136 ZMQ.Socket remoteControllerSocket = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PAIR);
137 remoteControllerSocket.setHWM(100000);
138 remoteControllerSocket.bind("tcp://*:" + Sim0MQControlledOTS.this.port);
139 ZMQ.Socket resultQueue = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PULL);
140 resultQueue.bind("inproc://results");
141 ZMQ.Socket toCommandLoop = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PUSH);
142 toCommandLoop.setHWM(1000);
143 toCommandLoop.connect("inproc://commands");
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 ZMQ.Poller poller = Sim0MQControlledOTS.this.zContext.createPoller(2);
175 poller.register(remoteControllerSocket, ZMQ.Poller.POLLIN);
176 poller.register(resultQueue, ZMQ.Poller.POLLIN);
177 while (!Thread.currentThread().isInterrupted())
178 {
179 poller.poll();
180 if (poller.pollin(0))
181 {
182 System.err.println("Got incoming command");
183 byte[] data = remoteControllerSocket.recv();
184 toCommandLoop.send(data, 0);
185 System.err.println("Incoming command handed over to toCommandLoop socket");
186 }
187 else if (poller.pollin(1))
188 {
189 System.err.println("Got outgoing result");
190 byte[] data = resultQueue.recv();
191 remoteControllerSocket.send(data, 0);
192 System.err.println("Outgoing result handed over to remoteControllerSocket");
193 }
194 }
195
196 }
197 }
198
199
200
201
202 @Command(description = "Sim0MQ Remotely Controlled OTS", name = "Sim0MQOTS", mixinStandardHelpOptions = true,
203 version = "1.0")
204 public static class Options implements Checkable
205 {
206
207 @Option(names = { "-p", "--port" }, description = "Internet port to use", defaultValue = "8888")
208 private int port;
209
210
211
212
213
214 public final int getPort()
215 {
216 return this.port;
217 }
218
219 @Override
220 public final void check() throws Exception
221 {
222 if (this.port <= 0 || this.port > 65535)
223 {
224 throw new Exception("Port should be between 1 and 65535");
225 }
226 }
227 }
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242 public static void main(final String[] args) throws NetworkException, OTSGeometryException, NamingException,
243 ValueRuntimeException, ParameterException, SimRuntimeException, Sim0MQException, SerializationException, IOException
244 {
245 CategoryLogger.setAllLogLevel(Level.WARNING);
246 CategoryLogger.setLogCategories(LogCategory.ALL);
247 Options options = new Options();
248 CliUtil.execute(options, args);
249 int port = options.getPort();
250 System.out.println("Creating OTS server listening on port " + port);
251 ZContext context = new ZContext(10);
252 Sim0MQControlledOTS slave = new Sim0MQControlledOTS(context, port);
253
254 slave.commandLoop();
255
256 context.destroy();
257 context.close();
258 }
259
260
261
262
263
264
265
266
267
268 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
269 {
270 if (null != this.model)
271 {
272 return "Cannot create another network (yet)";
273 }
274 else
275 {
276 try
277 {
278 OTSAnimator animator = new OTSAnimator("OTS Animator");
279 this.model = new Sim0MQOTSModel(animator, "OTS model", "Remotely controlled OTS model", xml);
280 Map<String, StreamInterface> map = new LinkedHashMap<>();
281 map.put("generation", new MersenneTwister(seed));
282 animator.initialize(Time.ZERO, simulationDuration, warmupTime, this.model, map);
283 this.model.getNetwork().addListener(this, Network.GTU_ADD_EVENT);
284 this.model.getNetwork().addListener(this, Network.GTU_REMOVE_EVENT);
285 OTSAnimationPanel animationPanel =
286 new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000), animator,
287 this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
288 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
289 new Sim0MQRemoteControlSwingApplication(this.model, animationPanel);
290 JFrame frame = (JFrame) animationPanel.getParent().getParent().getParent();
291 frame.setExtendedState(Frame.NORMAL);
292 frame.setSize(new Dimension(1100, 1000));
293 frame.setBounds(0, 25, 1100, 1000);
294 animator.setSpeedFactor(Double.MAX_VALUE, true);
295 animator.setSpeedFactor(1000.0, true);
296
297 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap =
298 this.model.getNetwork().getInvisibleObjectMap();
299 animator.addListener(this, DEVSRealTimeAnimator.CHANGE_SPEED_FACTOR_EVENT);
300 animator.addListener(this, SimulatorInterface.TIME_CHANGED_EVENT);
301 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
302 {
303 if (ioi instanceof TrafCOD)
304 {
305 TrafCOD trafCOD = (TrafCOD) ioi;
306 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
307 if (null != controllerDisplayPanel)
308 {
309 JPanel wrapper = new JPanel(new BorderLayout());
310 wrapper.add(new JScrollPane(controllerDisplayPanel));
311 TabbedContentPane tabbedPane = animationPanel.getTabbedPane();
312 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
313 }
314
315
316 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONTROLLER_WARNING);
317 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONFLICT_GROUP_CHANGED);
318 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_STATE_CHANGED);
319 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_VARIABLE_CREATED);
320 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_TRACED_VARIABLE_UPDATED);
321 }
322 }
323 try
324 {
325 Thread.sleep(300);
326 }
327 catch (InterruptedException e)
328 {
329 e.printStackTrace();
330 }
331 animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
332 }
333 catch (Exception e)
334 {
335 return e.getMessage();
336 }
337 }
338 return null;
339 }
340
341
342 private AtomicInteger packetsSent = new AtomicInteger(0);
343
344
345
346
347 @SuppressWarnings("checkstyle:methodlength")
348 public void commandLoop()
349 {
350 System.err.println("CommandLoop thread id is " + Thread.currentThread().getId());
351 ZMQ.Socket incomingCommands = this.zContext.createSocket(SocketType.PULL);
352 incomingCommands.bind("inproc://commands");
353 while (!Thread.currentThread().isInterrupted())
354 {
355
356 System.err.println("CommandLoop ready to read a command");
357 byte[] request = incomingCommands.recv(0);
358 System.err.println("CommandLoop processing a command of " + request.length + " bytes");
359 Object[] message;
360 String result = "At your command";
361 try
362 {
363 message = Sim0MQMessage.decode(request).createObjectArray();
364 System.out.println("Received Sim0MQ message:");
365
366 if (message.length >= 8 && message[5] instanceof String)
367 {
368 String command = (String) message[5];
369 System.out.println("Command is " + command);
370 switch (command)
371 {
372 case "LOADNETWORK":
373 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
374 && message[10] instanceof Duration && message[11] instanceof Long)
375 {
376 System.out.println("xml length = " + ((String) message[8]).length());
377 String loadResult = loadNetwork((String) message[8], (Duration) message[9],
378 (Duration) message[10], (Long) message[11]);
379 if (null != loadResult)
380 {
381 result = loadResult;
382 }
383 }
384 else
385 {
386 result = "no network, warmupTime and/or runTime provided with LOADNETWORK command";
387 }
388 break;
389
390 case "SIMULATEUNTIL":
391 if (null == this.model)
392 {
393 result = "No model loaded";
394 }
395 else if (message.length == 9 && message[8] instanceof Time)
396 {
397 OTSSimulatorInterface simulator = this.model.getSimulator();
398 System.out.println("Simulating up to " + message[8]);
399 simulator.runUpTo((Time) message[8]);
400 int count = 0;
401 while (simulator.isStartingOrRunning())
402 {
403 System.out.print(".");
404 count++;
405 if (count > 1000)
406 {
407 System.out.println("SIMULATOR DOES NOT STOP. TIME = " + simulator.getSimulatorTime());
408 Iterator<SimEventInterface<Duration>> elIt =
409 simulator.getEventList().iterator();
410 while (elIt.hasNext())
411 {
412 System.out.println("EVENTLIST: " + elIt.next());
413 }
414 simulator.stop();
415 }
416 try
417 {
418 Thread.sleep(10);
419 }
420 catch (InterruptedException e)
421 {
422 e.printStackTrace();
423 }
424 }
425 System.out.println("Simulator has stopped at time " + simulator.getSimulatorTime());
426 try
427 {
428 Thread.sleep(100);
429 }
430 catch (InterruptedException e)
431 {
432 e.printStackTrace();
433 }
434 }
435 else
436 {
437 result = "Bad or missing stop time";
438 }
439 break;
440
441 case "SENDALLGTUPOSITIONS":
442 if (null == this.model)
443 {
444 result = "No model loaded";
445 }
446 else if (message.length == 8)
447 {
448 for (GTU gtu : this.model.network.getGTUs())
449 {
450
451 try
452 {
453 DirectedPoint gtuPosition = gtu.getLocation();
454 Object[] gtuData = new Object[] { gtu.getId(), gtu.getGTUType().getId(), gtuPosition.x,
455 gtuPosition.y, gtuPosition.z, gtuPosition.getRotZ(), gtu.getSpeed(),
456 gtu.getAcceleration() };
457 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "GTUPOSITION",
458 0, gtuData));
459 }
460 catch (Sim0MQException | SerializationException e)
461 {
462 e.printStackTrace();
463 break;
464 }
465
466 }
467 }
468 break;
469
470 default:
471
472 System.out.println("Don't know how to handle message:");
473 System.out.println(Sim0MQMessage.print(message));
474 result = "Unimplemented command " + command;
475 break;
476 }
477 }
478 else
479 {
480 System.out.println("Don't know how to handle message:");
481 System.out.println(HexDumper.hexDumper(request));
482 result = "Ignored message";
483 }
484 }
485 catch (Sim0MQException | SerializationException e)
486 {
487 e.printStackTrace();
488 result = "Could not decode command: " + e.getMessage();
489 }
490
491 try
492 {
493 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "READY", 0, result));
494 }
495 catch (Sim0MQException | SerializationException e)
496 {
497 e.printStackTrace();
498 break;
499 }
500 }
501 }
502
503
504 private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>();
505
506
507
508
509
510 public synchronized void sendToMaster(final byte[] data)
511 {
512 byte[] fixedData = data;
513 int number = -1;
514 try
515 {
516
517 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
518 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
519 number = this.packetsSent.addAndGet(1);
520 fixedData =
521 Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
522 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
523 System.err.println("Prepared message " + number + ", type is " + messageFields[5]);
524 }
525 catch (Sim0MQException | SerializationException e)
526 {
527 e.printStackTrace();
528 }
529 Long threadId = Thread.currentThread().getId();
530 ZMQ.Socket socket = this.socketMap.get(threadId);
531 while (null == socket)
532 {
533 System.out.println("Creating new internal socket for thread " + threadId);
534 try
535 {
536 socket = this.zContext.createSocket(SocketType.PUSH);
537 socket.setHWM(100000);
538 socket.connect("inproc://results");
539 this.socketMap.put(threadId, socket);
540
541 }
542 catch (Exception cbie)
543 {
544 System.err.println("Caught funny exception - probably related to DSOL animator start/stop code ... retrying");
545 try
546 {
547 Thread.sleep(100);
548 }
549 catch (InterruptedException e)
550 {
551 System.err.println("Sleep interrupted!");
552 }
553 }
554 }
555 System.out.println("pre send");
556
557
558
559
560 socket.send(fixedData, 0);
561
562
563 }
564
565
566 @Override
567 public void notify(final EventInterface event) throws RemoteException
568 {
569 try
570 {
571 EventTypeInterface type = event.getType();
572 String eventTypeName = type.getName();
573 System.out.println("notify: start processing event " + eventTypeName);
574 switch (eventTypeName)
575 {
576 case "TRAFFICCONTROL.CONTROLLER_EVALUATING":
577 {
578 Object[] payload = (Object[]) event.getContent();
579 CategoryLogger.always().info("{}: Evaluating at time {}", payload[0], payload[1]);
580 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
581 String.format("%s: Evaluating at time %s", payload[0], payload[1])));
582 break;
583 }
584
585 case "TRAFFICCONTROL.CONFLICT_GROUP_CHANGED":
586 {
587 Object[] payload = (Object[]) event.getContent();
588 CategoryLogger.always().info("{}: Conflict group changed from {} to {}", payload[0], payload[1],
589 payload[2]);
590 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
591 break;
592 }
593
594 case "TRAFFICCONTROL.VARIABLE_UPDATED":
595 {
596 Object[] payload = (Object[]) event.getContent();
597 CategoryLogger.always().info("{}: Variable changed {} <- {} {}", payload[0], payload[1], payload[4],
598 payload[5]);
599 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
600 break;
601 }
602
603 case "TRAFFICCONTROL.CONTROLLER_WARNING":
604 {
605 Object[] payload = (Object[]) event.getContent();
606 CategoryLogger.always().info("{}: Warning {}", payload[0], payload[1]);
607 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
608 break;
609 }
610
611 case "TIME_CHANGED_EVENT":
612 {
613 CategoryLogger.always().info("Time changed to {}", event.getContent());
614 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
615 String.format("Time changed to %s", event.getContent())));
616 break;
617 }
618
619 case "NETWORK.GTU.ADD":
620 {
621 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
622 break;
623 }
624
625 case "NETWORK.GTU.REMOVE":
626 {
627 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
628 break;
629 }
630
631 default:
632 {
633 CategoryLogger.always().info("Event of unhandled type {} with payload {}", event.getType(),
634 event.getContent());
635 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", "Event of unhandled type", 0, String
636 .format("%s: Event of unhandled type %s with payload {}", event.getType(), event.getContent())));
637 break;
638 }
639 }
640 System.out.println("notify: finished processing event " + eventTypeName);
641 }
642 catch (Sim0MQException | SerializationException e)
643 {
644 e.printStackTrace();
645 }
646 }
647
648
649
650
651 class Sim0MQRemoteControlSwingApplication extends OTSSimulationApplication<OTSModelInterface>
652 {
653
654 private static final long serialVersionUID = 1L;
655
656
657
658
659
660
661 Sim0MQRemoteControlSwingApplication(final OTSModelInterface model, final OTSAnimationPanel panel)
662 throws OTSDrawingException
663 {
664 super(model, panel);
665 }
666 }
667
668
669
670
671 class Sim0MQOTSModel extends AbstractOTSModel implements EventListenerInterface
672 {
673
674 private static final long serialVersionUID = 20170419L;
675
676
677 @SuppressWarnings("checkstyle:visibilitymodifier")
678 OTSRoadNetwork network;
679
680
681 private final String xml;
682
683
684
685
686
687
688
689 Sim0MQOTSModel(final OTSSimulatorInterface simulator, final String shortName, final String description,
690 final String xml)
691 {
692 super(simulator, shortName, description);
693 this.xml = xml;
694 }
695
696
697 @Override
698 public void notify(final EventInterface event) throws RemoteException
699 {
700 System.err.println("Received event " + event);
701 }
702
703
704 @Override
705 public void constructModel() throws SimRuntimeException
706 {
707 this.network = new OTSRoadNetwork(getShortName(), true, getSimulator());
708 try
709 {
710 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
711 false);
712 LaneCombinationList ignoreList = new LaneCombinationList();
713 LaneCombinationList permittedList = new LaneCombinationList();
714 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
715 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), ignoreList,
716 permittedList);
717 }
718 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException
719 | SAXException | ParserConfigurationException | GTUException | IOException
720 | TrafficControlException exception)
721 {
722 exception.printStackTrace();
723
724
725 throw new SimRuntimeException(exception);
726 }
727 }
728
729
730 @Override
731 public OTSNetwork getNetwork()
732 {
733 return this.network;
734 }
735
736
737 @Override
738 public Serializable getSourceId()
739 {
740 return "Sim0MQOTSModel";
741 }
742
743 }
744
745 }