1 package org.opentrafficsim.remotecontrol;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.concurrent.atomic.AtomicInteger;
19
20 import javax.naming.NamingException;
21 import javax.swing.JFrame;
22 import javax.swing.JPanel;
23 import javax.swing.JScrollPane;
24 import javax.xml.bind.JAXBException;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.ValueRuntimeException;
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.cli.Checkable;
32 import org.djutils.cli.CliUtil;
33 import org.djutils.decoderdumper.HexDumper;
34 import org.djutils.event.EventInterface;
35 import org.djutils.event.EventListenerInterface;
36 import org.djutils.event.EventTypeInterface;
37 import org.djutils.immutablecollections.ImmutableMap;
38 import org.djutils.logger.CategoryLogger;
39 import org.djutils.logger.LogCategory;
40 import org.djutils.serialization.SerializationException;
41 import org.opentrafficsim.base.parameters.ParameterException;
42 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
43 import org.opentrafficsim.core.dsol.AbstractOTSModel;
44 import org.opentrafficsim.core.dsol.OTSAnimator;
45 import org.opentrafficsim.core.dsol.OTSModelInterface;
46 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
47 import org.opentrafficsim.core.geometry.DirectedPoint;
48 import org.opentrafficsim.core.geometry.OTSGeometryException;
49 import org.opentrafficsim.core.gtu.GTU;
50 import org.opentrafficsim.core.gtu.GTUException;
51 import org.opentrafficsim.core.gtu.GTUType;
52 import org.opentrafficsim.core.network.Network;
53 import org.opentrafficsim.core.network.NetworkException;
54 import org.opentrafficsim.core.network.OTSNetwork;
55 import org.opentrafficsim.core.object.InvisibleObjectInterface;
56 import org.opentrafficsim.draw.core.OTSDrawingException;
57 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
58 import org.opentrafficsim.road.network.OTSRoadNetwork;
59 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
60 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
61 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
62 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
63 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
64 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
65 import org.opentrafficsim.swing.gui.OTSSwingApplication;
66 import org.opentrafficsim.trafficcontrol.TrafficControlException;
67 import org.opentrafficsim.trafficcontrol.TrafficController;
68 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
69 import org.pmw.tinylog.Level;
70 import org.sim0mq.Sim0MQException;
71 import org.sim0mq.message.Sim0MQMessage;
72 import org.xml.sax.SAXException;
73 import org.zeromq.SocketType;
74 import org.zeromq.ZContext;
75 import org.zeromq.ZMQ;
76
77 import nl.tudelft.simulation.dsol.SimRuntimeException;
78 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
79 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
80 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeAnimator;
81 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
82 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
83 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
84 import nl.tudelft.simulation.jstats.streams.StreamInterface;
85 import picocli.CommandLine.Command;
86 import picocli.CommandLine.Option;
87
88
89
90
91
92
93
94
95
96
97
98
99 public class Sim0MQControlledOTS implements EventListenerInterface
100 {
101
102 private static final long serialVersionUID = 20200317L;
103
104
105 private Sim0MQOTSModel model = null;
106
107
108 private final ZContext zContext;
109
110
111 private final int port;
112
113
114 private final MasterCommunication masterCommunication = new MasterCommunication();
115
116
117
118
119
120
/**
 * Construct a new remotely controlled OTS slave and start the thread that relays messages to and from the master.
 * @param zContext ZContext; the ZeroMQ context that is used to create all sockets of this slave
 * @param port int; the TCP port on which the relay thread accepts the connection of the master
 */
public Sim0MQControlledOTS(final ZContext zContext, final int port)
{
    this.port = port;
    this.zContext = zContext;
    // Both fields must be set before the relay thread is started; its run method reads them.
    this.masterCommunication.start();
}
127
128
129
130
131 class MasterCommunication extends Thread
132 {
133 @Override
134 public void run()
135 {
136 System.err.println("MasterCommunication thread id is " + Thread.currentThread().getId());
137 ZMQ.Socket remoteControllerSocket = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PAIR);
138 remoteControllerSocket.setHWM(100000);
139 remoteControllerSocket.bind("tcp://*:" + Sim0MQControlledOTS.this.port);
140 ZMQ.Socket resultQueue = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PULL);
141 resultQueue.bind("inproc://results");
142 ZMQ.Socket toCommandLoop = Sim0MQControlledOTS.this.zContext.createSocket(SocketType.PUSH);
143 toCommandLoop.setHWM(1000);
144 toCommandLoop.connect("inproc://commands");
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 ZMQ.Poller poller = Sim0MQControlledOTS.this.zContext.createPoller(2);
176 poller.register(remoteControllerSocket, ZMQ.Poller.POLLIN);
177 poller.register(resultQueue, ZMQ.Poller.POLLIN);
178 while (!Thread.currentThread().isInterrupted())
179 {
180 poller.poll();
181 if (poller.pollin(0))
182 {
183 System.err.println("Got incoming command");
184 byte[] data = remoteControllerSocket.recv();
185 toCommandLoop.send(data, 0);
186 System.err.println("Incoming command handed over to toCommandLoop socket");
187 }
188 else if (poller.pollin(1))
189 {
190 System.err.println("Got outgoing result");
191 byte[] data = resultQueue.recv();
192 remoteControllerSocket.send(data, 0);
193 System.err.println("Outgoing result handed over to remoteControllerSocket");
194 }
195 }
196
197 }
198 }
199
200
201
202
203 @Command(description = "Sim0MQ Remotely Controlled OTS", name = "Sim0MQOTS", mixinStandardHelpOptions = true,
204 version = "1.0")
205 public static class Options implements Checkable
206 {
207
208 @Option(names = { "-p", "--port" }, description = "Internet port to use", defaultValue = "8888")
209 private int port;
210
211
212
213
214
215 public final int getPort()
216 {
217 return this.port;
218 }
219
220 @Override
221 public final void check() throws Exception
222 {
223 if (this.port <= 0 || this.port > 65535)
224 {
225 throw new Exception("Port should be between 1 and 65535");
226 }
227 }
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243 public static void main(final String[] args) throws NetworkException, OTSGeometryException, NamingException,
244 ValueRuntimeException, ParameterException, SimRuntimeException, Sim0MQException, SerializationException, IOException
245 {
246 CategoryLogger.setAllLogLevel(Level.WARNING);
247 CategoryLogger.setLogCategories(LogCategory.ALL);
248 Options options = new Options();
249 CliUtil.execute(options, args);
250 int port = options.getPort();
251 System.out.println("Creating OTS server listening on port " + port);
252 ZContext context = new ZContext(10);
253 Sim0MQControlledOTSrolledOTS.html#Sim0MQControlledOTS">Sim0MQControlledOTS slave = new Sim0MQControlledOTS(context, port);
254
255 slave.commandLoop();
256
257 context.destroy();
258 context.close();
259 }
260
261
262
263
264
265
266
267
268
269 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
270 {
271 if (null != this.model)
272 {
273 return "Cannot create another network (yet)";
274 }
275 else
276 {
277 try
278 {
279 OTSAnimator animator = new OTSAnimator("OTS Animator");
280 this.model = new Sim0MQOTSModel(animator, "OTS model", "Remotely controlled OTS model", xml);
281 Map<String, StreamInterface> map = new LinkedHashMap<>();
282 map.put("generation", new MersenneTwister(seed));
283 animator.initialize(Time.ZERO, simulationDuration, warmupTime, this.model, map);
284 this.model.getNetwork().addListener(this, Network.GTU_ADD_EVENT);
285 this.model.getNetwork().addListener(this, Network.GTU_REMOVE_EVENT);
286 OTSAnimationPanel animationPanel =
287 new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000), animator,
288 this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
289 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
290 new Sim0MQRemoteControlSwingApplication(this.model, animationPanel);
291 JFrame frame = (JFrame) animationPanel.getParent().getParent().getParent();
292 frame.setExtendedState(Frame.NORMAL);
293 frame.setSize(new Dimension(1100, 1000));
294 frame.setBounds(0, 25, 1100, 1000);
295 animator.setSpeedFactor(Double.MAX_VALUE, true);
296 animator.setSpeedFactor(1000.0, true);
297
298 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap =
299 this.model.getNetwork().getInvisibleObjectMap();
300 animator.addListener(this, DEVSRealTimeAnimator.CHANGE_SPEED_FACTOR_EVENT);
301 animator.addListener(this, SimulatorInterface.TIME_CHANGED_EVENT);
302 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
303 {
304 if (ioi instanceof TrafCOD)
305 {
306 TrafCOD trafCOD = (TrafCOD) ioi;
307 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
308 if (null != controllerDisplayPanel)
309 {
310 JPanel wrapper = new JPanel(new BorderLayout());
311 wrapper.add(new JScrollPane(controllerDisplayPanel));
312 TabbedContentPane tabbedPane = animationPanel.getTabbedPane();
313 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
314 }
315
316
317 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONTROLLER_WARNING);
318 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_CONFLICT_GROUP_CHANGED);
319 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_STATE_CHANGED);
320 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_VARIABLE_CREATED);
321 trafCOD.addListener(this, TrafficController.TRAFFICCONTROL_TRACED_VARIABLE_UPDATED);
322 }
323 }
324 try
325 {
326 Thread.sleep(300);
327 }
328 catch (InterruptedException e)
329 {
330 e.printStackTrace();
331 }
332 animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
333 }
334 catch (Exception e)
335 {
336 return e.getMessage();
337 }
338 }
339 return null;
340 }
341
342
343 private AtomicInteger packetsSent = new AtomicInteger(0);
344
345
346
347
348 @SuppressWarnings("checkstyle:methodlength")
349 public void commandLoop()
350 {
351 System.err.println("CommandLoop thread id is " + Thread.currentThread().getId());
352 ZMQ.Socket incomingCommands = this.zContext.createSocket(SocketType.PULL);
353 incomingCommands.bind("inproc://commands");
354 while (!Thread.currentThread().isInterrupted())
355 {
356
357 System.err.println("CommandLoop ready to read a command");
358 byte[] request = incomingCommands.recv(0);
359 System.err.println("CommandLoop processing a command of " + request.length + " bytes");
360 Object[] message;
361 String result = "At your command";
362 try
363 {
364 message = Sim0MQMessage.decode(request).createObjectArray();
365 System.out.println("Received Sim0MQ message:");
366
367 if (message.length >= 8 && message[5] instanceof String)
368 {
369 String command = (String) message[5];
370 System.out.println("Command is " + command);
371 switch (command)
372 {
373 case "LOADNETWORK":
374 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
375 && message[10] instanceof Duration && message[11] instanceof Long)
376 {
377 System.out.println("xml length = " + ((String) message[8]).length());
378 String loadResult = loadNetwork((String) message[8], (Duration) message[9],
379 (Duration) message[10], (Long) message[11]);
380 if (null != loadResult)
381 {
382 result = loadResult;
383 }
384 }
385 else
386 {
387 result = "no network, warmupTime and/or runTime provided with LOADNETWORK command";
388 }
389 break;
390
391 case "SIMULATEUNTIL":
392 if (null == this.model)
393 {
394 result = "No model loaded";
395 }
396 else if (message.length == 9 && message[8] instanceof Time)
397 {
398 OTSSimulatorInterface simulator = this.model.getSimulator();
399 System.out.println("Simulating up to " + message[8]);
400 simulator.runUpTo(new SimTimeDoubleUnit((Time) message[8]));
401 int count = 0;
402 while (simulator.isStartingOrRunning())
403 {
404 System.out.print(".");
405 count++;
406 if (count > 1000)
407 {
408 System.out.println("SIMULATOR DOES NOT STOP. TIME = " + simulator.getSimulatorTime());
409 Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
410 simulator.getEventList().iterator();
411 while (elIt.hasNext())
412 {
413 System.out.println("EVENTLIST: " + elIt.next());
414 }
415 simulator.stop();
416 }
417 try
418 {
419 Thread.sleep(10);
420 }
421 catch (InterruptedException e)
422 {
423 e.printStackTrace();
424 }
425 }
426 System.out.println("Simulator has stopped at time " + simulator.getSimulatorTime());
427 try
428 {
429 Thread.sleep(100);
430 }
431 catch (InterruptedException e)
432 {
433 e.printStackTrace();
434 }
435 }
436 else
437 {
438 result = "Bad or missing stop time";
439 }
440 break;
441
442 case "SENDALLGTUPOSITIONS":
443 if (null == this.model)
444 {
445 result = "No model loaded";
446 }
447 else if (message.length == 8)
448 {
449 for (GTU gtu : this.model.network.getGTUs())
450 {
451
452 try
453 {
454 DirectedPoint gtuPosition = gtu.getLocation();
455 Object[] gtuData = new Object[] { gtu.getId(), gtu.getGTUType().getId(), gtuPosition.x,
456 gtuPosition.y, gtuPosition.z, gtuPosition.getRotZ(), gtu.getSpeed(),
457 gtu.getAcceleration() };
458 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "GTUPOSITION",
459 0, gtuData));
460 }
461 catch (Sim0MQException | SerializationException e)
462 {
463 e.printStackTrace();
464 break;
465 }
466
467 }
468 }
469 break;
470
471 default:
472
473 System.out.println("Don't know how to handle message:");
474 System.out.println(Sim0MQMessage.print(message));
475 result = "Unimplemented command " + command;
476 break;
477 }
478 }
479 else
480 {
481 System.out.println("Don't know how to handle message:");
482 System.out.println(HexDumper.hexDumper(request));
483 result = "Ignored message";
484 }
485 }
486 catch (Sim0MQException | SerializationException e)
487 {
488 e.printStackTrace();
489 result = "Could not decode command: " + e.getMessage();
490 }
491
492 try
493 {
494 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave_XXXXX", "master", "READY", 0, result));
495 }
496 catch (Sim0MQException | SerializationException e)
497 {
498 e.printStackTrace();
499 break;
500 }
501 }
502 }
503
504
505 private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>();
506
507
508
509
510
511 public synchronized void sendToMaster(final byte[] data)
512 {
513 byte[] fixedData = data;
514 int number = -1;
515 try
516 {
517
518 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
519 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
520 number = this.packetsSent.addAndGet(1);
521 fixedData =
522 Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
523 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
524 System.err.println("Prepared message " + number + ", type is " + messageFields[5]);
525 }
526 catch (Sim0MQException | SerializationException e)
527 {
528 e.printStackTrace();
529 }
530 Long threadId = Thread.currentThread().getId();
531 ZMQ.Socket socket = this.socketMap.get(threadId);
532 while (null == socket)
533 {
534 System.out.println("Creating new internal socket for thread " + threadId);
535 try
536 {
537 socket = this.zContext.createSocket(SocketType.PUSH);
538 socket.setHWM(100000);
539 socket.connect("inproc://results");
540 this.socketMap.put(threadId, socket);
541
542 }
543 catch (Exception cbie)
544 {
545 System.err.println("Caught funny exception - probably related to DSOL animator start/stop code ... retrying");
546 try
547 {
548 Thread.sleep(100);
549 }
550 catch (InterruptedException e)
551 {
552 System.err.println("Sleep interrupted!");
553 }
554 }
555 }
556 System.out.println("pre send");
557
558
559
560
561 socket.send(fixedData, 0);
562
563
564 }
565
566
567 @Override
568 public void notify(final EventInterface event) throws RemoteException
569 {
570 try
571 {
572 EventTypeInterface type = event.getType();
573 String eventTypeName = type.getName();
574 System.out.println("notify: start processing event " + eventTypeName);
575 switch (eventTypeName)
576 {
577 case "TRAFFICCONTROL.CONTROLLER_EVALUATING":
578 {
579 Object[] payload = (Object[]) event.getContent();
580 CategoryLogger.always().info("{}: Evaluating at time {}", payload[0], payload[1]);
581 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
582 String.format("%s: Evaluating at time %s", payload[0], payload[1])));
583 break;
584 }
585
586 case "TRAFFICCONTROL.CONFLICT_GROUP_CHANGED":
587 {
588 Object[] payload = (Object[]) event.getContent();
589 CategoryLogger.always().info("{}: Conflict group changed from {} to {}", payload[0], payload[1],
590 payload[2]);
591 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
592 break;
593 }
594
595 case "TRAFFICCONTROL.VARIABLE_UPDATED":
596 {
597 Object[] payload = (Object[]) event.getContent();
598 CategoryLogger.always().info("{}: Variable changed {} <- {} {}", payload[0], payload[1], payload[4],
599 payload[5]);
600 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
601 break;
602 }
603
604 case "TRAFFICCONTROL.CONTROLLER_WARNING":
605 {
606 Object[] payload = (Object[]) event.getContent();
607 CategoryLogger.always().info("{}: Warning {}", payload[0], payload[1]);
608 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, payload));
609 break;
610 }
611
612 case "TIME_CHANGED_EVENT":
613 {
614 CategoryLogger.always().info("Time changed to {}", event.getContent());
615 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0,
616 String.format("Time changed to %s", event.getContent())));
617 break;
618 }
619
620 case "NETWORK.GTU.ADD":
621 {
622 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
623 break;
624 }
625
626 case "NETWORK.GTU.REMOVE":
627 {
628 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", eventTypeName, 0, event.getContent()));
629 break;
630 }
631
632 default:
633 {
634 CategoryLogger.always().info("Event of unhandled type {} with payload {}", event.getType(),
635 event.getContent());
636 sendToMaster(Sim0MQMessage.encodeUTF8(true, 0, "slave", "master", "Event of unhandled type", 0, String
637 .format("%s: Event of unhandled type %s with payload {}", event.getType(), event.getContent())));
638 break;
639 }
640 }
641 System.out.println("notify: finished processing event " + eventTypeName);
642 }
643 catch (Sim0MQException | SerializationException e)
644 {
645 e.printStackTrace();
646 }
647 }
648
649
650
651
652 class Sim0MQRemoteControlSwingApplication extends OTSSimulationApplication<OTSModelInterface>
653 {
654
655 private static final long serialVersionUID = 1L;
656
657
658
659
660
661
662 Sim0MQRemoteControlSwingApplication(final OTSModelInterface model, final OTSAnimationPanel panel)
663 throws OTSDrawingException
664 {
665 super(model, panel);
666 }
667 }
668
669
670
671
672 class Sim0MQOTSModel extends AbstractOTSModel implements EventListenerInterface
673 {
674
675 private static final long serialVersionUID = 20170419L;
676
677
678 @SuppressWarnings("checkstyle:visibilitymodifier")
679 OTSRoadNetwork network;
680
681
682 private final String xml;
683
684
685
686
687
688
689
690 Sim0MQOTSModel(final OTSSimulatorInterface simulator, final String shortName, final String description,
691 final String xml)
692 {
693 super(simulator, shortName, description);
694 this.xml = xml;
695 }
696
697
698 @Override
699 public void notify(final EventInterface event) throws RemoteException
700 {
701 System.err.println("Received event " + event);
702 }
703
704
705 @Override
706 public void constructModel() throws SimRuntimeException
707 {
708 this.network = new OTSRoadNetwork(getShortName(), true, getSimulator());
709 try
710 {
711 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
712 false);
713 LaneCombinationList ignoreList = new LaneCombinationList();
714 LaneCombinationList permittedList = new LaneCombinationList();
715 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
716 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), ignoreList,
717 permittedList);
718 }
719 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException
720 | SAXException | ParserConfigurationException | GTUException | IOException
721 | TrafficControlException exception)
722 {
723 exception.printStackTrace();
724
725
726 throw new SimRuntimeException(exception);
727 }
728 }
729
730
731 @Override
732 public OTSNetwork getNetwork()
733 {
734 return this.network;
735 }
736
737
738 @Override
739 public Serializable getSourceId()
740 {
741 return "Sim0MQOTSModel";
742 }
743
744 }
745
746 }