1 package org.opentrafficsim.remotecontrol;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.concurrent.atomic.AtomicInteger;
19
20 import javax.naming.NamingException;
21 import javax.swing.JFrame;
22 import javax.swing.JPanel;
23 import javax.swing.JScrollPane;
24 import javax.xml.bind.JAXBException;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.ValueRuntimeException;
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.cli.Checkable;
32 import org.djutils.cli.CliUtil;
33 import org.djutils.decoderdumper.HexDumper;
34 import org.djutils.event.EventInterface;
35 import org.djutils.event.EventListenerInterface;
36 import org.djutils.event.EventTypeInterface;
37 import org.djutils.immutablecollections.ImmutableMap;
38 import org.djutils.logger.CategoryLogger;
39 import org.djutils.logger.LogCategory;
40 import org.djutils.serialization.SerializationException;
41 import org.opentrafficsim.base.parameters.ParameterException;
42 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
43 import org.opentrafficsim.core.dsol.AbstractOTSModel;
44 import org.opentrafficsim.core.dsol.OTSAnimator;
45 import org.opentrafficsim.core.dsol.OTSModelInterface;
46 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
47 import org.opentrafficsim.core.geometry.OTSGeometryException;
48 import org.opentrafficsim.core.gtu.GTU;
49 import org.opentrafficsim.core.gtu.GTUException;
50 import org.opentrafficsim.core.gtu.GTUType;
51 import org.opentrafficsim.core.network.Network;
52 import org.opentrafficsim.core.network.NetworkException;
53 import org.opentrafficsim.core.network.OTSNetwork;
54 import org.opentrafficsim.core.object.InvisibleObjectInterface;
55 import org.opentrafficsim.draw.core.OTSDrawingException;
56 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
57 import org.opentrafficsim.road.network.OTSRoadNetwork;
58 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
59 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
60 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
61 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
62 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
63 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
64 import org.opentrafficsim.swing.gui.OTSSwingApplication;
65 import org.opentrafficsim.trafficcontrol.TrafficControlException;
66 import org.opentrafficsim.trafficcontrol.TrafficController;
67 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
68 import org.pmw.tinylog.Level;
69 import org.sim0mq.Sim0MQException;
70 import org.sim0mq.message.Sim0MQMessage;
71 import org.xml.sax.SAXException;
72 import org.zeromq.SocketType;
73 import org.zeromq.ZContext;
74 import org.zeromq.ZMQ;
75
76 import nl.tudelft.simulation.dsol.SimRuntimeException;
77 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
78 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
79 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
80 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
81 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
82 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
83 import nl.tudelft.simulation.jstats.streams.StreamInterface;
84 import nl.tudelft.simulation.language.d3.DirectedPoint;
85 import picocli.CommandLine.Command;
86 import picocli.CommandLine.Option;
87
88
89
90
91
92
93
94
95
96
97
98
99 public class Sim0MQControlledOTS implements EventListenerInterface
100 {
101
102 private static final long serialVersionUID = 20200317L;
103
104
105 private Sim0MQOTSModel model = null;
106
107
108 private final ZContext zContext;
109
110
111 private final int port;
112
113
114 private final MasterCommunication masterCommunication = new MasterCommunication();
115
116
117
118
119
120
/**
 * Store the connection settings and start the thread that relays messages to and from the remote controller.
 * @param zContext the ZeroMQ context used to create all sockets of this object
 * @param port the TCP port number on which the relay thread binds the remote controller socket
 */
public Sim0MQControlledOTS(final ZContext zContext, final int port)
{
    this.port = port;
    this.zContext = zContext;
    // Both fields are assigned before the relay thread is started, because that thread reads them.
    this.masterCommunication.start();
}
127
128
129
130
131 class MasterCommunication extends Thread
132 {
133 @Override
134 public void run()
135 {
136 System.err.println("MasterCommunication thread id is " + Thread.currentThread().getId());
137 ZMQ.Socket remoteControllerSocket = zContext.createSocket(SocketType.PAIR);
138 remoteControllerSocket.setHWM(100000);
139 remoteControllerSocket.bind("tcp://*:" + port);
140 ZMQ.Socket resultQueue = zContext.createSocket(SocketType.PULL);
141 resultQueue.bind("inproc://results");
142 ZMQ.Socket toCommandLoop = zContext.createSocket(SocketType.PUSH);
143 toCommandLoop.setHWM(1000);
144 toCommandLoop.connect("inproc://commands");
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 ZMQ.Poller poller = zContext.createPoller(2);
176 poller.register(remoteControllerSocket, ZMQ.Poller.POLLIN);
177 poller.register(resultQueue, ZMQ.Poller.POLLIN);
178 while (!Thread.currentThread().isInterrupted())
179 {
180 poller.poll();
181 if (poller.pollin(0))
182 {
183 System.err.println("Got incoming command");
184 byte[] data = remoteControllerSocket.recv();
185 toCommandLoop.send(data, 0);
186 System.err.println("Incoming command handed over to toCommandLoop socket");
187 }
188 else if (poller.pollin(1))
189 {
190 System.err.println("Got outgoing result");
191 byte[] data = resultQueue.recv();
192 remoteControllerSocket.send(data, 0);
193 System.err.println("Outgoing result handed over to remoteControllerSocket");
194 }
195 }
196
197 }
198 }
199
200
201
202
203 @Command(description = "Sim0MQ Remotely Controlled OTS", name = "Sim0MQOTS", mixinStandardHelpOptions = true,
204 version = "1.0")
205 public static class Options implements Checkable
206 {
207
208 @Option(names = { "-p", "--port" }, description = "Internet port to use", defaultValue = "8888")
209 private int port;
210
211
212
213
214
215 public final int getPort()
216 {
217 return this.port;
218 }
219
220 @Override
221 public final void check() throws Exception
222 {
223 if (this.port <= 0 || this.port > 65535)
224 {
225 throw new Exception("Port should be between 1 and 65535");
226 }
227 }
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243 public static void main(final String[] args) throws NetworkException, OTSGeometryException, NamingException,
244 ValueRuntimeException, ParameterException, SimRuntimeException, Sim0MQException, SerializationException, IOException
245 {
246 CategoryLogger.setAllLogLevel(Level.WARNING);
247 CategoryLogger.setLogCategories(LogCategory.ALL);
248 Options options = new Options();
249 CliUtil.execute(options, args);
250 int port = options.getPort();
251 System.out.println("Creating OTS server listening on port " + port);
252 ZContext context = new ZContext(10);
253 Sim0MQControlledOTSrolledOTS.html#Sim0MQControlledOTS">Sim0MQControlledOTS slave = new Sim0MQControlledOTS(context, port);
254
255 slave.commandLoop();
256
257 context.destroy();
258 context.close();
259 }
260
261
262
263
264
265
266
267
268
269 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
270 {
271 if (null != this.model)
272 {
273 return "Cannot create another network (yet)";
274 }
275 else
276 {
277 try
278 {
279 OTSAnimator animator = new OTSAnimator("OTS Animator");
280 this.model = new Sim0MQOTSModel(animator, "OTS model", "Remotely controlled OTS model", xml);
281 Map<String, StreamInterface> map = new LinkedHashMap<>();
282 map.put("generation", new MersenneTwister(seed));
283 animator.initialize(Time.ZERO, simulationDuration, warmupTime, this.model, map);
284 this.model.getNetwork().addListener(this, Network.GTU_ADD_EVENT);
285 this.model.getNetwork().addListener(this, Network.GTU_REMOVE_EVENT);
286 OTSAnimationPanel animationPanel =
287 new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000), animator,
288 this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
289 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
290 new Sim0MQRemoteControlSwingApplication(this.model, animationPanel);
291 JFrame frame = (JFrame) animationPanel.getParent().getParent().getParent();
292 frame.setExtendedState(Frame.NORMAL);
293 frame.setSize(new Dimension(1100, 1000));
294 frame.setBounds(0, 25, 1100, 1000);
295 animator.setSpeedFactor(Double.MAX_VALUE, true);
296 animator.setSpeedFactor(1000.0, true);
297
298 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap =
299 this.model.getNetwork().getInvisibleObjectMap();
300 animator.addListener(this, DEVSRealTimeClock.CHANGE_SPEED_FACTOR_EVENT);
301 animator.addListener(this, SimulatorInterface.TIME_CHANGED_EVENT);
302 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
303 {
304 if (ioi instanceof TrafCOD)
305 {
306 TrafCOD trafCOD = (TrafCOD) ioi;
307 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
308 if (null != controllerDisplayPanel)
309 {
310 JPanel wrapper = new JPanel(new BorderLayout());
311 wrapper.add(new JScrollPane(controllerDisplayPanel));
312 TabbedContentPane tabbedPane = animationPanel.getTabbedPane();
313 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
314 }
315
316
317 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONTROLLER_WARNING);
318 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONFLICT_GROUP_CHANGED);
319 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_STATE_CHANGED);
320 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_VARIABLE_CREATED);
321 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_TRACED_VARIABLE_UPDATED);
322 }
323 }
324 try
325 {
326 Thread.sleep(300);
327 }
328 catch (InterruptedException e)
329 {
330 e.printStackTrace();
331 }
332 animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
333 }
334 catch (Exception e)
335 {
336 return e.getMessage();
337 }
338 }
339 return null;
340 }
341
342
343 private AtomicInteger packetsSent = new AtomicInteger(0);
344
345
346
347
348 @SuppressWarnings("checkstyle:methodlength")
349 public void commandLoop()
350 {
351 System.err.println("CommandLoop thread id is " + Thread.currentThread().getId());
352 ZMQ.Socket incomingCommands = this.zContext.createSocket(SocketType.PULL);
353 incomingCommands.bind("inproc://commands");
354 while (!Thread.currentThread().isInterrupted())
355 {
356
357 System.err.println("CommandLoop ready to read a command");
358 byte[] request = incomingCommands.recv(0);
359 System.err.println("CommandLoop processing a command of " + request.length + " bytes");
360 Object[] message;
361 String result = "At your command";
362 try
363 {
364 message = Sim0MQMessage.decode(request).createObjectArray();
365 System.out.println("Received Sim0MQ message:");
366
367 if (message.length >= 8 && message[5] instanceof String)
368 {
369 String command = (String) message[5];
370 System.out.println("Command is " + command);
371 switch (command)
372 {
373 case "LOADNETWORK":
374 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
375 && message[10] instanceof Duration && message[11] instanceof Long)
376 {
377 System.out.println("xml length = " + ((String) message[8]).length());
378 String loadResult = loadNetwork((String) message[8], (Duration) message[9],
379 (Duration) message[10], (Long) message[11]);
380 if (null != loadResult)
381 {
382 result = loadResult;
383 }
384 }
385 else
386 {
387 result = "no network, warmupTime and/or runTime provided with LOADNETWORK command";
388 }
389 break;
390
391 case "SIMULATEUNTIL":
392 if (null == this.model)
393 {
394 result = "No model loaded";
395 }
396 else if (message.length == 9 && message[8] instanceof Time)
397 {
398 OTSSimulatorInterface simulator = this.model.getSimulator();
399 System.out.println("Simulating up to " + message[8]);
400 simulator.runUpTo((Time) message[8]);
401 int count = 0;
402 while (simulator.isStartingOrRunning())
403 {
404 System.out.print(".");
405 count++;
406 if (count > 1000)
407 {
408 System.out.println("SIMULATOR DOES NOT STOP. TIME = " + simulator.getSimulatorTime());
409 Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
410 simulator.getEventList().iterator();
411 while (elIt.hasNext())
412 {
413 System.out.println("EVENTLIST: " + elIt.next());
414 }
415 simulator.stop();
416 }
417 try
418 {
419 Thread.sleep(10);
420 }
421 catch (InterruptedException e)
422 {
423 e.printStackTrace();
424 }
425 }
426 System.out.println("Simulator has stopped at time " + simulator.getSimulatorTime());
427 try
428 {
429 Thread.sleep(100);
430 }
431 catch (InterruptedException e)
432 {
433 e.printStackTrace();
434 }
435 }
436 else
437 {
438 result = "Bad or missing stop time";
439 }
440 break;
441
442 case "SENDALLGTUPOSITIONS":
443 if (null == this.model)
444 {
445 result = "No model loaded";
446 }
447 else if (message.length == 8)
448 {
449 for (GTU gtu : this.model.network.getGTUs())
450 {
451
452 try
453 {
454 DirectedPoint gtuPosition = gtu.getLocation();
455 Object[] gtuData = new Object[] { gtu.getId(), gtu.getGTUType().getId(), gtuPosition.x,
456 gtuPosition.y, gtuPosition.z, gtuPosition.getRotZ(), gtu.getSpeed(),
457 gtu.getAcceleration() };
458 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "GTUPOSITION",
459 0, gtuData));
460 }
461 catch (Sim0MQException | SerializationException e)
462 {
463 e.printStackTrace();
464 break;
465 }
466
467 }
468 }
469 break;
470
471 default:
472 System.out.println("Don't know how to handle message:");
473 System.out.println(Sim0MQMessage.print(message));
474 result = "Unimplemented command " + command;
475 break;
476 }
477 }
478 else
479 {
480 System.out.println("Don't know how to handle message:");
481 System.out.println(HexDumper.hexDumper(request));
482 result = "Ignored message";
483 }
484 }
485 catch (Sim0MQException | SerializationException e)
486 {
487 e.printStackTrace();
488 result = "Could not decode command: " + e.getMessage();
489 }
490 catch (RemoteException e)
491 {
492 e.printStackTrace();
493 result = "Caught RemoteException: " + e.getMessage();
494 }
495
496 try
497 {
498 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "READY", 0, result));
499 }
500 catch (Sim0MQException | SerializationException e)
501 {
502 e.printStackTrace();
503 break;
504 }
505 }
506 }
507
508
509 private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>();
510
511
512
513
514
515 public synchronized void sendToMaster(final byte[] data)
516 {
517 byte[] fixedData = data;
518 int number = -1;
519 try
520 {
521
522 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
523 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
524 number = this.packetsSent.addAndGet(1);
525 fixedData =
526 Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
527 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
528 System.err.println("Prepared message " + number + ", type is " + messageFields[5]);
529 }
530 catch (Sim0MQException | SerializationException e)
531 {
532 e.printStackTrace();
533 }
534 Long threadId = Thread.currentThread().getId();
535 ZMQ.Socket socket = this.socketMap.get(threadId);
536 while (null == socket)
537 {
538 System.out.println("Creating new internal socket for thread " + threadId);
539 try
540 {
541 socket = this.zContext.createSocket(SocketType.PUSH);
542 socket.setHWM(100000);
543 socket.connect("inproc://results");
544 this.socketMap.put(threadId, socket);
545
546 }
547 catch (Exception cbie)
548 {
549 System.err.println("Caught funny exception - probably related to DSOL animator start/stop code ... retrying");
550 try
551 {
552 Thread.sleep(100);
553 }
554 catch (InterruptedException e)
555 {
556 System.err.println("Sleep interrupted!");
557 }
558 }
559 }
560 System.out.println("pre send");
561
562
563
564
565 socket.send(fixedData, 0);
566
567
568 }
569
570
571 @Override
572 public void notify(final EventInterface event) throws RemoteException
573 {
574 try
575 {
576 EventTypeInterface type = event.getType();
577 String eventTypeName = type.getName();
578 System.out.println("notify: start processing event " + eventTypeName);
579 switch (eventTypeName)
580 {
581 case "TRAFFICCONTROL.CONTROLLER_EVALUATING":
582 {
583 Object[] payload = (Object[]) event.getContent();
584 CategoryLogger.always().info("{}: Evaluating at time {}", payload[0], payload[1]);
585 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
586 String.format("%s: Evaluating at time %s", payload[0], payload[1])));
587 break;
588 }
589
590 case "TRAFFICCONTROL.CONFLICT_GROUP_CHANGED":
591 {
592 Object[] payload = (Object[]) event.getContent();
593 CategoryLogger.always().info("{}: Conflict group changed from {} to {}", payload[0], payload[1],
594 payload[2]);
595 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
596 break;
597 }
598
599 case "TRAFFICCONTROL.VARIABLE_UPDATED":
600 {
601 Object[] payload = (Object[]) event.getContent();
602 CategoryLogger.always().info("{}: Variable changed {} <- {} {}", payload[0], payload[1], payload[4],
603 payload[5]);
604 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
605 break;
606 }
607
608 case "TRAFFICCONTROL.CONTROLLER_WARNING":
609 {
610 Object[] payload = (Object[]) event.getContent();
611 CategoryLogger.always().info("{}: Warning {}", payload[0], payload[1]);
612 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
613 break;
614 }
615
616 case "TIME_CHANGED_EVENT":
617 {
618 CategoryLogger.always().info("Time changed to {}", event.getContent());
619 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
620 String.format("Time changed to %s", event.getContent())));
621 break;
622 }
623
624 case "NETWORK.GTU.ADD":
625 {
626 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
627 break;
628 }
629
630 case "NETWORK.GTU.REMOVE":
631 {
632 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
633 break;
634 }
635
636 default:
637 {
638 CategoryLogger.always().info("Event of unhandled type {} with payload {}", event.getType(),
639 event.getContent());
640 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", "Event of unhandled type", 0, String
641 .format("%s: Event of unhandled type %s with payload {}", event.getType(), event.getContent())));
642 break;
643 }
644 }
645 System.out.println("notify: finished processing event " + eventTypeName);
646 }
647 catch (Sim0MQException | SerializationException e)
648 {
649 e.printStackTrace();
650 }
651 }
652
653
654
655
656 class Sim0MQRemoteControlSwingApplication extends OTSSimulationApplication<OTSModelInterface>
657 {
658
659 private static final long serialVersionUID = 1L;
660
661
662
663
664
665
666 Sim0MQRemoteControlSwingApplication(final OTSModelInterface model, final OTSAnimationPanel panel)
667 throws OTSDrawingException
668 {
669 super(model, panel);
670 }
671 }
672
673
674
675
676 class Sim0MQOTSModel extends AbstractOTSModel implements EventListenerInterface
677 {
678
679 private static final long serialVersionUID = 20170419L;
680
681
682 @SuppressWarnings("checkstyle:visibilitymodifier")
683 OTSRoadNetwork network;
684
685
686 private final String xml;
687
688
689
690
691
692
693
694 Sim0MQOTSModel(final OTSSimulatorInterface simulator, final String shortName, final String description,
695 final String xml)
696 {
697 super(simulator, shortName, description);
698 this.xml = xml;
699 }
700
701
702 @Override
703 public void notify(final EventInterface event) throws RemoteException
704 {
705 System.err.println("Received event " + event);
706 }
707
708
709 @Override
710 public void constructModel() throws SimRuntimeException
711 {
712 this.network = new OTSRoadNetwork(getShortName(), true, getSimulator());
713 try
714 {
715 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
716 false);
717 LaneCombinationList ignoreList = new LaneCombinationList();
718 LaneCombinationList permittedList = new LaneCombinationList();
719 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
720 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), ignoreList,
721 permittedList);
722 }
723 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException
724 | SAXException | ParserConfigurationException | GTUException | IOException
725 | TrafficControlException exception)
726 {
727 exception.printStackTrace();
728
729
730 throw new SimRuntimeException(exception);
731 }
732 }
733
734
735 @Override
736 public OTSNetwork getNetwork()
737 {
738 return this.network;
739 }
740
741
742 @Override
743 public Serializable getSourceId()
744 {
745 return "Sim0MQOTSModel";
746 }
747
748 }
749
750 }