1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.net.URISyntaxException;
11 import java.nio.charset.StandardCharsets;
12 import java.rmi.RemoteException;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicInteger;
22
23 import javax.swing.JFrame;
24 import javax.swing.JPanel;
25 import javax.swing.JScrollPane;
26 import javax.xml.parsers.ParserConfigurationException;
27
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.decoderdumper.HexDumper;
32 import org.djutils.immutablecollections.ImmutableMap;
33 import org.djutils.serialization.SerializationException;
34 import org.opentrafficsim.base.logger.Logger;
35 import org.opentrafficsim.core.dsol.AbstractOtsModel;
36 import org.opentrafficsim.core.dsol.OtsAnimator;
37 import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
38 import org.opentrafficsim.core.gtu.GtuException;
39 import org.opentrafficsim.core.network.Network;
40 import org.opentrafficsim.core.network.NetworkException;
41 import org.opentrafficsim.core.object.NonLocatedObject;
42 import org.opentrafficsim.core.perception.HistoryManagerDevs;
43 import org.opentrafficsim.road.network.RoadNetwork;
44 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
45 import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
46 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
47 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
48 import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
49 import org.opentrafficsim.sim0mq.publisher.Publisher;
50 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
53 import org.opentrafficsim.swing.gui.OtsAnimationPanel;
54 import org.opentrafficsim.swing.gui.OtsSimulationApplication;
55 import org.opentrafficsim.swing.gui.OtsSwingApplication;
56 import org.opentrafficsim.swing.script.AbstractSimulationScript;
57 import org.opentrafficsim.trafficcontrol.TrafficControlException;
58 import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
59 import org.sim0mq.Sim0MQException;
60 import org.sim0mq.message.Sim0MQMessage;
61 import org.xml.sax.SAXException;
62 import org.zeromq.SocketType;
63 import org.zeromq.ZContext;
64 import org.zeromq.ZMQ;
65
66 import jakarta.xml.bind.JAXBException;
67 import nl.tudelft.simulation.dsol.SimRuntimeException;
68 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
69 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
70 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
71 import nl.tudelft.simulation.jstats.streams.StreamInterface;
72
73
74
75
76
77
78
79
80
81
82 public final class Sim0mqPublisher
83 {
84
85 private Publisher publisher = null;
86
87
88 private final ZContext zContext;
89
90
91 private Sim0mqOtsModel model = null;
92
93
94 private RoadNetwork network = null;
95
96
97 private OtsAnimationPanel animationPanel = null;
98
99
100
101
102
103
104
105 public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
106 {
107 this.zContext = zContext;
108 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
109 controlSocket.bind("inproc://" + controlInput);
110 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
111 resultOutputQueue.connect("inproc://" + resultOutput);
112 pollingLoop(controlSocket, resultOutputQueue);
113 }
114
115
116
117
118
119 public Sim0mqPublisher(final int port)
120 {
121 this.zContext = new ZContext(5);
122 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
123 socket.bind("tcp://*:" + port);
124 pollingLoop(socket, socket);
125 }
126
127
128
129
130
131
132
133
134
135
136 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
137 final List<SubscriptionHandler> additionalSubscriptionHandlers,
138 final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
139 {
140 this.zContext = new ZContext(5);
141 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
142 socket.bind("tcp://*:" + port);
143 this.network = preloadedSimulation.getNetwork();
144 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
145 this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
146 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
147 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
148 pollingLoop(socket, socket);
149 System.exit(0);
150 }
151
152
153
154
155
156
157
158
159 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
160 final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
161 {
162 this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
163 }
164
165
166
167
168
169
170 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
171 {
172 System.out
173 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
174 resultOutputQueue.setHWM(100000);
175 AtomicInteger packetsSent = new AtomicInteger(0);
176 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
177 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
178 resultInputQueue.bind("inproc://simulationEvents");
179
180 ZMQ.Poller poller = this.zContext.createPoller(2);
181
182 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
183 poller.register(controlSocket, ZMQ.Poller.POLLIN);
184 while (!Thread.currentThread().isInterrupted())
185 {
186 Logger.ots().trace("Publisher calls Poller.poll()");
187 poller.poll();
188 if (poller.pollin(0))
189 {
190 byte[] data = resultInputQueue.recv();
191 Logger.ots().trace("Publisher got outgoing result of " + data.length + " bytes");
192 byte[] fixedData = data;
193 int number = -1;
194 try
195 {
196
197 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
198 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
199 number = packetsSent.addAndGet(1);
200 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
201 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
202
203
204 }
205 catch (Sim0MQException | SerializationException e)
206 {
207 e.printStackTrace();
208 }
209 resultOutputQueue.send(fixedData, 0);
210 Logger.ots().trace("Outgoing result handed over to controlSocket");
211 continue;
212 }
213 if (poller.pollin(1))
214 {
215 byte[] data = controlSocket.recv();
216 Logger.ots().trace("Publisher received a command of " + data.length + " bytes");
217 if (!handleCommand(data, socketMap))
218 {
219 break;
220 }
221 }
222 }
223 Logger.ots().info("Exiting publisher polling loop");
224 }
225
226
227
228
229
230
231
232
233
234 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
235 {
236 try
237 {
238 OtsAnimator animator = new OtsAnimator("OTS Animator");
239 this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
240 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
241 Map<String, StreamInterface> map = new LinkedHashMap<>();
242 map.put("generation", new MersenneTwister(seed));
243 this.model.getStreams().putAll(map);
244 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, HistoryManagerDevs.noHistory(animator));
245 this.publisher = new Publisher(this.network);
246 this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), animator, this.model,
247 OtsSwingApplication.DEFAULT_GTU_COLORERS, this.model.getNetwork());
248 new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel, Collections.emptyMap());
249 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
250 frame.setExtendedState(Frame.NORMAL);
251 frame.setSize(new Dimension(1100, 1000));
252 frame.setBounds(0, 25, 1100, 1000);
253 animator.setSpeedFactor(Double.MAX_VALUE, true);
254 animator.setSpeedFactor(1000.0, true);
255
256 ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
257 for (NonLocatedObject ioi : onLocatedObjectMap.values())
258 {
259 if (ioi instanceof TrafCod)
260 {
261 TrafCod trafCOD = (TrafCod) ioi;
262 Optional<Container> controllerDisplayPanel = trafCOD.getDisplayContainer();
263 if (controllerDisplayPanel.isPresent())
264 {
265 JPanel wrapper = new JPanel(new BorderLayout());
266 wrapper.add(new JScrollPane(controllerDisplayPanel.get()));
267 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
268 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
269 }
270 }
271 }
272 try
273 {
274 Thread.sleep(300);
275 }
276 catch (InterruptedException e)
277 {
278 e.printStackTrace();
279 }
280 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
281 }
282 catch (Exception e)
283 {
284 return e.getMessage();
285 }
286 return null;
287 }
288
289
290
291
292
293
294
295 @SuppressWarnings("checkstyle:methodlength")
296 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
297 {
298 boolean result = true;
299 try
300 {
301 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
302 String resultMessage = "OK";
303 Boolean ackNack = null;
304
305 if (message.length >= 8 && message[5] instanceof String)
306 {
307 String command = (String) message[5];
308 Logger.ots().info("Publisher thread decoded Sim0MQ command: " + command);
309
310 String[] parts = command.split("\\|");
311 if (parts.length == 2)
312 {
313
314 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
315 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
316 socketMap);
317 if (null == this.publisher)
318 {
319 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
320 Logger.ots().error("No publisher for command " + command);
321 return true;
322 }
323 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
324 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
325 return true;
326 }
327 else
328 {
329 switch (command)
330 {
331 case "NEWSIMULATION":
332 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
333 && message[10] instanceof Duration && message[11] instanceof Long)
334 {
335 if (null != this.animationPanel)
336 {
337 for (Container container = this.animationPanel; container != null; container =
338 container.getParent())
339 {
340 if (container instanceof JFrame)
341 {
342 JFrame jFrame = (JFrame) container;
343 jFrame.dispose();
344 }
345 }
346 }
347 Logger.ots().trace("xml length = " + ((String) message[8]).length());
348 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
349 (Long) message[11]);
350 ackNack = null == resultMessage;
351 if (ackNack)
352 {
353 resultMessage = "OK";
354 }
355 }
356 else
357 {
358 resultMessage =
359 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
360 ackNack = false;
361 }
362 break;
363
364 case "DIE":
365 for (Container container = this.animationPanel; container != null; container =
366 container.getParent())
367 {
368 Logger.ots().trace("container is " + container);
369 if (container instanceof JFrame)
370 {
371 JFrame jFrame = (JFrame) container;
372 jFrame.dispose();
373 }
374 }
375 return false;
376
377 case "SIMULATEUNTIL":
378 if (message.length == 9 && message[8] instanceof Duration)
379 {
380 Logger.ots().info("Simulating up to " + message[8]);
381 if (null == this.network)
382 {
383 resultMessage = "No network loaded";
384 ackNack = false;
385 break;
386 }
387 OtsSimulatorInterface simulator = this.network.getSimulator();
388 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
389 {
390 resultMessage = "Simulation is already at end of simulation time";
391 ackNack = false;
392 break;
393 }
394 if (simulator.isStartingOrRunning())
395 {
396 resultMessage = "Simulator is already running";
397 ackNack = false;
398 break;
399 }
400 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
401 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
402 returnWrapper.ack(resultMessage);
403 simulator.runUpTo((Duration) message[8]);
404 int count = 0;
405 while (this.network.getSimulator().isStartingOrRunning())
406 {
407 Logger.ots().info(".");
408 count++;
409 if (count > 1000)
410 {
411 Logger.ots().info("TIMEOUT - STOPPING SIMULATOR. TIME = "
412 + this.network.getSimulator().getSimulatorTime());
413 this.network.getSimulator().stop();
414 Iterator<SimEventInterface<Duration>> elIt =
415 this.network.getSimulator().getEventList().iterator();
416 while (elIt.hasNext())
417 {
418 Logger.ots().info("EVENTLIST: " + elIt.next());
419 }
420 }
421 try
422 {
423 Thread.sleep(10);
424 }
425 catch (InterruptedException e)
426 {
427 e.printStackTrace();
428 }
429 }
430
431 try
432 {
433 Thread.sleep(100);
434 }
435 catch (InterruptedException e)
436 {
437 e.printStackTrace();
438 }
439 return true;
440 }
441 else
442 {
443 resultMessage = "Bad or missing stop time";
444 ackNack = false;
445 }
446 break;
447
448 default:
449 IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
450 if (incomingDataHandler != null)
451 {
452 resultMessage = incomingDataHandler.handleIncomingData(message);
453 ackNack = resultMessage == null;
454 }
455 else
456 {
457 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
458 ackNack = false;
459 }
460 break;
461 }
462 }
463 if (resultMessage != null)
464 {
465 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
466 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
467 socketMap);
468 if (ackNack)
469 {
470 returnWrapper.ack(resultMessage);
471 }
472 else
473 {
474 returnWrapper.nack(resultMessage);
475 }
476 }
477 }
478 else
479 {
480
481 Logger.ots().error("Publisher expected Sim0MQ command but is has too few fields:");
482 Logger.ots().error(HexDumper.hexDumper(data));
483 return true;
484 }
485 }
486 catch (Sim0MQException | SerializationException | RemoteException e)
487 {
488 Logger.ots().error("Publisher thread could not decode command");
489 e.printStackTrace();
490 }
491 return result;
492 }
493
494 }
495
496
497
498
499 class Sim0mqOtsModel extends AbstractOtsModel
500 {
501
502 private final RoadNetwork network;
503
504
505 private final String xml;
506
507
508
509
510
511
512
513 Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
514 {
515 super(network.getSimulator(), network.getId(), description, AbstractOtsModel.defaultInitialStreams());
516 this.network = network;
517 this.xml = xml;
518 }
519
520 @Override
521 public void constructModel() throws SimRuntimeException
522 {
523 try
524 {
525 new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
526
527 ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
528 new ConflictBuilder.FixedWidthGenerator(Length.ofSI(2.0)), new LaneCombinationList(),
529 new LaneCombinationList());
530 }
531 catch (NetworkException | JAXBException | URISyntaxException | XmlParserException | SAXException
532 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
533 {
534 exception.printStackTrace();
535
536
537 throw new SimRuntimeException(exception);
538 }
539 }
540
541 @Override
542 public Network getNetwork()
543 {
544 return this.network;
545 }
546
547 }