View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.io.Serializable;
11  import java.net.URISyntaxException;
12  import java.nio.charset.StandardCharsets;
13  import java.rmi.RemoteException;
14  import java.util.Arrays;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.List;
19  import java.util.Map;
20  import java.util.concurrent.atomic.AtomicInteger;
21  
22  import javax.swing.JFrame;
23  import javax.swing.JPanel;
24  import javax.swing.JScrollPane;
25  import javax.xml.bind.JAXBException;
26  import javax.xml.parsers.ParserConfigurationException;
27  
28  import org.djunits.value.vdouble.scalar.Duration;
29  import org.djunits.value.vdouble.scalar.Length;
30  import org.djunits.value.vdouble.scalar.Time;
31  import org.djutils.decoderdumper.HexDumper;
32  import org.djutils.immutablecollections.ImmutableMap;
33  import org.djutils.serialization.SerializationException;
34  import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
35  import org.opentrafficsim.core.dsol.AbstractOTSModel;
36  import org.opentrafficsim.core.dsol.OTSAnimator;
37  import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
38  import org.opentrafficsim.core.geometry.OTSGeometryException;
39  import org.opentrafficsim.core.gtu.GTUException;
40  import org.opentrafficsim.core.gtu.GTUType;
41  import org.opentrafficsim.core.network.NetworkException;
42  import org.opentrafficsim.core.network.OTSNetwork;
43  import org.opentrafficsim.core.object.InvisibleObjectInterface;
44  import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
45  import org.opentrafficsim.road.network.OTSRoadNetwork;
46  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
47  import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
48  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
49  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
50  import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
51  import org.opentrafficsim.sim0mq.publisher.Publisher;
52  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
53  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
54  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
55  import org.opentrafficsim.swing.gui.OTSAnimationPanel;
56  import org.opentrafficsim.swing.gui.OTSSimulationApplication;
57  import org.opentrafficsim.swing.gui.OTSSwingApplication;
58  import org.opentrafficsim.swing.script.AbstractSimulationScript;
59  import org.opentrafficsim.trafficcontrol.TrafficControlException;
60  import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
61  import org.sim0mq.Sim0MQException;
62  import org.sim0mq.message.Sim0MQMessage;
63  import org.xml.sax.SAXException;
64  import org.zeromq.SocketType;
65  import org.zeromq.ZContext;
66  import org.zeromq.ZMQ;
67  
68  import nl.tudelft.simulation.dsol.SimRuntimeException;
69  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
70  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
71  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
72  import nl.tudelft.simulation.jstats.streams.StreamInterface;
73  
74  /**
75   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
76   * <p>
77   * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
78   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
79   * <p>
80   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
81   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
82   */
83  public final class Sim0MQPublisher
84  {
85      /** The publisher. */
86      private Publisher publisher = null;
87  
88      /** The ZContect. */
89      private final ZContext zContext;
90  
91      /** The simulation model. */
92      private Sim0MQOTSModel model = null;
93  
94      /** The OTS road network. */
95      private OTSRoadNetwork network = null;
96  
97      /** The OTS animation panel. */
98      private OTSAnimationPanel animationPanel = null;
99  
100     /**
101      * Create a new Sim0MQPublisher that is operated through //inproc sockets.
102      * @param zContext ZContext; needed to create the sockets
103      * @param controlInput String; PULL socket for control input
104      * @param resultOutput String; PUSH socket to output results
105      */
106     public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
107     {
108         this.zContext = zContext;
109         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
110         controlSocket.bind("inproc://" + controlInput);
111         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
112         resultOutputQueue.connect("inproc://" + resultOutput);
113         pollingLoop(controlSocket, resultOutputQueue);
114     }
115 
116     /**
117      * Create a new Sim0MQPublisher that uses TCP transport.
118      * @param port int; port number to bind to
119      */
120     public Sim0MQPublisher(final int port)
121     {
122         this.zContext = new ZContext(5);
123         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
124         socket.bind("tcp://*:" + port);
125         pollingLoop(socket, socket);
126     }
127 
128     /**
129      * Create a new Sim0MQPublisher that uses TCP transport.
130      * @param port int; port number to bind to
131      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
132      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
133      *            null)
134      * @param incomingDataHandlers List&lt;IncomfingDataHandler&gt;; list of additional handlers for incoming data that is not
135      *            handled by the standard Sim0MQPublisher (may be null)
136      * @throws RemoteException when construction of the Publisher failed
137      */
138     public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
139             final List<SubscriptionHandler> additionalSubscriptionHandlers,
140             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
141     {
142         this.zContext = new ZContext(5);
143         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
144         socket.bind("tcp://*:" + port);
145         this.network = preloadedSimulation.getNetwork();
146         this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, null);
147         this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
148         ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
149         ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
150         pollingLoop(socket, socket);
151         System.exit(0);
152     }
153 
154     /**
155      * Create a new Sim0MQPublisher that uses TCP transport.
156      * @param port int; port number to bind to
157      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
158      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
159      *            null)
160      * @throws RemoteException when construction of the Publisher failed
161      */
162     public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
163             final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
164     {
165         this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
166     }
167 
168     /**
169      * Poller that receives the commands and ensures that various output sources can talk to the master.
170      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
171      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
172      */
173     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
174     {
175         System.out
176                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
177         resultOutputQueue.setHWM(100000);
178         AtomicInteger packetsSent = new AtomicInteger(0);
179         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
180         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
181         resultInputQueue.bind("inproc://simulationEvents");
182         // Poll the two input sockets using ZMQ poller
183         ZMQ.Poller poller = this.zContext.createPoller(2);
184         // TODO ensure that this also handles a closed control socket gracefully
185         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
186         poller.register(controlSocket, ZMQ.Poller.POLLIN);
187         while (!Thread.currentThread().isInterrupted())
188         {
189             // System.out.println("Publisher calls Poller.poll()");
190             poller.poll();
191             if (poller.pollin(0))
192             {
193                 byte[] data = resultInputQueue.recv();
194                 // System.out.println("Publisher got outgoing result of " + data.length + " bytes");
195                 byte[] fixedData = data;
196                 int number = -1;
197                 try
198                 {
199                     // Patch the sender field to include the packet counter value - this is bloody expensive...
200                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
201                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
202                     number = packetsSent.addAndGet(1);
203                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
204                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
205                     // System.out
206                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
207                 }
208                 catch (Sim0MQException | SerializationException e)
209                 {
210                     e.printStackTrace();
211                 }
212                 resultOutputQueue.send(fixedData, 0);
213                 // System.out.println("Outgoing result handed over to controlSocket");
214                 continue; // Check for more results before checking the control input
215             }
216             if (poller.pollin(1))
217             {
218                 byte[] data = controlSocket.recv();
219                 // System.out.println("Publisher received a command of " + data.length + " bytes");
220                 if (!handleCommand(data, socketMap))
221                 {
222                     break;
223                 }
224             }
225         }
226         System.out.println("Exiting publisher polling loop");
227     }
228 
229     /**
230      * Construct an OTS simulation experiment from an XML description.
231      * @param xml String; the XML encoded network
232      * @param simulationDuration Duration; total duration of the simulation
233      * @param warmupTime Duration; warm up time of the simulation
234      * @param seed Long; seed for the experiment
235      * @return String; null on success, description of the problem on error
236      */
237     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
238     {
239         try
240         {
241             OTSAnimator animator = new OTSAnimator("OTS Animator");
242             this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
243             this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
244             Map<String, StreamInterface> map = new LinkedHashMap<>();
245             map.put("generation", new MersenneTwister(seed));
246             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
247             this.publisher = new Publisher(this.network);
248             this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
249                     animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
250             new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
251             DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
252             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
253             frame.setExtendedState(Frame.NORMAL);
254             frame.setSize(new Dimension(1100, 1000));
255             frame.setBounds(0, 25, 1100, 1000);
256             animator.setSpeedFactor(Double.MAX_VALUE, true);
257             animator.setSpeedFactor(1000.0, true);
258 
259             ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
260             for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
261             {
262                 if (ioi instanceof TrafCOD)
263                 {
264                     TrafCOD trafCOD = (TrafCOD) ioi;
265                     Container controllerDisplayPanel = trafCOD.getDisplayContainer();
266                     if (null != controllerDisplayPanel)
267                     {
268                         JPanel wrapper = new JPanel(new BorderLayout());
269                         wrapper.add(new JScrollPane(controllerDisplayPanel));
270                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
271                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
272                     }
273                 }
274             }
275             try
276             {
277                 Thread.sleep(300);
278             }
279             catch (InterruptedException e)
280             {
281                 e.printStackTrace();
282             }
283             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
284         }
285         catch (Exception e)
286         {
287             return e.getMessage();
288         }
289         return null;
290     }
291 
292     /**
293      * Execute one remote control command.
294      * @param data byte[]; the SIM0MQ encoded command
295      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
296      * @return boolean; true if another command can be processed after this one; false when no further commands can be processed
297      */
298     @SuppressWarnings("checkstyle:methodlength")
299     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
300     {
301         boolean result = true;
302         try
303         {
304             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
305             String resultMessage = "OK";
306             Boolean ackNack = null;
307 
308             if (message.length >= 8 && message[5] instanceof String)
309             {
310                 String command = (String) message[5];
311                 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
312 
313                 String[] parts = command.split("\\|");
314                 if (parts.length == 2)
315                 {
316                     // This is a command for the embedded Publisher
317                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
318                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
319                             socketMap);
320                     if (null == this.publisher)
321                     {
322                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
323                         System.err.println("No publisher for command " + command);
324                         return true;
325                     }
326                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
327                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
328                     return true;
329                 }
330                 else
331                 {
332                     switch (command)
333                     {
334                         case "NEWSIMULATION":
335                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
336                                     && message[10] instanceof Duration && message[11] instanceof Long)
337                             {
338                                 if (null != this.animationPanel)
339                                 {
340                                     for (Container container = this.animationPanel; container != null; container =
341                                             container.getParent())
342                                     {
343                                         if (container instanceof JFrame)
344                                         {
345                                             JFrame jFrame = (JFrame) container;
346                                             jFrame.dispose();
347                                         }
348                                     }
349                                 }
350                                 // System.out.println("xml length = " + ((String) message[8]).length());
351                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
352                                         (Long) message[11]);
353                                 ackNack = null == resultMessage;
354                                 if (ackNack)
355                                 {
356                                     resultMessage = "OK";
357                                 }
358                             }
359                             else
360                             {
361                                 resultMessage =
362                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
363                                 ackNack = false;
364                             }
365                             break;
366 
367                         case "DIE":
368                             for (Container container = this.animationPanel; container != null; container =
369                                     container.getParent())
370                             {
371                                 // System.out.println("container is " + container);
372                                 if (container instanceof JFrame)
373                                 {
374                                     JFrame jFrame = (JFrame) container;
375                                     jFrame.dispose();
376                                 }
377                             }
378                             return false;
379 
380                         case "SIMULATEUNTIL":
381                             if (message.length == 9 && message[8] instanceof Time)
382                             {
383                                 System.out.println("Simulating up to " + message[8]);
384                                 if (null == this.network)
385                                 {
386                                     resultMessage = "No network loaded";
387                                     ackNack = false;
388                                     break;
389                                 }
390                                 OTSSimulatorInterface simulator = this.network.getSimulator();
391                                 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
392                                 {
393                                     resultMessage = "Simulation is already at end of simulation time";
394                                     ackNack = false;
395                                     break;
396                                 }
397                                 if (simulator.isStartingOrRunning())
398                                 {
399                                     resultMessage = "Simulator is already running"; // cannot happen for now
400                                     ackNack = false;
401                                     break;
402                                 }
403                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
404                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
405                                 returnWrapper.ack(resultMessage);
406                                 simulator.runUpTo((Time) message[8]);
407                                 int count = 0;
408                                 while (this.network.getSimulator().isStartingOrRunning())
409                                 {
410                                     System.out.print(".");
411                                     count++;
412                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
413                                     {
414                                         System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
415                                                 + this.network.getSimulator().getSimulatorTime());
416                                         this.network.getSimulator().stop();
417                                         Iterator<SimEventInterface<Duration>> elIt =
418                                                 this.network.getSimulator().getEventList().iterator();
419                                         while (elIt.hasNext())
420                                         {
421                                             System.out.println("EVENTLIST: " + elIt.next());
422                                         }
423                                     }
424                                     try
425                                     {
426                                         Thread.sleep(10); // 10 ms
427                                     }
428                                     catch (InterruptedException e)
429                                     {
430                                         e.printStackTrace();
431                                     }
432                                 }
433                                 // TODO Fix this (it is still needed - 2020-06-16)
434                                 try
435                                 {
436                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
437                                 }
438                                 catch (InterruptedException e)
439                                 {
440                                     e.printStackTrace();
441                                 }
442                                 return true; // ack has been sent when simulation started
443                             }
444                             else
445                             {
446                                 resultMessage = "Bad or missing stop time";
447                                 ackNack = false;
448                             }
449                             break;
450 
451                         default:
452                             IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
453                             if (incomingDataHandler != null)
454                             {
455                                 resultMessage = incomingDataHandler.handleIncomingData(message);
456                                 ackNack = resultMessage == null;
457                             }
458                             else
459                             {
460                                 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
461                                 ackNack = false;
462                             }
463                             break;
464                     }
465                 }
466                 if (resultMessage != null)
467                 {
468                     ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
469                             new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
470                             socketMap);
471                     if (ackNack)
472                     {
473                         returnWrapper.ack(resultMessage);
474                     }
475                     else
476                     {
477                         returnWrapper.nack(resultMessage);
478                     }
479                 }
480             }
481             else
482             {
483                 // We cannot construct a ReturnWrapper because the request has too few fields.
484                 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
485                 System.err.println(HexDumper.hexDumper(data));
486                 return true; // Do we really want to try again?
487             }
488         }
489         catch (Sim0MQException | SerializationException | RemoteException e)
490         {
491             System.err.println("Publisher thread could not decode command");
492             e.printStackTrace();
493         }
494         return result;
495     }
496 
497 }
498 
499 /**
500  * The Model.
501  */
502 class Sim0MQOTSModel extends AbstractOTSModel
503 {
504     /** */
505     private static final long serialVersionUID = 20170419L;
506 
507     /** The network. */
508     private final OTSRoadNetwork network;
509 
510     /** The XML. */
511     private final String xml;
512 
513     /**
514      * @param description String; the model description
515      * @param network OTSRoadNetwork; the network
516      * @param xml String; the XML description of the simulation model
517      */
518     Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
519     {
520         super(network.getSimulator(), network.getId(), description);
521         this.network = network;
522         this.xml = xml;
523     }
524 
525     /** {@inheritDoc} */
526     @Override
527     public void constructModel() throws SimRuntimeException
528     {
529         try
530         {
531             XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
532                     false);
533             ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
534                     getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
535                     new LaneCombinationList(), new LaneCombinationList());
536         }
537         catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
538                 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
539         {
540             exception.printStackTrace();
541             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
542             // parsing problem)
543             throw new SimRuntimeException(exception);
544         }
545     }
546 
547     /** {@inheritDoc} */
548     @Override
549     public OTSNetwork getNetwork()
550     {
551         return this.network;
552     }
553 
554     /** {@inheritDoc} */
555     @Override
556     public Serializable getSourceId()
557     {
558         return "Sim0MQPublisherModel";
559     }
560 
561 }