View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.io.Serializable;
11  import java.net.URISyntaxException;
12  import java.nio.charset.StandardCharsets;
13  import java.rmi.RemoteException;
14  import java.util.Arrays;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.List;
19  import java.util.Map;
20  import java.util.concurrent.atomic.AtomicInteger;
21  
22  import javax.swing.JFrame;
23  import javax.swing.JPanel;
24  import javax.swing.JScrollPane;
25  import javax.xml.bind.JAXBException;
26  import javax.xml.parsers.ParserConfigurationException;
27  
28  import org.djunits.value.vdouble.scalar.Duration;
29  import org.djunits.value.vdouble.scalar.Length;
30  import org.djunits.value.vdouble.scalar.Time;
31  import org.djutils.decoderdumper.HexDumper;
32  import org.djutils.immutablecollections.ImmutableMap;
33  import org.djutils.serialization.SerializationException;
34  import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
35  import org.opentrafficsim.core.dsol.AbstractOTSModel;
36  import org.opentrafficsim.core.dsol.OTSAnimator;
37  import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
38  import org.opentrafficsim.core.geometry.OTSGeometryException;
39  import org.opentrafficsim.core.gtu.GTUException;
40  import org.opentrafficsim.core.gtu.GTUType;
41  import org.opentrafficsim.core.network.NetworkException;
42  import org.opentrafficsim.core.network.OTSNetwork;
43  import org.opentrafficsim.core.object.InvisibleObjectInterface;
44  import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
45  import org.opentrafficsim.road.network.OTSRoadNetwork;
46  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
47  import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
48  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
49  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
50  import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
51  import org.opentrafficsim.sim0mq.publisher.Publisher;
52  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
53  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
54  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
55  import org.opentrafficsim.swing.gui.OTSAnimationPanel;
56  import org.opentrafficsim.swing.gui.OTSSimulationApplication;
57  import org.opentrafficsim.swing.gui.OTSSwingApplication;
58  import org.opentrafficsim.swing.script.AbstractSimulationScript;
59  import org.opentrafficsim.trafficcontrol.TrafficControlException;
60  import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
61  import org.sim0mq.Sim0MQException;
62  import org.sim0mq.message.Sim0MQMessage;
63  import org.xml.sax.SAXException;
64  import org.zeromq.SocketType;
65  import org.zeromq.ZContext;
66  import org.zeromq.ZMQ;
67  
68  import nl.tudelft.simulation.dsol.SimRuntimeException;
69  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
70  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
71  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
72  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
73  import nl.tudelft.simulation.jstats.streams.StreamInterface;
74  
75  /**
76   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
77   * <p>
78   * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
79   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
80   * <p>
81   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
82   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
83   */
84  public final class Sim0MQPublisher
85  {
86      /** The publisher. */
87      private Publisher publisher = null;
88  
89      /** The ZContext. */
90      private final ZContext zContext;
91  
92      /** The simulation model. */
93      private Sim0MQOTSModel model = null;
94  
95      /** The OTS road network. */
96      private OTSRoadNetwork network = null;
97  
98      /** The OTS animation panel. */
99      private OTSAnimationPanel animationPanel = null;
100 
101     /**
102      * Create a new Sim0MQPublisher that is operated through //inproc sockets.
103      * @param zContext ZContext; needed to create the sockets
104      * @param controlInput String; PULL socket for control input
105      * @param resultOutput String; PUSH socket to output results
106      */
107     public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
108     {
109         this.zContext = zContext;
110         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
111         controlSocket.bind("inproc://" + controlInput);
112         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
113         resultOutputQueue.connect("inproc://" + resultOutput);
114         pollingLoop(controlSocket, resultOutputQueue);
115     }
116 
117     /**
118      * Create a new Sim0MQPublisher that uses TCP transport.
119      * @param port int; port number to bind to
120      */
121     public Sim0MQPublisher(final int port)
122     {
123         this.zContext = new ZContext(5);
124         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
125         socket.bind("tcp://*:" + port);
126         pollingLoop(socket, socket);
127     }
128 
129     /**
130      * Create a new Sim0MQPublisher that uses TCP transport.
131      * @param port int; port number to bind to
132      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
133      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
134      *            null)
135      * @param incomingDataHandlers List&lt;IncomfingDataHandler&gt;; list of additional handlers for incoming data that is not
136      *            handled by the standard Sim0MQPublisher (may be null)
137      * @throws RemoteException when construction of the Publisher failed
138      */
139     public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
140             final List<SubscriptionHandler> additionalSubscriptionHandlers,
141             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
142     {
143         this.zContext = new ZContext(5);
144         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
145         socket.bind("tcp://*:" + port);
146         this.network = preloadedSimulation.getNetwork();
147         this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, null);
148         this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
149         ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
150         ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
151         pollingLoop(socket, socket);
152         System.exit(0);
153     }
154 
155     /**
156      * Create a new Sim0MQPublisher that uses TCP transport.
157      * @param port int; port number to bind to
158      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
159      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
160      *            null)
161      * @throws RemoteException when construction of the Publisher failed
162      */
163     public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
164             final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
165     {
166         this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
167     }
168 
169     /**
170      * Poller that receives the commands and ensures that various output sources can talk to the master.
171      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
172      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
173      */
174     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
175     {
176         System.out
177                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
178         resultOutputQueue.setHWM(100000);
179         AtomicInteger packetsSent = new AtomicInteger(0);
180         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
181         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
182         resultInputQueue.bind("inproc://simulationEvents");
183         // Poll the two input sockets using ZMQ poller
184         ZMQ.Poller poller = this.zContext.createPoller(2);
185         // TODO ensure that this also handles a closed control socket gracefully
186         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
187         poller.register(controlSocket, ZMQ.Poller.POLLIN);
188         while (!Thread.currentThread().isInterrupted())
189         {
190             // System.out.println("Publisher calls Poller.poll()");
191             poller.poll();
192             if (poller.pollin(0))
193             {
194                 byte[] data = resultInputQueue.recv();
195                 // System.out.println("Publisher got outgoing result of " + data.length + " bytes");
196                 byte[] fixedData = data;
197                 int number = -1;
198                 try
199                 {
200                     // Patch the sender field to include the packet counter value - this is bloody expensive...
201                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
202                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
203                     number = packetsSent.addAndGet(1);
204                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
205                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
206                     // System.out
207                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
208                 }
209                 catch (Sim0MQException | SerializationException e)
210                 {
211                     e.printStackTrace();
212                 }
213                 resultOutputQueue.send(fixedData, 0);
214                 // System.out.println("Outgoing result handed over to controlSocket");
215                 continue; // Check for more results before checking the control input
216             }
217             if (poller.pollin(1))
218             {
219                 byte[] data = controlSocket.recv();
220                 // System.out.println("Publisher received a command of " + data.length + " bytes");
221                 if (!handleCommand(data, socketMap))
222                 {
223                     break;
224                 }
225             }
226         }
227         System.out.println("Exiting publisher polling loop");
228     }
229 
230     /**
231      * Construct an OTS simulation experiment from an XML description.
232      * @param xml String; the XML encoded network
233      * @param simulationDuration Duration; total duration of the simulation
234      * @param warmupTime Duration; warm up time of the simulation
235      * @param seed Long; seed for the experiment
236      * @return String; null on success, description of the problem on error
237      */
238     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
239     {
240         try
241         { 
242             OTSAnimator animator = new OTSAnimator("OTS Animator");
243             this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
244             this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
245             Map<String, StreamInterface> map = new LinkedHashMap<>();
246             map.put("generation", new MersenneTwister(seed));
247             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
248             this.publisher = new Publisher(this.network);
249             this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
250                     animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
251             new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
252             DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
253             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
254             frame.setExtendedState(Frame.NORMAL);
255             frame.setSize(new Dimension(1100, 1000));
256             frame.setBounds(0, 25, 1100, 1000);
257             animator.setSpeedFactor(Double.MAX_VALUE, true);
258             animator.setSpeedFactor(1000.0, true);
259 
260             ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
261             for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
262             {
263                 if (ioi instanceof TrafCOD)
264                 {
265                     TrafCOD trafCOD = (TrafCOD) ioi;
266                     Container controllerDisplayPanel = trafCOD.getDisplayContainer();
267                     if (null != controllerDisplayPanel)
268                     {
269                         JPanel wrapper = new JPanel(new BorderLayout());
270                         wrapper.add(new JScrollPane(controllerDisplayPanel));
271                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
272                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
273                     }
274                 }
275             }
276             try
277             {
278                 Thread.sleep(300);
279             }
280             catch (InterruptedException e)
281             {
282                 e.printStackTrace();
283             }
284             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
285         }
286         catch (Exception e)
287         {
288             return e.getMessage();
289         }
290         return null;
291     }
292 
293     /**
294      * Execute one remote control command.
295      * @param data byte[]; the SIM0MQ encoded command
296      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
297      * @return boolean; true if another command can be processed after this one; false when no further commands can be processed
298      */
299     @SuppressWarnings("checkstyle:methodlength")
300     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
301     {
302         boolean result = true;
303         try
304         {
305             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
306             String resultMessage = "OK";
307             Boolean ackNack = null;
308 
309             if (message.length >= 8 && message[5] instanceof String)
310             {
311                 String command = (String) message[5];
312                 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
313 
314                 String[] parts = command.split("\\|");
315                 if (parts.length == 2)
316                 {
317                     // This is a command for the embedded Publisher
318                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
319                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
320                             socketMap);
321                     if (null == this.publisher)
322                     {
323                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
324                         System.err.println("No publisher for command " + command);
325                         return true;
326                     }
327                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
328                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
329                     return true;
330                 }
331                 else
332                 {
333                     switch (command)
334                     {
335                         case "NEWSIMULATION":
336                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
337                                     && message[10] instanceof Duration && message[11] instanceof Long)
338                             {
339                                 if (null != this.animationPanel)
340                                 {
341                                     for (Container container = this.animationPanel; container != null; container =
342                                             container.getParent())
343                                     {
344                                         if (container instanceof JFrame)
345                                         {
346                                             JFrame jFrame = (JFrame) container;
347                                             jFrame.dispose();
348                                         }
349                                     }
350                                 }
351                                 // System.out.println("xml length = " + ((String) message[8]).length());
352                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
353                                         (Long) message[11]);
354                                 ackNack = null == resultMessage;
355                                 if (ackNack)
356                                 {
357                                     resultMessage = "OK";
358                                 }
359                             }
360                             else
361                             {
362                                 resultMessage =
363                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
364                                 ackNack = false;
365                             }
366                             break;
367 
368                         case "DIE":
369                             for (Container container = this.animationPanel; container != null; container =
370                                     container.getParent())
371                             {
372                                 // System.out.println("container is " + container);
373                                 if (container instanceof JFrame)
374                                 {
375                                     JFrame jFrame = (JFrame) container;
376                                     jFrame.dispose();
377                                 }
378                             }
379                             return false;
380 
381                         case "SIMULATEUNTIL":
382                             if (message.length == 9 && message[8] instanceof Time)
383                             {
384                                 System.out.println("Simulating up to " + message[8]);
385                                 if (null == this.network)
386                                 {
387                                     resultMessage = "No network loaded";
388                                     ackNack = false;
389                                     break;
390                                 }
391                                 OTSSimulatorInterface simulator = this.network.getSimulator();
392                                 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
393                                 {
394                                     resultMessage = "Simulation is already at end of simulation time";
395                                     ackNack = false;
396                                     break;
397                                 }
398                                 if (simulator.isStartingOrRunning())
399                                 {
400                                     resultMessage = "Simulator is already running"; // cannot happen for now
401                                     ackNack = false;
402                                     break;
403                                 }
404                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
405                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
406                                 returnWrapper.ack(resultMessage);
407                                 simulator.runUpTo(new SimTimeDoubleUnit((Time) message[8]));
408                                 int count = 0;
409                                 while (this.network.getSimulator().isStartingOrRunning())
410                                 {
411                                     System.out.print(".");
412                                     count++;
413                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
414                                     {
415                                         System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
416                                                 + this.network.getSimulator().getSimulatorTime());
417                                         this.network.getSimulator().stop();
418                                         Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
419                                                 this.network.getSimulator().getEventList().iterator();
420                                         while (elIt.hasNext())
421                                         {
422                                             System.out.println("EVENTLIST: " + elIt.next());
423                                         }
424                                     }
425                                     try
426                                     {
427                                         Thread.sleep(10); // 10 ms
428                                     }
429                                     catch (InterruptedException e)
430                                     {
431                                         e.printStackTrace();
432                                     }
433                                 }
434                                 // TODO Fix this (it is still needed - 2020-06-16)
435                                 try
436                                 {
437                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
438                                 }
439                                 catch (InterruptedException e)
440                                 {
441                                     e.printStackTrace();
442                                 }
443                                 return true; // ack has been sent when simulation started
444                             }
445                             else
446                             {
447                                 resultMessage = "Bad or missing stop time";
448                                 ackNack = false;
449                             }
450                             break;
451 
452                         default:
453                             IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
454                             if (incomingDataHandler != null)
455                             {
456                                 resultMessage = incomingDataHandler.handleIncomingData(message);
457                                 ackNack = resultMessage == null;
458                             }
459                             else
460                             {
461                                 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
462                                 ackNack = false;
463                             }
464                             break;
465                     }
466                 }
467                 if (resultMessage != null)
468                 {
469                     ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
470                             new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
471                             socketMap);
472                     if (ackNack)
473                     {
474                         returnWrapper.ack(resultMessage);
475                     }
476                     else
477                     {
478                         returnWrapper.nack(resultMessage);
479                     }
480                 }
481             }
482             else
483             {
484                 // We cannot construct a ReturnWrapper because the request has too few fields.
485                 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
486                 System.err.println(HexDumper.hexDumper(data));
487                 return true; // Do we really want to try again?
488             }
489         }
490         catch (Sim0MQException | SerializationException | RemoteException e)
491         {
492             System.err.println("Publisher thread could not decode command");
493             e.printStackTrace();
494         }
495         return result;
496     }
497 
498 }
499 
500 /**
501  * The Model.
502  */
503 class Sim0MQOTSModel extends AbstractOTSModel
504 {
505     /** */
506     private static final long serialVersionUID = 20170419L;
507 
508     /** The network. */
509     private final OTSRoadNetwork network;
510 
511     /** The XML. */
512     private final String xml;
513 
514     /**
515      * @param description String; the model description
516      * @param network OTSRoadNetwork; the network
517      * @param xml String; the XML description of the simulation model
518      */
519     Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
520     {
521         super(network.getSimulator(), network.getId(), description);
522         this.network = network;
523         this.xml = xml;
524     }
525 
526     /** {@inheritDoc} */
527     @Override
528     public void constructModel() throws SimRuntimeException
529     {
530         try
531         {
532             XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
533                     false);
534             ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
535                     getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
536                     new LaneCombinationList(), new LaneCombinationList());
537         }
538         catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
539                 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
540         {
541             exception.printStackTrace();
542             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
543             // parsing problem)
544             throw new SimRuntimeException(exception);
545         }
546     }
547 
548     /** {@inheritDoc} */
549     @Override
550     public OTSNetwork getNetwork()
551     {
552         return this.network;
553     }
554 
555     /** {@inheritDoc} */
556     @Override
557     public Serializable getSourceId()
558     {
559         return "Sim0MQPublisherModel";
560     }
561 
562 }