View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.net.URISyntaxException;
11  import java.nio.charset.StandardCharsets;
12  import java.rmi.RemoteException;
13  import java.util.Arrays;
14  import java.util.HashMap;
15  import java.util.Iterator;
16  import java.util.LinkedHashMap;
17  import java.util.List;
18  import java.util.Map;
19  import java.util.concurrent.atomic.AtomicInteger;
20  
21  import javax.swing.JFrame;
22  import javax.swing.JPanel;
23  import javax.swing.JScrollPane;
24  import javax.xml.bind.JAXBException;
25  import javax.xml.parsers.ParserConfigurationException;
26  
27  import org.djunits.value.vdouble.scalar.Duration;
28  import org.djunits.value.vdouble.scalar.Length;
29  import org.djunits.value.vdouble.scalar.Time;
30  import org.djutils.decoderdumper.HexDumper;
31  import org.djutils.immutablecollections.ImmutableMap;
32  import org.djutils.serialization.SerializationException;
33  import org.opentrafficsim.animation.DefaultAnimationFactory;
34  import org.opentrafficsim.animation.gtu.colorer.DefaultSwitchableGtuColorer;
35  import org.opentrafficsim.core.dsol.AbstractOtsModel;
36  import org.opentrafficsim.core.dsol.OtsAnimator;
37  import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
38  import org.opentrafficsim.core.geometry.OtsGeometryException;
39  import org.opentrafficsim.core.gtu.GtuException;
40  import org.opentrafficsim.core.network.Network;
41  import org.opentrafficsim.core.network.NetworkException;
42  import org.opentrafficsim.core.object.NonLocatedObject;
43  import org.opentrafficsim.road.network.RoadNetwork;
44  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
45  import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
46  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
47  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
48  import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
49  import org.opentrafficsim.sim0mq.publisher.Publisher;
50  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
53  import org.opentrafficsim.swing.gui.OtsAnimationPanel;
54  import org.opentrafficsim.swing.gui.OtsSimulationApplication;
55  import org.opentrafficsim.swing.gui.OtsSwingApplication;
56  import org.opentrafficsim.swing.script.AbstractSimulationScript;
57  import org.opentrafficsim.trafficcontrol.TrafficControlException;
58  import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
59  import org.sim0mq.Sim0MQException;
60  import org.sim0mq.message.Sim0MQMessage;
61  import org.xml.sax.SAXException;
62  import org.zeromq.SocketType;
63  import org.zeromq.ZContext;
64  import org.zeromq.ZMQ;
65  
66  import nl.tudelft.simulation.dsol.SimRuntimeException;
67  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
68  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
69  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
70  import nl.tudelft.simulation.jstats.streams.StreamInterface;
71  
72  /**
73   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
74   * <p>
75   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
76   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
77   * </p>
78   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
79   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
80   */
81  public final class Sim0mqPublisher
82  {
83      /** The publisher. */
84      private Publisher publisher = null;
85  
86      /** The ZContect. */
87      private final ZContext zContext;
88  
89      /** The simulation model. */
90      private Sim0mqOtsModel model = null;
91  
92      /** The OTS road network. */
93      private RoadNetwork network = null;
94  
95      /** The OTS animation panel. */
96      private OtsAnimationPanel animationPanel = null;
97  
98      /**
99       * Create a new Sim0MQPublisher that is operated through //inproc sockets.
100      * @param zContext ZContext; needed to create the sockets
101      * @param controlInput String; PULL socket for control input
102      * @param resultOutput String; PUSH socket to output results
103      */
104     public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
105     {
106         this.zContext = zContext;
107         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
108         controlSocket.bind("inproc://" + controlInput);
109         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
110         resultOutputQueue.connect("inproc://" + resultOutput);
111         pollingLoop(controlSocket, resultOutputQueue);
112     }
113 
114     /**
115      * Create a new Sim0MQPublisher that uses TCP transport.
116      * @param port int; port number to bind to
117      */
118     public Sim0mqPublisher(final int port)
119     {
120         this.zContext = new ZContext(5);
121         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
122         socket.bind("tcp://*:" + port);
123         pollingLoop(socket, socket);
124     }
125 
126     /**
127      * Create a new Sim0MQPublisher that uses TCP transport.
128      * @param port int; port number to bind to
129      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
130      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
131      *            null)
132      * @param incomingDataHandlers List&lt;IncomfingDataHandler&gt;; list of additional handlers for incoming data that is not
133      *            handled by the standard Sim0MQPublisher (may be null)
134      * @throws RemoteException when construction of the Publisher failed
135      */
136     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
137             final List<SubscriptionHandler> additionalSubscriptionHandlers,
138             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
139     {
140         this.zContext = new ZContext(5);
141         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
142         socket.bind("tcp://*:" + port);
143         this.network = preloadedSimulation.getNetwork();
144         this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
145         this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
146         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
147         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
148         pollingLoop(socket, socket);
149         System.exit(0);
150     }
151 
152     /**
153      * Create a new Sim0MQPublisher that uses TCP transport.
154      * @param port int; port number to bind to
155      * @param preloadedSimulation AbstractSimulationScript; a fully loaded (but not started) simulation
156      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers (may be
157      *            null)
158      * @throws RemoteException when construction of the Publisher failed
159      */
160     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
161             final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
162     {
163         this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
164     }
165 
166     /**
167      * Poller that receives the commands and ensures that various output sources can talk to the master.
168      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
169      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
170      */
171     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
172     {
173         System.out
174                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
175         resultOutputQueue.setHWM(100000);
176         AtomicInteger packetsSent = new AtomicInteger(0);
177         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
178         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
179         resultInputQueue.bind("inproc://simulationEvents");
180         // Poll the two input sockets using ZMQ poller
181         ZMQ.Poller poller = this.zContext.createPoller(2);
182         // TODO ensure that this also handles a closed control socket gracefully
183         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
184         poller.register(controlSocket, ZMQ.Poller.POLLIN);
185         while (!Thread.currentThread().isInterrupted())
186         {
187             // System.out.println("Publisher calls Poller.poll()");
188             poller.poll();
189             if (poller.pollin(0))
190             {
191                 byte[] data = resultInputQueue.recv();
192                 // System.out.println("Publisher got outgoing result of " + data.length + " bytes");
193                 byte[] fixedData = data;
194                 int number = -1;
195                 try
196                 {
197                     // Patch the sender field to include the packet counter value - this is bloody expensive...
198                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
199                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
200                     number = packetsSent.addAndGet(1);
201                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
202                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
203                     // System.out
204                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
205                 }
206                 catch (Sim0MQException | SerializationException e)
207                 {
208                     e.printStackTrace();
209                 }
210                 resultOutputQueue.send(fixedData, 0);
211                 // System.out.println("Outgoing result handed over to controlSocket");
212                 continue; // Check for more results before checking the control input
213             }
214             if (poller.pollin(1))
215             {
216                 byte[] data = controlSocket.recv();
217                 // System.out.println("Publisher received a command of " + data.length + " bytes");
218                 if (!handleCommand(data, socketMap))
219                 {
220                     break;
221                 }
222             }
223         }
224         System.out.println("Exiting publisher polling loop");
225     }
226 
227     /**
228      * Construct an OTS simulation experiment from an XML description.
229      * @param xml String; the XML encoded network
230      * @param simulationDuration Duration; total duration of the simulation
231      * @param warmupTime Duration; warm up time of the simulation
232      * @param seed Long; seed for the experiment
233      * @return String; null on success, description of the problem on error
234      */
235     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
236     {
237         try
238         {
239             OtsAnimator animator = new OtsAnimator("OTS Animator");
240             this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
241             this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
242             Map<String, StreamInterface> map = new LinkedHashMap<>();
243             map.put("generation", new MersenneTwister(seed));
244             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
245             this.publisher = new Publisher(this.network);
246             this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
247                     animator, this.model, OtsSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
248             new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel);
249             // TODO: remove these hard-coded default GTU types; delegate types to Stripe and GtuType itself
250             DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGtuColorer());
251             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
252             frame.setExtendedState(Frame.NORMAL);
253             frame.setSize(new Dimension(1100, 1000));
254             frame.setBounds(0, 25, 1100, 1000);
255             animator.setSpeedFactor(Double.MAX_VALUE, true);
256             animator.setSpeedFactor(1000.0, true);
257 
258             ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
259             for (NonLocatedObject ioi : onLocatedObjectMap.values())
260             {
261                 if (ioi instanceof TrafCod)
262                 {
263                     TrafCod trafCOD = (TrafCod) ioi;
264                     Container controllerDisplayPanel = trafCOD.getDisplayContainer();
265                     if (null != controllerDisplayPanel)
266                     {
267                         JPanel wrapper = new JPanel(new BorderLayout());
268                         wrapper.add(new JScrollPane(controllerDisplayPanel));
269                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
270                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
271                     }
272                 }
273             }
274             try
275             {
276                 Thread.sleep(300);
277             }
278             catch (InterruptedException e)
279             {
280                 e.printStackTrace();
281             }
282             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
283         }
284         catch (Exception e)
285         {
286             return e.getMessage();
287         }
288         return null;
289     }
290 
291     /**
292      * Execute one remote control command.
293      * @param data byte[]; the SIM0MQ encoded command
294      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
295      * @return boolean; true if another command can be processed after this one; false when no further commands can be processed
296      */
297     @SuppressWarnings("checkstyle:methodlength")
298     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
299     {
300         boolean result = true;
301         try
302         {
303             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
304             String resultMessage = "OK";
305             Boolean ackNack = null;
306 
307             if (message.length >= 8 && message[5] instanceof String)
308             {
309                 String command = (String) message[5];
310                 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
311 
312                 String[] parts = command.split("\\|");
313                 if (parts.length == 2)
314                 {
315                     // This is a command for the embedded Publisher
316                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
317                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
318                             socketMap);
319                     if (null == this.publisher)
320                     {
321                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
322                         System.err.println("No publisher for command " + command);
323                         return true;
324                     }
325                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
326                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
327                     return true;
328                 }
329                 else
330                 {
331                     switch (command)
332                     {
333                         case "NEWSIMULATION":
334                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
335                                     && message[10] instanceof Duration && message[11] instanceof Long)
336                             {
337                                 if (null != this.animationPanel)
338                                 {
339                                     for (Container container = this.animationPanel; container != null; container =
340                                             container.getParent())
341                                     {
342                                         if (container instanceof JFrame)
343                                         {
344                                             JFrame jFrame = (JFrame) container;
345                                             jFrame.dispose();
346                                         }
347                                     }
348                                 }
349                                 // System.out.println("xml length = " + ((String) message[8]).length());
350                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
351                                         (Long) message[11]);
352                                 ackNack = null == resultMessage;
353                                 if (ackNack)
354                                 {
355                                     resultMessage = "OK";
356                                 }
357                             }
358                             else
359                             {
360                                 resultMessage =
361                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
362                                 ackNack = false;
363                             }
364                             break;
365 
366                         case "DIE":
367                             for (Container container = this.animationPanel; container != null; container =
368                                     container.getParent())
369                             {
370                                 // System.out.println("container is " + container);
371                                 if (container instanceof JFrame)
372                                 {
373                                     JFrame jFrame = (JFrame) container;
374                                     jFrame.dispose();
375                                 }
376                             }
377                             return false;
378 
379                         case "SIMULATEUNTIL":
380                             if (message.length == 9 && message[8] instanceof Time)
381                             {
382                                 System.out.println("Simulating up to " + message[8]);
383                                 if (null == this.network)
384                                 {
385                                     resultMessage = "No network loaded";
386                                     ackNack = false;
387                                     break;
388                                 }
389                                 OtsSimulatorInterface simulator = this.network.getSimulator();
390                                 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
391                                 {
392                                     resultMessage = "Simulation is already at end of simulation time";
393                                     ackNack = false;
394                                     break;
395                                 }
396                                 if (simulator.isStartingOrRunning())
397                                 {
398                                     resultMessage = "Simulator is already running"; // cannot happen for now
399                                     ackNack = false;
400                                     break;
401                                 }
402                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
403                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
404                                 returnWrapper.ack(resultMessage);
405                                 simulator.runUpTo((Time) message[8]);
406                                 int count = 0;
407                                 while (this.network.getSimulator().isStartingOrRunning())
408                                 {
409                                     System.out.print(".");
410                                     count++;
411                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
412                                     {
413                                         System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
414                                                 + this.network.getSimulator().getSimulatorTime());
415                                         this.network.getSimulator().stop();
416                                         Iterator<SimEventInterface<Duration>> elIt =
417                                                 this.network.getSimulator().getEventList().iterator();
418                                         while (elIt.hasNext())
419                                         {
420                                             System.out.println("EVENTLIST: " + elIt.next());
421                                         }
422                                     }
423                                     try
424                                     {
425                                         Thread.sleep(10); // 10 ms
426                                     }
427                                     catch (InterruptedException e)
428                                     {
429                                         e.printStackTrace();
430                                     }
431                                 }
432                                 // TODO Fix this (it is still needed - 2020-06-16)
433                                 try
434                                 {
435                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
436                                 }
437                                 catch (InterruptedException e)
438                                 {
439                                     e.printStackTrace();
440                                 }
441                                 return true; // ack has been sent when simulation started
442                             }
443                             else
444                             {
445                                 resultMessage = "Bad or missing stop time";
446                                 ackNack = false;
447                             }
448                             break;
449 
450                         default:
451                             IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
452                             if (incomingDataHandler != null)
453                             {
454                                 resultMessage = incomingDataHandler.handleIncomingData(message);
455                                 ackNack = resultMessage == null;
456                             }
457                             else
458                             {
459                                 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
460                                 ackNack = false;
461                             }
462                             break;
463                     }
464                 }
465                 if (resultMessage != null)
466                 {
467                     ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
468                             new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
469                             socketMap);
470                     if (ackNack)
471                     {
472                         returnWrapper.ack(resultMessage);
473                     }
474                     else
475                     {
476                         returnWrapper.nack(resultMessage);
477                     }
478                 }
479             }
480             else
481             {
482                 // We cannot construct a ReturnWrapper because the request has too few fields.
483                 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
484                 System.err.println(HexDumper.hexDumper(data));
485                 return true; // Do we really want to try again?
486             }
487         }
488         catch (Sim0MQException | SerializationException | RemoteException e)
489         {
490             System.err.println("Publisher thread could not decode command");
491             e.printStackTrace();
492         }
493         return result;
494     }
495 
496 }
497 
498 /**
499  * The Model.
500  */
501 class Sim0mqOtsModel extends AbstractOtsModel
502 {
503     /** */
504     private static final long serialVersionUID = 20170419L;
505 
506     /** The network. */
507     private final RoadNetwork network;
508 
509     /** The XML. */
510     private final String xml;
511 
512     /**
513      * @param description String; the model description
514      * @param network RoadNetwork; the network
515      * @param xml String; the XML description of the simulation model
516      */
517     Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
518     {
519         super(network.getSimulator(), network.getId(), description);
520         this.network = network;
521         this.xml = xml;
522     }
523 
524     /** {@inheritDoc} */
525     @Override
526     public void constructModel() throws SimRuntimeException
527     {
528         try
529         {
530             new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
531             // TODO: obtain relevant GTU type from xml
532             ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
533                     new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), new LaneCombinationList(),
534                     new LaneCombinationList());
535         }
536         catch (NetworkException | OtsGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
537                 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
538         {
539             exception.printStackTrace();
540             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
541             // parsing problem)
542             throw new SimRuntimeException(exception);
543         }
544     }
545 
546     /** {@inheritDoc} */
547     @Override
548     public Network getNetwork()
549     {
550         return this.network;
551     }
552 
553 }