View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.io.Serializable;
11  import java.net.URISyntaxException;
12  import java.nio.charset.StandardCharsets;
13  import java.rmi.RemoteException;
14  import java.util.Arrays;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.Map;
19  import java.util.concurrent.atomic.AtomicInteger;
20  
21  import javax.swing.JFrame;
22  import javax.swing.JPanel;
23  import javax.swing.JScrollPane;
24  import javax.xml.bind.JAXBException;
25  import javax.xml.parsers.ParserConfigurationException;
26  
27  import org.djunits.value.vdouble.scalar.Duration;
28  import org.djunits.value.vdouble.scalar.Length;
29  import org.djunits.value.vdouble.scalar.Time;
30  import org.djutils.decoderdumper.HexDumper;
31  import org.djutils.immutablecollections.ImmutableMap;
32  import org.djutils.serialization.SerializationException;
33  import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
34  import org.opentrafficsim.core.dsol.AbstractOTSModel;
35  import org.opentrafficsim.core.dsol.OTSAnimator;
36  import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
37  import org.opentrafficsim.core.geometry.OTSGeometryException;
38  import org.opentrafficsim.core.gtu.GTUException;
39  import org.opentrafficsim.core.gtu.GTUType;
40  import org.opentrafficsim.core.network.NetworkException;
41  import org.opentrafficsim.core.network.OTSNetwork;
42  import org.opentrafficsim.core.object.InvisibleObjectInterface;
43  import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
44  import org.opentrafficsim.road.network.OTSRoadNetwork;
45  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
46  import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
47  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
48  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
49  import org.opentrafficsim.sim0mq.publisher.Publisher;
50  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52  import org.opentrafficsim.swing.gui.OTSAnimationPanel;
53  import org.opentrafficsim.swing.gui.OTSSimulationApplication;
54  import org.opentrafficsim.swing.gui.OTSSwingApplication;
55  import org.opentrafficsim.trafficcontrol.TrafficControlException;
56  import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
57  import org.sim0mq.Sim0MQException;
58  import org.sim0mq.message.Sim0MQMessage;
59  import org.xml.sax.SAXException;
60  import org.zeromq.SocketType;
61  import org.zeromq.ZContext;
62  import org.zeromq.ZMQ;
63  
64  import nl.tudelft.simulation.dsol.SimRuntimeException;
65  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
66  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
67  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
68  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
69  import nl.tudelft.simulation.jstats.streams.StreamInterface;
70  
71  /**
72   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
73   * <p>
74   * Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
75   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
76   * <p>
77   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
78   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
79   */
80  public final class Sim0MQPublisher
81  {
82      /** The publisher. */
83      private Publisher publisher = null;
84  
85      /** The ZContect. */
86      private final ZContext zContext;
87  
88      /** The simulation model. */
89      private Sim0MQOTSModel model = null;
90  
91      /** The OTS road network. */
92      private OTSRoadNetwork network = null;
93  
94      /** The OTS animation panel. */
95      private OTSAnimationPanel animationPanel = null;
96  
97      /**
98       * Create a new Sim0MQPublisher that is operated through //inproc sockets.
99       * @param zContext ZContext; needed to create the sockets
100      * @param controlInput String; PULL socket for control input
101      * @param resultOutput String; PUSH socket to output results
102      */
103     public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
104     {
105         this.zContext = zContext;
106         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
107         controlSocket.bind("inproc://" + controlInput);
108         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
109         resultOutputQueue.connect("inproc://" + resultOutput);
110         pollingLoop(controlSocket, resultOutputQueue);
111     }
112 
113     /**
114      * Create a new Sim0MQPublisher that uses TCP transport.
115      * @param port int; port number to bind to
116      */
117     public Sim0MQPublisher(final int port)
118     {
119         this.zContext = new ZContext(5);
120         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
121         socket.bind("tcp://*:" + port);
122         pollingLoop(socket, socket);
123     }
124 
125     /**
126      * Poller that receives the commands and ensures that various output sources can talk to the master.
127      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
128      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
129      */
130     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
131     {
132         System.out
133                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
134         resultOutputQueue.setHWM(100000);
135         AtomicInteger packetsSent = new AtomicInteger(0);
136         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
137         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
138         resultInputQueue.bind("inproc://simulationEvents");
139         // Poll the two input sockets using ZMQ poller
140         ZMQ.Poller poller = this.zContext.createPoller(2);
141         // TODO ensure that this also handles a closed control socket gracefully
142         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
143         poller.register(controlSocket, ZMQ.Poller.POLLIN);
144         while (!Thread.currentThread().isInterrupted())
145         {
146             // System.out.println("Publisher calls Poller.poll()");
147             poller.poll();
148             if (poller.pollin(0))
149             {
150                 byte[] data = resultInputQueue.recv();
151                 // System.out.println("Publisher got outgoing result of " + data.length + " bytes");
152                 byte[] fixedData = data;
153                 int number = -1;
154                 try
155                 {
156                     // Patch the sender field to include the packet counter value - this is bloody expensive...
157                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
158                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
159                     number = packetsSent.addAndGet(1);
160                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
161                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
162                     // System.out
163                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
164                 }
165                 catch (Sim0MQException | SerializationException e)
166                 {
167                     e.printStackTrace();
168                 }
169                 resultOutputQueue.send(fixedData, 0);
170                 // System.out.println("Outgoing result handed over to controlSocket");
171                 continue; // Check for more results before checking the control input
172             }
173             if (poller.pollin(1))
174             {
175                 byte[] data = controlSocket.recv();
176                 // System.out.println("Publisher received a command of " + data.length + " bytes");
177                 if (!handleCommand(data, socketMap))
178                 {
179                     break;
180                 }
181             }
182         }
183         System.out.println("Exiting publisher polling loop");
184     }
185 
186     /**
187      * Construct an OTS simulation experiment from an XML description.
188      * @param xml String; the XML encoded network
189      * @param simulationDuration Duration; total duration of the simulation
190      * @param warmupTime Duration; warm up time of the simulation
191      * @param seed Long; seed for the experiment
192      * @return String; null on success, description of the problem on error
193      */
194     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
195     {
196         try
197         {
198             OTSAnimator animator = new OTSAnimator("OTS Animator");
199             this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
200             this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
201             Map<String, StreamInterface> map = new LinkedHashMap<>();
202             map.put("generation", new MersenneTwister(seed));
203             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
204             this.publisher = new Publisher(this.network);
205             this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
206                     animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
207             new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
208             DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
209             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
210             frame.setExtendedState(Frame.NORMAL);
211             frame.setSize(new Dimension(1100, 1000));
212             frame.setBounds(0, 25, 1100, 1000);
213             animator.setSpeedFactor(Double.MAX_VALUE, true);
214             animator.setSpeedFactor(1000.0, true);
215 
216             ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
217             for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
218             {
219                 if (ioi instanceof TrafCOD)
220                 {
221                     TrafCOD trafCOD = (TrafCOD) ioi;
222                     Container controllerDisplayPanel = trafCOD.getDisplayContainer();
223                     if (null != controllerDisplayPanel)
224                     {
225                         JPanel wrapper = new JPanel(new BorderLayout());
226                         wrapper.add(new JScrollPane(controllerDisplayPanel));
227                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
228                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
229                     }
230                 }
231             }
232             try
233             {
234                 Thread.sleep(300);
235             }
236             catch (InterruptedException e)
237             {
238                 e.printStackTrace();
239             }
240             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
241         }
242         catch (Exception e)
243         {
244             return e.getMessage();
245         }
246         return null;
247     }
248 
249     /**
250      * Execute one remote control command.
251      * @param data byte[]; the SIM0MQ encoded command
252      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
253      * @return boolean; true if another command can be processed after this one; false when no further commands can be processed
254      */
255     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
256     {
257         boolean result = true;
258         try
259         {
260             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
261             String resultMessage = "OK";
262             Boolean ackNack = null;
263 
264             if (message.length >= 8 && message[5] instanceof String)
265             {
266                 String command = (String) message[5];
267                 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
268 
269                 String[] parts = command.split("\\|");
270                 if (parts.length == 2)
271                 {
272                     // This is a command for the embedded Publisher
273                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
274                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
275                             socketMap);
276                     if (null == this.publisher)
277                     {
278                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
279                         System.err.println("No publisher for command " + command);
280                         return true;
281                     }
282                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
283                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
284                     return true;
285                 }
286                 else
287                 {
288                     switch (command)
289                     {
290                         case "NEWSIMULATION":
291                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
292                                     && message[10] instanceof Duration && message[11] instanceof Long)
293                             {
294                                 if (null != this.animationPanel)
295                                 {
296                                     for (Container container = this.animationPanel; container != null; container =
297                                             container.getParent())
298                                     {
299                                         if (container instanceof JFrame)
300                                         {
301                                             JFrame jFrame = (JFrame) container;
302                                             jFrame.dispose();
303                                         }
304                                     }
305                                 }
306                                 // System.out.println("xml length = " + ((String) message[8]).length());
307                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
308                                         (Long) message[11]);
309                                 ackNack = null == resultMessage;
310                                 if (ackNack)
311                                 {
312                                     resultMessage = "OK";
313                                 }
314                             }
315                             else
316                             {
317                                 resultMessage =
318                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
319                                 ackNack = false;
320                             }
321                             break;
322 
323                         case "DIE":
324                             for (Container container = this.animationPanel; container != null; container =
325                                     container.getParent())
326                             {
327                                 // System.out.println("container is " + container);
328                                 if (container instanceof JFrame)
329                                 {
330                                     JFrame jFrame = (JFrame) container;
331                                     jFrame.dispose();
332                                 }
333                             }
334                             return false;
335 
336                         case "SIMULATEUNTIL":
337                             if (message.length == 9 && message[8] instanceof Time)
338                             {
339                                 System.out.println("Simulating up to " + message[8]);
340                                 if (null == this.network)
341                                 {
342                                     resultMessage = "No network loaded";
343                                     ackNack = false;
344                                     break;
345                                 }
346                                 OTSSimulatorInterface simulator = this.network.getSimulator();
347                                 if (simulator.getSimulatorTime()
348                                         .ge(simulator.getReplication().getExperiment().getTreatment().getEndTime()))
349                                 {
350                                     resultMessage = "Simulation is already at end of simulation time";
351                                     ackNack = false;
352                                     break;
353                                 }
354                                 if (simulator.isStartingOrRunning())
355                                 {
356                                     resultMessage = "Simulator is already running"; // cannot happen for now
357                                     ackNack = false;
358                                     break;
359                                 }
360                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
361                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
362                                 returnWrapper.ack(resultMessage);
363                                 simulator.runUpTo((Time) message[8]);
364                                 int count = 0;
365                                 while (this.network.getSimulator().isStartingOrRunning())
366                                 {
367                                     System.out.print(".");
368                                     count++;
369                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
370                                     {
371                                         System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
372                                                 + this.network.getSimulator().getSimulatorTime());
373                                         this.network.getSimulator().stop();
374                                         Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
375                                                 this.network.getSimulator().getEventList().iterator();
376                                         while (elIt.hasNext())
377                                         {
378                                             System.out.println("EVENTLIST: " + elIt.next());
379                                         }
380                                     }
381                                     try
382                                     {
383                                         Thread.sleep(10); // 10 ms
384                                     }
385                                     catch (InterruptedException e)
386                                     {
387                                         e.printStackTrace();
388                                     }
389                                 }
390                                 // TODO Fix this (it is still needed - 2020-06-16)
391                                 try
392                                 {
393                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
394                                 }
395                                 catch (InterruptedException e)
396                                 {
397                                     e.printStackTrace();
398                                 }
399                                 return true; // ack has been sent when simulation started
400                             }
401                             else
402                             {
403                                 resultMessage = "Bad or missing stop time";
404                                 ackNack = false;
405                             }
406                             break;
407 
408                         default:
409                             resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
410                             ackNack = false;
411                             break;
412                     }
413                 }
414                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
415                         new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
416                 if (ackNack)
417                 {
418                     returnWrapper.ack(resultMessage);
419                 }
420                 else
421                 {
422                     returnWrapper.nack(resultMessage);
423                 }
424             }
425             else
426             {
427                 // We cannot construct a ReturnWrapper because the request has too few fields.
428                 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
429                 System.err.println(HexDumper.hexDumper(data));
430                 return true; // Do we really want to try again?
431             }
432         }
433         catch (Sim0MQException | SerializationException | RemoteException e)
434         {
435             System.err.println("Publisher thread could not decode command");
436             e.printStackTrace();
437         }
438         return result;
439     }
440 
441 }
442 
443 /**
444  * The Model.
445  */
446 class Sim0MQOTSModel extends AbstractOTSModel
447 {
448     /** */
449     private static final long serialVersionUID = 20170419L;
450 
451     /** The network. */
452     private final OTSRoadNetwork network;
453 
454     /** The XML. */
455     private final String xml;
456 
457     /**
458      * @param description String; the model description
459      * @param network OTSRoadNetwork; the network
460      * @param xml String; the XML description of the simulation model
461      */
462     Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
463     {
464         super(network.getSimulator(), network.getId(), description);
465         this.network = network;
466         this.xml = xml;
467     }
468 
469     /** {@inheritDoc} */
470     @Override
471     public void constructModel() throws SimRuntimeException
472     {
473         try
474         {
475             XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
476                     false);
477             ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
478                     getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
479                     new LaneCombinationList(), new LaneCombinationList());
480         }
481         catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
482                 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
483         {
484             exception.printStackTrace();
485             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
486             // parsing problem)
487             throw new SimRuntimeException(exception);
488         }
489     }
490 
491     /** {@inheritDoc} */
492     @Override
493     public OTSNetwork getNetwork()
494     {
495         return this.network;
496     }
497 
498     /** {@inheritDoc} */
499     @Override
500     public Serializable getSourceId()
501     {
502         return "Sim0MQPublisherModel";
503     }
504 
505 }