1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.net.URISyntaxException;
11 import java.nio.charset.StandardCharsets;
12 import java.rmi.RemoteException;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.atomic.AtomicInteger;
21
22 import javax.swing.JFrame;
23 import javax.swing.JPanel;
24 import javax.swing.JScrollPane;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.vdouble.scalar.Duration;
28 import org.djunits.value.vdouble.scalar.Length;
29 import org.djunits.value.vdouble.scalar.Time;
30 import org.djutils.decoderdumper.HexDumper;
31 import org.djutils.immutablecollections.ImmutableMap;
32 import org.djutils.serialization.SerializationException;
33 import org.opentrafficsim.core.dsol.AbstractOtsModel;
34 import org.opentrafficsim.core.dsol.OtsAnimator;
35 import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
36 import org.opentrafficsim.core.gtu.GtuException;
37 import org.opentrafficsim.core.network.Network;
38 import org.opentrafficsim.core.network.NetworkException;
39 import org.opentrafficsim.core.object.NonLocatedObject;
40 import org.opentrafficsim.core.perception.HistoryManagerDevs;
41 import org.opentrafficsim.road.network.RoadNetwork;
42 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
43 import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
44 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
45 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
46 import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
47 import org.opentrafficsim.sim0mq.publisher.Publisher;
48 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
49 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
50 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
51 import org.opentrafficsim.swing.gui.OtsAnimationPanel;
52 import org.opentrafficsim.swing.gui.OtsSimulationApplication;
53 import org.opentrafficsim.swing.gui.OtsSwingApplication;
54 import org.opentrafficsim.swing.script.AbstractSimulationScript;
55 import org.opentrafficsim.trafficcontrol.TrafficControlException;
56 import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
57 import org.sim0mq.Sim0MQException;
58 import org.sim0mq.message.Sim0MQMessage;
59 import org.xml.sax.SAXException;
60 import org.zeromq.SocketType;
61 import org.zeromq.ZContext;
62 import org.zeromq.ZMQ;
63
64 import jakarta.xml.bind.JAXBException;
65 import nl.tudelft.simulation.dsol.SimRuntimeException;
66 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
67 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
68 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
69 import nl.tudelft.simulation.jstats.streams.StreamInterface;
70
71
72
73
74
75
76
77
78
79
80 public final class Sim0mqPublisher
81 {
82
83 private Publisher publisher = null;
84
85
86 private final ZContext zContext;
87
88
89 private Sim0mqOtsModel model = null;
90
91
92 private RoadNetwork network = null;
93
94
95 private OtsAnimationPanel animationPanel = null;
96
97
98
99
100
101
102
103 public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
104 {
105 this.zContext = zContext;
106 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
107 controlSocket.bind("inproc://" + controlInput);
108 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
109 resultOutputQueue.connect("inproc://" + resultOutput);
110 pollingLoop(controlSocket, resultOutputQueue);
111 }
112
113
114
115
116
117 public Sim0mqPublisher(final int port)
118 {
119 this.zContext = new ZContext(5);
120 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
121 socket.bind("tcp://*:" + port);
122 pollingLoop(socket, socket);
123 }
124
125
126
127
128
129
130
131
132
133
134 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
135 final List<SubscriptionHandler> additionalSubscriptionHandlers,
136 final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
137 {
138 this.zContext = new ZContext(5);
139 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
140 socket.bind("tcp://*:" + port);
141 this.network = preloadedSimulation.getNetwork();
142 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
143 this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
144 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
145 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
146 pollingLoop(socket, socket);
147 System.exit(0);
148 }
149
150
151
152
153
154
155
156
157 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
158 final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
159 {
160 this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
161 }
162
163
164
165
166
167
168 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
169 {
170 System.out
171 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
172 resultOutputQueue.setHWM(100000);
173 AtomicInteger packetsSent = new AtomicInteger(0);
174 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
175 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
176 resultInputQueue.bind("inproc://simulationEvents");
177
178 ZMQ.Poller poller = this.zContext.createPoller(2);
179
180 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
181 poller.register(controlSocket, ZMQ.Poller.POLLIN);
182 while (!Thread.currentThread().isInterrupted())
183 {
184
185 poller.poll();
186 if (poller.pollin(0))
187 {
188 byte[] data = resultInputQueue.recv();
189
190 byte[] fixedData = data;
191 int number = -1;
192 try
193 {
194
195 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
196 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
197 number = packetsSent.addAndGet(1);
198 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
199 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
200
201
202 }
203 catch (Sim0MQException | SerializationException e)
204 {
205 e.printStackTrace();
206 }
207 resultOutputQueue.send(fixedData, 0);
208
209 continue;
210 }
211 if (poller.pollin(1))
212 {
213 byte[] data = controlSocket.recv();
214
215 if (!handleCommand(data, socketMap))
216 {
217 break;
218 }
219 }
220 }
221 System.out.println("Exiting publisher polling loop");
222 }
223
224
225
226
227
228
229
230
231
232 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
233 {
234 try
235 {
236 OtsAnimator animator = new OtsAnimator("OTS Animator");
237 this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
238 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
239 Map<String, StreamInterface> map = new LinkedHashMap<>();
240 map.put("generation", new MersenneTwister(seed));
241 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map,
242 HistoryManagerDevs.noHistory(animator));
243 this.publisher = new Publisher(this.network);
244 this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
245 animator, this.model, OtsSwingApplication.DEFAULT_GTU_COLORERS, this.model.getNetwork());
246 new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel, Collections.emptyMap());
247 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
248 frame.setExtendedState(Frame.NORMAL);
249 frame.setSize(new Dimension(1100, 1000));
250 frame.setBounds(0, 25, 1100, 1000);
251 animator.setSpeedFactor(Double.MAX_VALUE, true);
252 animator.setSpeedFactor(1000.0, true);
253
254 ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
255 for (NonLocatedObject ioi : onLocatedObjectMap.values())
256 {
257 if (ioi instanceof TrafCod)
258 {
259 TrafCod trafCOD = (TrafCod) ioi;
260 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
261 if (null != controllerDisplayPanel)
262 {
263 JPanel wrapper = new JPanel(new BorderLayout());
264 wrapper.add(new JScrollPane(controllerDisplayPanel));
265 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
266 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
267 }
268 }
269 }
270 try
271 {
272 Thread.sleep(300);
273 }
274 catch (InterruptedException e)
275 {
276 e.printStackTrace();
277 }
278 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
279 }
280 catch (Exception e)
281 {
282 return e.getMessage();
283 }
284 return null;
285 }
286
287
288
289
290
291
292
293 @SuppressWarnings("checkstyle:methodlength")
294 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
295 {
296 boolean result = true;
297 try
298 {
299 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
300 String resultMessage = "OK";
301 Boolean ackNack = null;
302
303 if (message.length >= 8 && message[5] instanceof String)
304 {
305 String command = (String) message[5];
306 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
307
308 String[] parts = command.split("\\|");
309 if (parts.length == 2)
310 {
311
312 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
313 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
314 socketMap);
315 if (null == this.publisher)
316 {
317 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
318 System.err.println("No publisher for command " + command);
319 return true;
320 }
321 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
322 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
323 return true;
324 }
325 else
326 {
327 switch (command)
328 {
329 case "NEWSIMULATION":
330 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
331 && message[10] instanceof Duration && message[11] instanceof Long)
332 {
333 if (null != this.animationPanel)
334 {
335 for (Container container = this.animationPanel; container != null; container =
336 container.getParent())
337 {
338 if (container instanceof JFrame)
339 {
340 JFrame jFrame = (JFrame) container;
341 jFrame.dispose();
342 }
343 }
344 }
345
346 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
347 (Long) message[11]);
348 ackNack = null == resultMessage;
349 if (ackNack)
350 {
351 resultMessage = "OK";
352 }
353 }
354 else
355 {
356 resultMessage =
357 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
358 ackNack = false;
359 }
360 break;
361
362 case "DIE":
363 for (Container container = this.animationPanel; container != null; container =
364 container.getParent())
365 {
366
367 if (container instanceof JFrame)
368 {
369 JFrame jFrame = (JFrame) container;
370 jFrame.dispose();
371 }
372 }
373 return false;
374
375 case "SIMULATEUNTIL":
376 if (message.length == 9 && message[8] instanceof Time)
377 {
378 System.out.println("Simulating up to " + message[8]);
379 if (null == this.network)
380 {
381 resultMessage = "No network loaded";
382 ackNack = false;
383 break;
384 }
385 OtsSimulatorInterface simulator = this.network.getSimulator();
386 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
387 {
388 resultMessage = "Simulation is already at end of simulation time";
389 ackNack = false;
390 break;
391 }
392 if (simulator.isStartingOrRunning())
393 {
394 resultMessage = "Simulator is already running";
395 ackNack = false;
396 break;
397 }
398 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
399 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
400 returnWrapper.ack(resultMessage);
401 simulator.runUpTo((Time) message[8]);
402 int count = 0;
403 while (this.network.getSimulator().isStartingOrRunning())
404 {
405 System.out.print(".");
406 count++;
407 if (count > 1000)
408 {
409 System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
410 + this.network.getSimulator().getSimulatorTime());
411 this.network.getSimulator().stop();
412 Iterator<SimEventInterface<Duration>> elIt =
413 this.network.getSimulator().getEventList().iterator();
414 while (elIt.hasNext())
415 {
416 System.out.println("EVENTLIST: " + elIt.next());
417 }
418 }
419 try
420 {
421 Thread.sleep(10);
422 }
423 catch (InterruptedException e)
424 {
425 e.printStackTrace();
426 }
427 }
428
429 try
430 {
431 Thread.sleep(100);
432 }
433 catch (InterruptedException e)
434 {
435 e.printStackTrace();
436 }
437 return true;
438 }
439 else
440 {
441 resultMessage = "Bad or missing stop time";
442 ackNack = false;
443 }
444 break;
445
446 default:
447 IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
448 if (incomingDataHandler != null)
449 {
450 resultMessage = incomingDataHandler.handleIncomingData(message);
451 ackNack = resultMessage == null;
452 }
453 else
454 {
455 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
456 ackNack = false;
457 }
458 break;
459 }
460 }
461 if (resultMessage != null)
462 {
463 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
464 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
465 socketMap);
466 if (ackNack)
467 {
468 returnWrapper.ack(resultMessage);
469 }
470 else
471 {
472 returnWrapper.nack(resultMessage);
473 }
474 }
475 }
476 else
477 {
478
479 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
480 System.err.println(HexDumper.hexDumper(data));
481 return true;
482 }
483 }
484 catch (Sim0MQException | SerializationException | RemoteException e)
485 {
486 System.err.println("Publisher thread could not decode command");
487 e.printStackTrace();
488 }
489 return result;
490 }
491
492 }
493
494
495
496
497 class Sim0mqOtsModel extends AbstractOtsModel
498 {
499
500 private static final long serialVersionUID = 20170419L;
501
502
503 private final RoadNetwork network;
504
505
506 private final String xml;
507
508
509
510
511
512
513 Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
514 {
515 super(network.getSimulator(), network.getId(), description);
516 this.network = network;
517 this.xml = xml;
518 }
519
520 @Override
521 public void constructModel() throws SimRuntimeException
522 {
523 try
524 {
525 new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
526
527 ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
528 new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), new LaneCombinationList(),
529 new LaneCombinationList());
530 }
531 catch (NetworkException | JAXBException | URISyntaxException | XmlParserException | SAXException
532 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
533 {
534 exception.printStackTrace();
535
536
537 throw new SimRuntimeException(exception);
538 }
539 }
540
541 @Override
542 public Network getNetwork()
543 {
544 return this.network;
545 }
546
547 }