View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.awt.BorderLayout;
4   import java.awt.Container;
5   import java.awt.Dimension;
6   import java.awt.Frame;
7   import java.awt.event.ActionEvent;
8   import java.io.ByteArrayInputStream;
9   import java.io.IOException;
10  import java.net.URISyntaxException;
11  import java.nio.charset.StandardCharsets;
12  import java.rmi.RemoteException;
13  import java.util.Arrays;
14  import java.util.Collections;
15  import java.util.HashMap;
16  import java.util.Iterator;
17  import java.util.LinkedHashMap;
18  import java.util.List;
19  import java.util.Map;
20  import java.util.Optional;
21  import java.util.concurrent.atomic.AtomicInteger;
22  
23  import javax.swing.JFrame;
24  import javax.swing.JPanel;
25  import javax.swing.JScrollPane;
26  import javax.xml.parsers.ParserConfigurationException;
27  
28  import org.djunits.value.vdouble.scalar.Duration;
29  import org.djunits.value.vdouble.scalar.Length;
30  import org.djunits.value.vdouble.scalar.Time;
31  import org.djutils.decoderdumper.HexDumper;
32  import org.djutils.immutablecollections.ImmutableMap;
33  import org.djutils.serialization.SerializationException;
34  import org.opentrafficsim.base.logger.Logger;
35  import org.opentrafficsim.core.dsol.AbstractOtsModel;
36  import org.opentrafficsim.core.dsol.OtsAnimator;
37  import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
38  import org.opentrafficsim.core.gtu.GtuException;
39  import org.opentrafficsim.core.network.Network;
40  import org.opentrafficsim.core.network.NetworkException;
41  import org.opentrafficsim.core.object.NonLocatedObject;
42  import org.opentrafficsim.core.perception.HistoryManagerDevs;
43  import org.opentrafficsim.road.network.RoadNetwork;
44  import org.opentrafficsim.road.network.factory.xml.XmlParserException;
45  import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
46  import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
47  import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
48  import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
49  import org.opentrafficsim.sim0mq.publisher.Publisher;
50  import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51  import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
53  import org.opentrafficsim.swing.gui.OtsAnimationPanel;
54  import org.opentrafficsim.swing.gui.OtsSimulationApplication;
55  import org.opentrafficsim.swing.gui.OtsSwingApplication;
56  import org.opentrafficsim.swing.script.AbstractSimulationScript;
57  import org.opentrafficsim.trafficcontrol.TrafficControlException;
58  import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
59  import org.sim0mq.Sim0MQException;
60  import org.sim0mq.message.Sim0MQMessage;
61  import org.xml.sax.SAXException;
62  import org.zeromq.SocketType;
63  import org.zeromq.ZContext;
64  import org.zeromq.ZMQ;
65  
66  import jakarta.xml.bind.JAXBException;
67  import nl.tudelft.simulation.dsol.SimRuntimeException;
68  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
69  import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
70  import nl.tudelft.simulation.jstats.streams.MersenneTwister;
71  import nl.tudelft.simulation.jstats.streams.StreamInterface;
72  
73  /**
74   * Sim0MQPublisher - make many OTS simulation controls and observations available over Sim0MQ.
75   * <p>
76   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
77   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
78   * </p>
79   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
80   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
81   */
82  public final class Sim0mqPublisher
83  {
84      /** The publisher. */
85      private Publisher publisher = null;
86  
87      /** The ZContect. */
88      private final ZContext zContext;
89  
90      /** The simulation model. */
91      private Sim0mqOtsModel model = null;
92  
93      /** The OTS road network. */
94      private RoadNetwork network = null;
95  
96      /** The OTS animation panel. */
97      private OtsAnimationPanel animationPanel = null;
98  
99      /**
100      * Create a new Sim0MQPublisher that is operated through //inproc sockets.
101      * @param zContext needed to create the sockets
102      * @param controlInput PULL socket for control input
103      * @param resultOutput PUSH socket to output results
104      */
105     public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
106     {
107         this.zContext = zContext;
108         ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
109         controlSocket.bind("inproc://" + controlInput);
110         ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
111         resultOutputQueue.connect("inproc://" + resultOutput);
112         pollingLoop(controlSocket, resultOutputQueue);
113     }
114 
115     /**
116      * Create a new Sim0MQPublisher that uses TCP transport.
117      * @param port port number to bind to
118      */
119     public Sim0mqPublisher(final int port)
120     {
121         this.zContext = new ZContext(5);
122         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
123         socket.bind("tcp://*:" + port);
124         pollingLoop(socket, socket);
125     }
126 
127     /**
128      * Create a new Sim0MQPublisher that uses TCP transport.
129      * @param port port number to bind to
130      * @param preloadedSimulation a fully loaded (but not started) simulation
131      * @param additionalSubscriptionHandlers list of additional subscription handlers (may be null)
132      * @param incomingDataHandlers list of additional handlers for incoming data that is not handled by the standard
133      *            Sim0MQPublisher (may be null)
134      * @throws RemoteException when construction of the Publisher failed
135      */
136     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
137             final List<SubscriptionHandler> additionalSubscriptionHandlers,
138             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
139     {
140         this.zContext = new ZContext(5);
141         ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
142         socket.bind("tcp://*:" + port);
143         this.network = preloadedSimulation.getNetwork();
144         this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
145         this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
146         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
147         ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
148         pollingLoop(socket, socket);
149         System.exit(0);
150     }
151 
152     /**
153      * Create a new Sim0MQPublisher that uses TCP transport.
154      * @param port port number to bind to
155      * @param preloadedSimulation a fully loaded (but not started) simulation
156      * @param additionalSubscriptionHandlers list of additional subscription handlers (may be null)
157      * @throws RemoteException when construction of the Publisher failed
158      */
159     public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
160             final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
161     {
162         this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
163     }
164 
165     /**
166      * Poller that receives the commands and ensures that various output sources can talk to the master.
167      * @param controlSocket ZMQ.Socket; PULL socket for commands from the master
168      * @param resultOutputQueue ZMQ.Socket; PULL socket for output that must be relayed to the master
169      */
170     private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
171     {
172         System.out
173                 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
174         resultOutputQueue.setHWM(100000);
175         AtomicInteger packetsSent = new AtomicInteger(0);
176         Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
177         ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
178         resultInputQueue.bind("inproc://simulationEvents");
179         // Poll the two input sockets using ZMQ poller
180         ZMQ.Poller poller = this.zContext.createPoller(2);
181         // TODO ensure that this also handles a closed control socket gracefully
182         poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
183         poller.register(controlSocket, ZMQ.Poller.POLLIN);
184         while (!Thread.currentThread().isInterrupted())
185         {
186             Logger.ots().trace("Publisher calls Poller.poll()");
187             poller.poll();
188             if (poller.pollin(0))
189             {
190                 byte[] data = resultInputQueue.recv();
191                 Logger.ots().trace("Publisher got outgoing result of " + data.length + " bytes");
192                 byte[] fixedData = data;
193                 int number = -1;
194                 try
195                 {
196                     // Patch the sender field to include the packet counter value - this is bloody expensive...
197                     Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
198                     Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
199                     number = packetsSent.addAndGet(1);
200                     fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
201                             messageFields[4], messageFields[5], messageFields[6], newMessageFields);
202                     // System.out
203                     // .println("Prepared message " + number + ", type is \"" + messageFields[5] + "\", " + messageFields[6]);
204                 }
205                 catch (Sim0MQException | SerializationException e)
206                 {
207                     e.printStackTrace();
208                 }
209                 resultOutputQueue.send(fixedData, 0);
210                 Logger.ots().trace("Outgoing result handed over to controlSocket");
211                 continue; // Check for more results before checking the control input
212             }
213             if (poller.pollin(1))
214             {
215                 byte[] data = controlSocket.recv();
216                 Logger.ots().trace("Publisher received a command of " + data.length + " bytes");
217                 if (!handleCommand(data, socketMap))
218                 {
219                     break;
220                 }
221             }
222         }
223         Logger.ots().info("Exiting publisher polling loop");
224     }
225 
226     /**
227      * Construct an OTS simulation experiment from an XML description.
228      * @param xml the XML encoded network
229      * @param simulationDuration total duration of the simulation
230      * @param warmupTime warm up time of the simulation
231      * @param seed seed for the experiment
232      * @return null on success, description of the problem on error
233      */
234     private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
235     {
236         try
237         {
238             OtsAnimator animator = new OtsAnimator("OTS Animator");
239             this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
240             this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
241             Map<String, StreamInterface> map = new LinkedHashMap<>();
242             map.put("generation", new MersenneTwister(seed));
243             this.model.getStreams().putAll(map);
244             animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, HistoryManagerDevs.noHistory(animator));
245             this.publisher = new Publisher(this.network);
246             this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), animator, this.model,
247                     OtsSwingApplication.DEFAULT_GTU_COLORERS, this.model.getNetwork());
248             new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel, Collections.emptyMap());
249             JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
250             frame.setExtendedState(Frame.NORMAL);
251             frame.setSize(new Dimension(1100, 1000));
252             frame.setBounds(0, 25, 1100, 1000);
253             animator.setSpeedFactor(Double.MAX_VALUE, true);
254             animator.setSpeedFactor(1000.0, true);
255 
256             ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
257             for (NonLocatedObject ioi : onLocatedObjectMap.values())
258             {
259                 if (ioi instanceof TrafCod)
260                 {
261                     TrafCod trafCOD = (TrafCod) ioi;
262                     Optional<Container> controllerDisplayPanel = trafCOD.getDisplayContainer();
263                     if (controllerDisplayPanel.isPresent())
264                     {
265                         JPanel wrapper = new JPanel(new BorderLayout());
266                         wrapper.add(new JScrollPane(controllerDisplayPanel.get()));
267                         TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
268                         tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
269                     }
270                 }
271             }
272             try
273             {
274                 Thread.sleep(300);
275             }
276             catch (InterruptedException e)
277             {
278                 e.printStackTrace();
279             }
280             this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
281         }
282         catch (Exception e)
283         {
284             return e.getMessage();
285         }
286         return null;
287     }
288 
289     /**
290      * Execute one remote control command.
291      * @param data the SIM0MQ encoded command
292      * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
293      * @return true if another command can be processed after this one; false when no further commands can be processed
294      */
295     @SuppressWarnings("checkstyle:methodlength")
296     private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
297     {
298         boolean result = true;
299         try
300         {
301             Object[] message = Sim0MQMessage.decode(data).createObjectArray();
302             String resultMessage = "OK";
303             Boolean ackNack = null;
304 
305             if (message.length >= 8 && message[5] instanceof String)
306             {
307                 String command = (String) message[5];
308                 Logger.ots().info("Publisher thread decoded Sim0MQ command: " + command);
309 
310                 String[] parts = command.split("\\|");
311                 if (parts.length == 2)
312                 {
313                     // This is a command for the embedded Publisher
314                     ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
315                             new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
316                             socketMap);
317                     if (null == this.publisher)
318                     {
319                         returnWrapper.nack("No simulation loaded; cannot execute command " + command);
320                         Logger.ots().error("No publisher for command " + command);
321                         return true;
322                     }
323                     Object[] payload = Arrays.copyOfRange(message, 8, message.length);
324                     this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
325                     return true;
326                 }
327                 else
328                 {
329                     switch (command)
330                     {
331                         case "NEWSIMULATION":
332                             if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
333                                     && message[10] instanceof Duration && message[11] instanceof Long)
334                             {
335                                 if (null != this.animationPanel)
336                                 {
337                                     for (Container container = this.animationPanel; container != null; container =
338                                             container.getParent())
339                                     {
340                                         if (container instanceof JFrame)
341                                         {
342                                             JFrame jFrame = (JFrame) container;
343                                             jFrame.dispose();
344                                         }
345                                     }
346                                 }
347                                 Logger.ots().trace("xml length = " + ((String) message[8]).length());
348                                 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
349                                         (Long) message[11]);
350                                 ackNack = null == resultMessage;
351                                 if (ackNack)
352                                 {
353                                     resultMessage = "OK";
354                                 }
355                             }
356                             else
357                             {
358                                 resultMessage =
359                                         "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
360                                 ackNack = false;
361                             }
362                             break;
363 
364                         case "DIE":
365                             for (Container container = this.animationPanel; container != null; container =
366                                     container.getParent())
367                             {
368                                 Logger.ots().trace("container is " + container);
369                                 if (container instanceof JFrame)
370                                 {
371                                     JFrame jFrame = (JFrame) container;
372                                     jFrame.dispose();
373                                 }
374                             }
375                             return false;
376 
377                         case "SIMULATEUNTIL":
378                             if (message.length == 9 && message[8] instanceof Duration)
379                             {
380                                 Logger.ots().info("Simulating up to " + message[8]);
381                                 if (null == this.network)
382                                 {
383                                     resultMessage = "No network loaded";
384                                     ackNack = false;
385                                     break;
386                                 }
387                                 OtsSimulatorInterface simulator = this.network.getSimulator();
388                                 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
389                                 {
390                                     resultMessage = "Simulation is already at end of simulation time";
391                                     ackNack = false;
392                                     break;
393                                 }
394                                 if (simulator.isStartingOrRunning())
395                                 {
396                                     resultMessage = "Simulator is already running"; // cannot happen for now
397                                     ackNack = false;
398                                     break;
399                                 }
400                                 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
401                                         message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
402                                 returnWrapper.ack(resultMessage);
403                                 simulator.runUpTo((Duration) message[8]);
404                                 int count = 0;
405                                 while (this.network.getSimulator().isStartingOrRunning())
406                                 {
407                                     Logger.ots().info(".");
408                                     count++;
409                                     if (count > 1000) // Quit after 1000 attempts of 10 ms; 10 s
410                                     {
411                                         Logger.ots().info("TIMEOUT - STOPPING SIMULATOR. TIME = "
412                                                 + this.network.getSimulator().getSimulatorTime());
413                                         this.network.getSimulator().stop();
414                                         Iterator<SimEventInterface<Duration>> elIt =
415                                                 this.network.getSimulator().getEventList().iterator();
416                                         while (elIt.hasNext())
417                                         {
418                                             Logger.ots().info("EVENTLIST: " + elIt.next());
419                                         }
420                                     }
421                                     try
422                                     {
423                                         Thread.sleep(10); // 10 ms
424                                     }
425                                     catch (InterruptedException e)
426                                     {
427                                         e.printStackTrace();
428                                     }
429                                 }
430                                 // TODO Fix this (it is still needed - 2020-06-16)
431                                 try
432                                 {
433                                     Thread.sleep(100); // EXTRA STOP FOR SYNC REASONS - BUG IN DSOL!
434                                 }
435                                 catch (InterruptedException e)
436                                 {
437                                     e.printStackTrace();
438                                 }
439                                 return true; // ack has been sent when simulation started
440                             }
441                             else
442                             {
443                                 resultMessage = "Bad or missing stop time";
444                                 ackNack = false;
445                             }
446                             break;
447 
448                         default:
449                             IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
450                             if (incomingDataHandler != null)
451                             {
452                                 resultMessage = incomingDataHandler.handleIncomingData(message);
453                                 ackNack = resultMessage == null;
454                             }
455                             else
456                             {
457                                 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
458                                 ackNack = false;
459                             }
460                             break;
461                     }
462                 }
463                 if (resultMessage != null)
464                 {
465                     ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
466                             new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
467                             socketMap);
468                     if (ackNack)
469                     {
470                         returnWrapper.ack(resultMessage);
471                     }
472                     else
473                     {
474                         returnWrapper.nack(resultMessage);
475                     }
476                 }
477             }
478             else
479             {
480                 // We cannot construct a ReturnWrapper because the request has too few fields.
481                 Logger.ots().error("Publisher expected Sim0MQ command but is has too few fields:");
482                 Logger.ots().error(HexDumper.hexDumper(data));
483                 return true; // Do we really want to try again?
484             }
485         }
486         catch (Sim0MQException | SerializationException | RemoteException e)
487         {
488             Logger.ots().error("Publisher thread could not decode command");
489             e.printStackTrace();
490         }
491         return result;
492     }
493 
494 }
495 
496 /**
497  * The Model.
498  */
499 class Sim0mqOtsModel extends AbstractOtsModel
500 {
501     /** The network. */
502     private final RoadNetwork network;
503 
504     /** The XML. */
505     private final String xml;
506 
507     /**
508      * Constructor.
509      * @param description the model description
510      * @param network the network
511      * @param xml the XML description of the simulation model
512      */
513     Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
514     {
515         super(network.getSimulator(), network.getId(), description, AbstractOtsModel.defaultInitialStreams());
516         this.network = network;
517         this.xml = xml;
518     }
519 
520     @Override
521     public void constructModel() throws SimRuntimeException
522     {
523         try
524         {
525             new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
526             // TODO: obtain relevant GTU type from xml
527             ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
528                     new ConflictBuilder.FixedWidthGenerator(Length.ofSI(2.0)), new LaneCombinationList(),
529                     new LaneCombinationList());
530         }
531         catch (NetworkException | JAXBException | URISyntaxException | XmlParserException | SAXException
532                 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
533         {
534             exception.printStackTrace();
535             // Abusing the SimRuntimeException to propagate the message to the main method (the problem could actually be a
536             // parsing problem)
537             throw new SimRuntimeException(exception);
538         }
539     }
540 
541     @Override
542     public Network getNetwork()
543     {
544         return this.network;
545     }
546 
547 }