1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.net.URISyntaxException;
11 import java.nio.charset.StandardCharsets;
12 import java.rmi.RemoteException;
13 import java.util.Arrays;
14 import java.util.HashMap;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import javax.swing.JFrame;
22 import javax.swing.JPanel;
23 import javax.swing.JScrollPane;
24 import javax.xml.bind.JAXBException;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.vdouble.scalar.Duration;
28 import org.djunits.value.vdouble.scalar.Length;
29 import org.djunits.value.vdouble.scalar.Time;
30 import org.djutils.decoderdumper.HexDumper;
31 import org.djutils.immutablecollections.ImmutableMap;
32 import org.djutils.serialization.SerializationException;
33 import org.opentrafficsim.animation.DefaultAnimationFactory;
34 import org.opentrafficsim.animation.gtu.colorer.DefaultSwitchableGtuColorer;
35 import org.opentrafficsim.core.dsol.AbstractOtsModel;
36 import org.opentrafficsim.core.dsol.OtsAnimator;
37 import org.opentrafficsim.core.dsol.OtsSimulatorInterface;
38 import org.opentrafficsim.core.geometry.OtsGeometryException;
39 import org.opentrafficsim.core.gtu.GtuException;
40 import org.opentrafficsim.core.network.Network;
41 import org.opentrafficsim.core.network.NetworkException;
42 import org.opentrafficsim.core.object.NonLocatedObject;
43 import org.opentrafficsim.road.network.RoadNetwork;
44 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
45 import org.opentrafficsim.road.network.factory.xml.parser.XmlParser;
46 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
47 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
48 import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
49 import org.opentrafficsim.sim0mq.publisher.Publisher;
50 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
53 import org.opentrafficsim.swing.gui.OtsAnimationPanel;
54 import org.opentrafficsim.swing.gui.OtsSimulationApplication;
55 import org.opentrafficsim.swing.gui.OtsSwingApplication;
56 import org.opentrafficsim.swing.script.AbstractSimulationScript;
57 import org.opentrafficsim.trafficcontrol.TrafficControlException;
58 import org.opentrafficsim.trafficcontrol.trafcod.TrafCod;
59 import org.sim0mq.Sim0MQException;
60 import org.sim0mq.message.Sim0MQMessage;
61 import org.xml.sax.SAXException;
62 import org.zeromq.SocketType;
63 import org.zeromq.ZContext;
64 import org.zeromq.ZMQ;
65
66 import nl.tudelft.simulation.dsol.SimRuntimeException;
67 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
68 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
69 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
70 import nl.tudelft.simulation.jstats.streams.StreamInterface;
71
72
73
74
75
76
77
78
79
80
81 public final class Sim0mqPublisher
82 {
83
84 private Publisher publisher = null;
85
86
87 private final ZContext zContext;
88
89
90 private Sim0mqOtsModel model = null;
91
92
93 private RoadNetwork network = null;
94
95
96 private OtsAnimationPanel animationPanel = null;
97
98
99
100
101
102
103
104 public Sim0mqPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
105 {
106 this.zContext = zContext;
107 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
108 controlSocket.bind("inproc://" + controlInput);
109 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
110 resultOutputQueue.connect("inproc://" + resultOutput);
111 pollingLoop(controlSocket, resultOutputQueue);
112 }
113
114
115
116
117
118 public Sim0mqPublisher(final int port)
119 {
120 this.zContext = new ZContext(5);
121 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
122 socket.bind("tcp://*:" + port);
123 pollingLoop(socket, socket);
124 }
125
126
127
128
129
130
131
132
133
134
135
136 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
137 final List<SubscriptionHandler> additionalSubscriptionHandlers,
138 final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
139 {
140 this.zContext = new ZContext(5);
141 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
142 socket.bind("tcp://*:" + port);
143 this.network = preloadedSimulation.getNetwork();
144 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, null);
145 this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
146 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
147 ((OtsAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
148 pollingLoop(socket, socket);
149 System.exit(0);
150 }
151
152
153
154
155
156
157
158
159
160 public Sim0mqPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
161 final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
162 {
163 this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
164 }
165
166
167
168
169
170
171 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
172 {
173 System.out
174 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
175 resultOutputQueue.setHWM(100000);
176 AtomicInteger packetsSent = new AtomicInteger(0);
177 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
178 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
179 resultInputQueue.bind("inproc://simulationEvents");
180
181 ZMQ.Poller poller = this.zContext.createPoller(2);
182
183 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
184 poller.register(controlSocket, ZMQ.Poller.POLLIN);
185 while (!Thread.currentThread().isInterrupted())
186 {
187
188 poller.poll();
189 if (poller.pollin(0))
190 {
191 byte[] data = resultInputQueue.recv();
192
193 byte[] fixedData = data;
194 int number = -1;
195 try
196 {
197
198 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
199 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
200 number = packetsSent.addAndGet(1);
201 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
202 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
203
204
205 }
206 catch (Sim0MQException | SerializationException e)
207 {
208 e.printStackTrace();
209 }
210 resultOutputQueue.send(fixedData, 0);
211
212 continue;
213 }
214 if (poller.pollin(1))
215 {
216 byte[] data = controlSocket.recv();
217
218 if (!handleCommand(data, socketMap))
219 {
220 break;
221 }
222 }
223 }
224 System.out.println("Exiting publisher polling loop");
225 }
226
227
228
229
230
231
232
233
234
235 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
236 {
237 try
238 {
239 OtsAnimator animator = new OtsAnimator("OTS Animator");
240 this.network = new RoadNetwork("OTS model for Sim0MQPublisher", animator);
241 this.model = new Sim0mqOtsModel("Remotely controlled OTS model", this.network, xml);
242 Map<String, StreamInterface> map = new LinkedHashMap<>();
243 map.put("generation", new MersenneTwister(seed));
244 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
245 this.publisher = new Publisher(this.network);
246 this.animationPanel = new OtsAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
247 animator, this.model, OtsSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
248 new OtsSimulationApplication<Sim0mqOtsModel>(this.model, this.animationPanel);
249
250 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGtuColorer());
251 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
252 frame.setExtendedState(Frame.NORMAL);
253 frame.setSize(new Dimension(1100, 1000));
254 frame.setBounds(0, 25, 1100, 1000);
255 animator.setSpeedFactor(Double.MAX_VALUE, true);
256 animator.setSpeedFactor(1000.0, true);
257
258 ImmutableMap<String, NonLocatedObject> onLocatedObjectMap = this.model.getNetwork().getNonLocatedObjectMap();
259 for (NonLocatedObject ioi : onLocatedObjectMap.values())
260 {
261 if (ioi instanceof TrafCod)
262 {
263 TrafCod trafCOD = (TrafCod) ioi;
264 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
265 if (null != controllerDisplayPanel)
266 {
267 JPanel wrapper = new JPanel(new BorderLayout());
268 wrapper.add(new JScrollPane(controllerDisplayPanel));
269 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
270 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
271 }
272 }
273 }
274 try
275 {
276 Thread.sleep(300);
277 }
278 catch (InterruptedException e)
279 {
280 e.printStackTrace();
281 }
282 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
283 }
284 catch (Exception e)
285 {
286 return e.getMessage();
287 }
288 return null;
289 }
290
291
292
293
294
295
296
297 @SuppressWarnings("checkstyle:methodlength")
298 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
299 {
300 boolean result = true;
301 try
302 {
303 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
304 String resultMessage = "OK";
305 Boolean ackNack = null;
306
307 if (message.length >= 8 && message[5] instanceof String)
308 {
309 String command = (String) message[5];
310 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
311
312 String[] parts = command.split("\\|");
313 if (parts.length == 2)
314 {
315
316 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
317 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
318 socketMap);
319 if (null == this.publisher)
320 {
321 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
322 System.err.println("No publisher for command " + command);
323 return true;
324 }
325 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
326 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
327 return true;
328 }
329 else
330 {
331 switch (command)
332 {
333 case "NEWSIMULATION":
334 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
335 && message[10] instanceof Duration && message[11] instanceof Long)
336 {
337 if (null != this.animationPanel)
338 {
339 for (Container container = this.animationPanel; container != null; container =
340 container.getParent())
341 {
342 if (container instanceof JFrame)
343 {
344 JFrame jFrame = (JFrame) container;
345 jFrame.dispose();
346 }
347 }
348 }
349
350 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
351 (Long) message[11]);
352 ackNack = null == resultMessage;
353 if (ackNack)
354 {
355 resultMessage = "OK";
356 }
357 }
358 else
359 {
360 resultMessage =
361 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
362 ackNack = false;
363 }
364 break;
365
366 case "DIE":
367 for (Container container = this.animationPanel; container != null; container =
368 container.getParent())
369 {
370
371 if (container instanceof JFrame)
372 {
373 JFrame jFrame = (JFrame) container;
374 jFrame.dispose();
375 }
376 }
377 return false;
378
379 case "SIMULATEUNTIL":
380 if (message.length == 9 && message[8] instanceof Time)
381 {
382 System.out.println("Simulating up to " + message[8]);
383 if (null == this.network)
384 {
385 resultMessage = "No network loaded";
386 ackNack = false;
387 break;
388 }
389 OtsSimulatorInterface simulator = this.network.getSimulator();
390 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
391 {
392 resultMessage = "Simulation is already at end of simulation time";
393 ackNack = false;
394 break;
395 }
396 if (simulator.isStartingOrRunning())
397 {
398 resultMessage = "Simulator is already running";
399 ackNack = false;
400 break;
401 }
402 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
403 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
404 returnWrapper.ack(resultMessage);
405 simulator.runUpTo((Time) message[8]);
406 int count = 0;
407 while (this.network.getSimulator().isStartingOrRunning())
408 {
409 System.out.print(".");
410 count++;
411 if (count > 1000)
412 {
413 System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
414 + this.network.getSimulator().getSimulatorTime());
415 this.network.getSimulator().stop();
416 Iterator<SimEventInterface<Duration>> elIt =
417 this.network.getSimulator().getEventList().iterator();
418 while (elIt.hasNext())
419 {
420 System.out.println("EVENTLIST: " + elIt.next());
421 }
422 }
423 try
424 {
425 Thread.sleep(10);
426 }
427 catch (InterruptedException e)
428 {
429 e.printStackTrace();
430 }
431 }
432
433 try
434 {
435 Thread.sleep(100);
436 }
437 catch (InterruptedException e)
438 {
439 e.printStackTrace();
440 }
441 return true;
442 }
443 else
444 {
445 resultMessage = "Bad or missing stop time";
446 ackNack = false;
447 }
448 break;
449
450 default:
451 IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
452 if (incomingDataHandler != null)
453 {
454 resultMessage = incomingDataHandler.handleIncomingData(message);
455 ackNack = resultMessage == null;
456 }
457 else
458 {
459 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
460 ackNack = false;
461 }
462 break;
463 }
464 }
465 if (resultMessage != null)
466 {
467 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
468 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
469 socketMap);
470 if (ackNack)
471 {
472 returnWrapper.ack(resultMessage);
473 }
474 else
475 {
476 returnWrapper.nack(resultMessage);
477 }
478 }
479 }
480 else
481 {
482
483 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
484 System.err.println(HexDumper.hexDumper(data));
485 return true;
486 }
487 }
488 catch (Sim0MQException | SerializationException | RemoteException e)
489 {
490 System.err.println("Publisher thread could not decode command");
491 e.printStackTrace();
492 }
493 return result;
494 }
495
496 }
497
498
499
500
501 class Sim0mqOtsModel extends AbstractOtsModel
502 {
503
504 private static final long serialVersionUID = 20170419L;
505
506
507 private final RoadNetwork network;
508
509
510 private final String xml;
511
512
513
514
515
516
517 Sim0mqOtsModel(final String description, final RoadNetwork network, final String xml)
518 {
519 super(network.getSimulator(), network.getId(), description);
520 this.network = network;
521 this.xml = xml;
522 }
523
524
525 @Override
526 public void constructModel() throws SimRuntimeException
527 {
528 try
529 {
530 new XmlParser(this.network).setStream(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8))).build();
531
532 ConflictBuilder.buildConflictsParallel(this.network, getSimulator(),
533 new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)), new LaneCombinationList(),
534 new LaneCombinationList());
535 }
536 catch (NetworkException | OtsGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
537 | ParserConfigurationException | GtuException | IOException | TrafficControlException exception)
538 {
539 exception.printStackTrace();
540
541
542 throw new SimRuntimeException(exception);
543 }
544 }
545
546
547 @Override
548 public Network getNetwork()
549 {
550 return this.network;
551 }
552
553 }