View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.net.URISyntaxException;
11  import java.nio.charset.StandardCharsets;
12  import java.rmi.RemoteException;
13  import java.util.Arrays;
14  import java.util.Collections;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.List;
19  import java.util.Map;
20  import java.util.concurrent.atomic.AtomicInteger;
21  
22  import javax.swing.JFrame;
23  import javax.swing.JPanel;
24  import javax.swing.JScrollPane;
25  import javax.xml.parsers.ParserConfigurationException;
26  
27  import org.djunits.value.vdouble.scalar.Duration;
28  import org.djunits.value.vdouble.scalar.Length;
29  import org.djunits.value.vdouble.scalar.Time;
30  import org.djutils.decoderdumper.HexDumper;
31  import org.djutils.immutablecollections.ImmutableMap;
32  import org.djutils.serialization.SerializationException;
33  import org.opentrafficsim.core.dsol.AbstractOtsModel;
34  import org.opentrafficsim.core.dsol.OtsAnimator;
35  import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
36  import org.opentrafficsim.core.gtu.GtuException;
37  import org.opentrafficsim.core.network.Network;
38  import org.opentrafficsim.core.network.NetworkException;
39  import org.opentrafficsim.core.object.NonLocatedObject;
40  import org.opentrafficsim.core.perception.HistoryManagerDevs;
41  import org.opentrafficsim.road.network.RoadNetwork;
42  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
43  import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
44  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
45  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
46  import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
47  import org.opentrafficsim.sim0mq.publisher.Publisher;
48  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
49  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
50  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
51  import org.opentrafficsim.swing.gui.OtsAnimationPanel;
52  import org.opentrafficsim.swing.gui.OtsSimulationApplication;
53  import org.opentrafficsim.swing.gui.OtsSwingApplication;
54  import org.opentrafficsim.swing.script.AbstractSimulationScript;
55  import org.opentrafficsim.trafficcontrol.TrafficControlException;
56  import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
57  import org.sim0mq.Sim0MQException;
58  import org.sim0mq.message.Sim0MQMessage;
59  import org.xml.sax.SAXException;
60  import org.zeromq.SocketType;
61  import org.zeromq.ZContext;
62  import org.zeromq.ZMQ;
63  
64  import jakarta.xml.bind.JAXBException;
65  import nl.tudelft.simulation.dsol.SimRuntimeException;
66  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
67  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
68  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
69  import nl.tudelft.simulation.jstats.streams.StreamInterface;
70  
71  /**
72   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
73   * <p>
74   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
75   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
76   * </p>
77   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
78   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
79   */
80  public final class Sim0mqPublisher
81  {
82      /** The publisher. */
83      private Publisher publisher = null;
84  
85      /** The ZContext. */
86      private final ZContext zContext;
87  
88      /** The simulation model. */
89      private Sim0mqOtsModel model = null;
90  
91      /** The OTS road network. */
92      private RoadNetwork network = null;
93  
94      /** The OTS animation panel. */
95      private OtsAnimationPanel animationPanel = null;
96  
97      /**
98       * Create a new Sim0MQPublisher that is operated through //inproc sockets.
99       * @param zContext needed to create the sockets
100      * @param controlInput PULL socket for control input
101      * @param resultOutput PUSH socket to output results
102      */
103     public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
104     {
105         this.zContext = zContext;
106         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
107         controlSocket.bind("inproc://" + controlInput);
108         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
109         resultOutputQueue.connect("inproc://" + resultOutput);
110         pollingLoop(controlSocket, resultOutputQueue);
111     }
112 
113     /**
114      * Create a new Sim0MQPublisher that uses TCP transport.
115      * @param port port number to bind to
116      */
117     public Sim0mqPublisher(final int port)
118     {
119         this.zContext = new ZContext(5);
120         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
121         socket.bind("tcp://*:" + port);
122         pollingLoop(socket, socket);
123     }
124 
125     /**
126      * Create a new Sim0MQPublisher that uses TCP transport.
127      * @param port port number to bind to
128      * @param preloadedSimulation a fully loaded (but not started) simulation
129      * @param additionalSubscriptionHandlers list of additional subscription handlers (may be null)
130      * @param incomingDataHandlers list of additional handlers for incoming data that is not handled by the standard
131      *            Sim0MQPublisher (may be null)
132      * @throws RemoteException when construction of the Publisher failed
133      */
134     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
135             final List<SubscriptionHandler> additionalSubscriptionHandlers,
136             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
137     {
138         this.zContext = new ZContext(5);
139         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
140         socket.bind("tcp://*:" + port);
141         this.network = preloadedSimulation.getNetwork();
142         this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
143         this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
144         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
145         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
146         pollingLoop(socket, socket);
147         System.exit(0);
148     }
149 
150     /**
151      * Create a new Sim0MQPublisher that uses TCP transport.
152      * @param port port number to bind to
153      * @param preloadedSimulation a fully loaded (but not started) simulation
154      * @param additionalSubscriptionHandlers list of additional subscription handlers (may be null)
155      * @throws RemoteException when construction of the Publisher failed
156      */
157     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
158             final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
159     {
160         this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
161     }
162 
163     /**
164      * Poller that receives the commands and ensures that various output sources can talk to the master.
165      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
166      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
167      */
168     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
169     {
170         System.out
171                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
172         resultOutputQueue.setHWM(100000);
173         AtomicInteger packetsSent = new AtomicInteger(0);
174         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
175         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
176         resultInputQueue.bind("inproc://simulationEvents");
177         // Poll the two input sockets using ZMQ poller
178         ZMQ.Poller poller = this.zContext.createPoller(2);
179         // TODO ensure that this also handles a closed control socket gracefully
180         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
181         poller.register(controlSocket, ZMQ.Poller.POLLIN);
182         while (!Thread.currentThread().isInterrupted())
183         {
184             // System.out.println("Publisher calls Poller.poll()");
185             poller.poll();
186             if (poller.pollin(0))
187             {
188                 byte[] data = resultInputQueue.recv();
189                 // System.out.println("Publisher got outgoing result of " + data.length + " bytes");
190                 byte[] fixedData = data;
191                 int number = -1;
192                 try
193                 {
194                     // Patch the sender field to include the packet counter value - this is bloody expensive...
195                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
196                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
197                     number = packetsSent.addAndGet(1);
198                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
199                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
200                     // System.out
201                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
202                 }
203                 catch (Sim0MQException | SerializationException e)
204                 {
205                     e.printStackTrace();
206                 }
207                 resultOutputQueue.send(fixedData, 0);
208                 // System.out.println("Outgoing result handed over to controlSocket");
209                 continue; // Check for more results before checking the control input
210             }
211             if (poller.pollin(1))
212             {
213                 byte[] data = controlSocket.recv();
214                 // System.out.println("Publisher received a command of " + data.length + " bytes");
215                 if (!handleCommand(data, socketMap))
216                 {
217                     break;
218                 }
219             }
220         }
221         System.out.println("Exiting publisher polling loop");
222     }
223 
224     /**
225      * Construct an OTS simulation experiment from an XML description.
226      * @param xml the XML encoded network
227      * @param simulationDuration total duration of the simulation
228      * @param warmupTime warm up time of the simulation
229      * @param seed seed for the experiment
230      * @return null on success, description of the problem on error
231      */
232     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
233     {
234         try
235         {
236             OtsAnimator animator = new OtsAnimator("OTS Animator");
237             this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
238             this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
239             Map<String, StreamInterface> map = new LinkedHashMap<>();
240             map.put("generation", new MersenneTwister(seed));
241             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map,
242                     HistoryManagerDevs.noHistory(animator));
243             this.publisher = new Publisher(this.network);
244             this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
245                     animator, this.model, OtsSwingApplication.DEFAULT_GTU_COLORERS, this.model.getNetwork());
246             new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel, Collections.emptyMap());
247             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
248             frame.setExtendedState(Frame.NORMAL);
249             frame.setSize(new Dimension(1100, 1000));
250             frame.setBounds(0, 25, 1100, 1000);
251             animator.setSpeedFactor(Double.MAX_VALUE, true);
252             animator.setSpeedFactor(1000.0, true);
253 
254             ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
255             for (NonLocatedObject ioi : onLocatedObjectMap.values())
256             {
257                 if (ioi instanceof TrafCod)
258                 {
259                     TrafCod trafCOD = (TrafCod) ioi;
260                     Container controllerDisplayPanel = trafCOD.getDisplayContainer();
261                     if (null != controllerDisplayPanel)
262                     {
263                         JPanel wrapper = new JPanel(new BorderLayout());
264                         wrapper.add(new JScrollPane(controllerDisplayPanel));
265                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
266                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
267                     }
268                 }
269             }
270             try
271             {
272                 Thread.sleep(300);
273             }
274             catch (InterruptedException e)
275             {
276                 e.printStackTrace();
277             }
278             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
279         }
280         catch (Exception e)
281         {
282             return e.getMessage();
283         }
284         return null;
285     }
286 
287     /**
288      * Execute one remote control command.
289      * @param data the SIM0MQ encoded command
290      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
291      * @return true if another command can be processed after this one; false when no further commands can be processed
292      */
293     @SuppressWarnings("checkstyle:methodlength")
294     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
295     {
296         boolean result = true;
297         try
298         {
299             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
300             String resultMessage = "OK";
301             Boolean ackNack = null;
302 
303             if (message.length >= 8 && message[5] instanceof String)
304             {
305                 String command = (String) message[5];
306                 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
307 
308                 String[] parts = command.split("\\|");
309                 if (parts.length == 2)
310                 {
311                     // This is a command for the embedded Publisher
312                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
313                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
314                             socketMap);
315                     if (null == this.publisher)
316                     {
317                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
318                         System.err.println("No publisher for command " + command);
319                         return true;
320                     }
321                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
322                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
323                     return true;
324                 }
325                 else
326                 {
327                     switch (command)
328                     {
329                         case "NEWSIMULATION":
330                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
331                                     && message[10] instanceof Duration && message[11] instanceof Long)
332                             {
333                                 if (null != this.animationPanel)
334                                 {
335                                     for (Container container = this.animationPanel; container != null; container =
336                                             container.getParent())
337                                     {
338                                         if (container instanceof JFrame)
339                                         {
340                                             JFrame jFrame = (JFrame) container;
341                                             jFrame.dispose();
342                                         }
343                                     }
344                                 }
345                                 // System.out.println("xml length = " + ((String) message[8]).length());
346                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
347                                         (Long) message[11]);
348                                 ackNack = null == resultMessage;
349                                 if (ackNack)
350                                 {
351                                     resultMessage = "OK";
352                                 }
353                             }
354                             else
355                             {
356                                 resultMessage =
357                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
358                                 ackNack = false;
359                             }
360                             break;
361 
362                         case "DIE":
363                             for (Container container = this.animationPanel; container != null; container =
364                                     container.getParent())
365                             {
366                                 // System.out.println("container is " + container);
367                                 if (container instanceof JFrame)
368                                 {
369                                     JFrame jFrame = (JFrame) container;
370                                     jFrame.dispose();
371                                 }
372                             }
373                             return false;
374 
375                         case "SIMULATEUNTIL":
376                             if (message.length == 9 && message[8] instanceof Time)
377                             {
378                                 System.out.println("Simulating up to " + message[8]);
379                                 if (null == this.network)
380                                 {
381                                     resultMessage = "No network loaded";
382                                     ackNack = false;
383                                     break;
384                                 }
385                                 OtsSimulatorInterface simulator = this.network.getSimulator();
386                                 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
387                                 {
388                                     resultMessage = "Simulation is already at end of simulation time";
389                                     ackNack = false;
390                                     break;
391                                 }
392                                 if (simulator.isStartingOrRunning())
393                                 {
394                                     resultMessage = "Simulator is already running"; // cannot happen for now
395                                     ackNack = false;
396                                     break;
397                                 }
398                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
399                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
400                                 returnWrapper.ack(resultMessage);
401                                 simulator.runUpTo((Time) message[8]);
402                                 int count = 0;
403                                 while (this.network.getSimulator().isStartingOrRunning())
404                                 {
405                                     System.out.print(".");
406                                     count++;
407                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
408                                     {
409                                         System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
410                                                 + this.network.getSimulator().getSimulatorTime());
411                                         this.network.getSimulator().stop();
412                                         Iterator<SimEventInterface<Duration>> elIt =
413                                                 this.network.getSimulator().getEventList().iterator();
414                                         while (elIt.hasNext())
415                                         {
416                                             System.out.println("EVENTLIST: " + elIt.next());
417                                         }
418                                     }
419                                     try
420                                     {
421                                         Thread.sleep(10); // 10 ms
422                                     }
423                                     catch (InterruptedException e)
424                                     {
425                                         e.printStackTrace();
426                                     }
427                                 }
428                                 // TODO Fix this (it is still needed - 2020-06-16)
429                                 try
430                                 {
431                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
432                                 }
433                                 catch (InterruptedException e)
434                                 {
435                                     e.printStackTrace();
436                                 }
437                                 return true; // ack has been sent when simulation started
438                             }
439                             else
440                             {
441                                 resultMessage = "Bad or missing stop time";
442                                 ackNack = false;
443                             }
444                             break;
445 
446                         default:
447                             IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
448                             if (incomingDataHandler != null)
449                             {
450                                 resultMessage = incomingDataHandler.handleIncomingData(message);
451                                 ackNack = resultMessage == null;
452                             }
453                             else
454                             {
455                                 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
456                                 ackNack = false;
457                             }
458                             break;
459                     }
460                 }
461                 if (resultMessage != null)
462                 {
463                     ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
464                             new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
465                             socketMap);
466                     if (ackNack)
467                     {
468                         returnWrapper.ack(resultMessage);
469                     }
470                     else
471                     {
472                         returnWrapper.nack(resultMessage);
473                     }
474                 }
475             }
476             else
477             {
478                 // We cannot construct a ReturnWrapper because the request has too few fields.
479                 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
480                 System.err.println(HexDumper.hexDumper(data));
481                 return true; // Do we really want to try again?
482             }
483         }
484         catch (Sim0MQException | SerializationException | RemoteException e)
485         {
486             System.err.println("Publisher thread could not decode command");
487             e.printStackTrace();
488         }
489         return result;
490     }
491 
492 }
493 
494 /**
495  * The Model.
496  */
497 class Sim0mqOtsModel extends AbstractOtsModel
498 {
499     /** */
500     private static final long serialVersionUID = 20170419L;
501 
502     /** The network. */
503     private final RoadNetwork network;
504 
505     /** The XML. */
506     private final String xml;
507 
508     /**
509      * @param description the model description
510      * @param network the network
511      * @param xml the XML description of the simulation model
512      */
513     Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
514     {
515         super(network.getSimulator(), network.getId(), description);
516         this.network = network;
517         this.xml = xml;
518     }
519 
520     @Override
521     public void constructModel() throws SimRuntimeException
522     {
523         try
524         {
525             new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
526             // TODO: obtain relevant GTU type from xml
527             ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
528                     new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), new LaneCombinationList(),
529                     new LaneCombinationList());
530         }
531         catch (NetworkException | JAXBException | URISyntaxException | XmlParserException | SAXException
532                 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
533         {
534             exception.printStackTrace();
535             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
536             // parsing problem)
537             throw new SimRuntimeException(exception);
538         }
539     }
540 
541     @Override
542     public Network getNetwork()
543     {
544         return this.network;
545     }
546 
547 }