1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.atomic.AtomicInteger;
21
22 import javax.swing.JFrame;
23 import javax.swing.JPanel;
24 import javax.swing.JScrollPane;
25 import javax.xml.bind.JAXBException;
26 import javax.xml.parsers.ParserConfigurationException;
27
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.decoderdumper.HexDumper;
32 import org.djutils.immutablecollections.ImmutableMap;
33 import org.djutils.serialization.SerializationException;
34 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
35 import org.opentrafficsim.core.dsol.AbstractOTSModel;
36 import org.opentrafficsim.core.dsol.OTSAnimator;
37 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
38 import org.opentrafficsim.core.geometry.OTSGeometryException;
39 import org.opentrafficsim.core.gtu.GTUException;
40 import org.opentrafficsim.core.gtu.GTUType;
41 import org.opentrafficsim.core.network.NetworkException;
42 import org.opentrafficsim.core.network.OTSNetwork;
43 import org.opentrafficsim.core.object.InvisibleObjectInterface;
44 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
45 import org.opentrafficsim.road.network.OTSRoadNetwork;
46 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
47 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
48 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
49 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
50 import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
51 import org.opentrafficsim.sim0mq.publisher.Publisher;
52 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
53 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
54 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
55 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
56 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
57 import org.opentrafficsim.swing.gui.OTSSwingApplication;
58 import org.opentrafficsim.swing.script.AbstractSimulationScript;
59 import org.opentrafficsim.trafficcontrol.TrafficControlException;
60 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
61 import org.sim0mq.Sim0MQException;
62 import org.sim0mq.message.Sim0MQMessage;
63 import org.xml.sax.SAXException;
64 import org.zeromq.SocketType;
65 import org.zeromq.ZContext;
66 import org.zeromq.ZMQ;
67
68 import nl.tudelft.simulation.dsol.SimRuntimeException;
69 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
70 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
71 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
72 import nl.tudelft.simulation.jstats.streams.StreamInterface;
73
74
75
76
77
78
79
80
81
82
83 public final class Sim0MQPublisher
84 {
85
86 private Publisher publisher = null;
87
88
89 private final ZContext zContext;
90
91
92 private Sim0MQOTSModel model = null;
93
94
95 private OTSRoadNetwork network = null;
96
97
98 private OTSAnimationPanel animationPanel = null;
99
100
101
102
103
104
105
106 public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
107 {
108 this.zContext = zContext;
109 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
110 controlSocket.bind("inproc://" + controlInput);
111 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
112 resultOutputQueue.connect("inproc://" + resultOutput);
113 pollingLoop(controlSocket, resultOutputQueue);
114 }
115
116
117
118
119
120 public Sim0MQPublisher(final int port)
121 {
122 this.zContext = new ZContext(5);
123 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
124 socket.bind("tcp://*:" + port);
125 pollingLoop(socket, socket);
126 }
127
128
129
130
131
132
133
134
135
136
137
138 public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
139 final List<SubscriptionHandler> additionalSubscriptionHandlers,
140 final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
141 {
142 this.zContext = new ZContext(5);
143 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
144 socket.bind("tcp://*:" + port);
145 this.network = preloadedSimulation.getNetwork();
146 this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, null);
147 this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
148 ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
149 ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
150 pollingLoop(socket, socket);
151 System.exit(0);
152 }
153
154
155
156
157
158
159
160
161
162 public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
163 final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
164 {
165 this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
166 }
167
168
169
170
171
172
173 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
174 {
175 System.out
176 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
177 resultOutputQueue.setHWM(100000);
178 AtomicInteger packetsSent = new AtomicInteger(0);
179 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
180 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
181 resultInputQueue.bind("inproc://simulationEvents");
182
183 ZMQ.Poller poller = this.zContext.createPoller(2);
184
185 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
186 poller.register(controlSocket, ZMQ.Poller.POLLIN);
187 while (!Thread.currentThread().isInterrupted())
188 {
189
190 poller.poll();
191 if (poller.pollin(0))
192 {
193 byte[] data = resultInputQueue.recv();
194
195 byte[] fixedData = data;
196 int number = -1;
197 try
198 {
199
200 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
201 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
202 number = packetsSent.addAndGet(1);
203 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
204 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
205
206
207 }
208 catch (Sim0MQException | SerializationException e)
209 {
210 e.printStackTrace();
211 }
212 resultOutputQueue.send(fixedData, 0);
213
214 continue;
215 }
216 if (poller.pollin(1))
217 {
218 byte[] data = controlSocket.recv();
219
220 if (!handleCommand(data, socketMap))
221 {
222 break;
223 }
224 }
225 }
226 System.out.println("Exiting publisher polling loop");
227 }
228
229
230
231
232
233
234
235
236
237 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
238 {
239 try
240 {
241 OTSAnimator animator = new OTSAnimator("OTS Animator");
242 this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
243 this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
244 Map<String, StreamInterface> map = new LinkedHashMap<>();
245 map.put("generation", new MersenneTwister(seed));
246 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
247 this.publisher = new Publisher(this.network);
248 this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
249 animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
250 new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
251 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
252 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
253 frame.setExtendedState(Frame.NORMAL);
254 frame.setSize(new Dimension(1100, 1000));
255 frame.setBounds(0, 25, 1100, 1000);
256 animator.setSpeedFactor(Double.MAX_VALUE, true);
257 animator.setSpeedFactor(1000.0, true);
258
259 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
260 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
261 {
262 if (ioi instanceof TrafCOD)
263 {
264 TrafCOD trafCOD = (TrafCOD) ioi;
265 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
266 if (null != controllerDisplayPanel)
267 {
268 JPanel wrapper = new JPanel(new BorderLayout());
269 wrapper.add(new JScrollPane(controllerDisplayPanel));
270 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
271 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
272 }
273 }
274 }
275 try
276 {
277 Thread.sleep(300);
278 }
279 catch (InterruptedException e)
280 {
281 e.printStackTrace();
282 }
283 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
284 }
285 catch (Exception e)
286 {
287 return e.getMessage();
288 }
289 return null;
290 }
291
292
293
294
295
296
297
298 @SuppressWarnings("checkstyle:methodlength")
299 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
300 {
301 boolean result = true;
302 try
303 {
304 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
305 String resultMessage = "OK";
306 Boolean ackNack = null;
307
308 if (message.length >= 8 && message[5] instanceof String)
309 {
310 String command = (String) message[5];
311 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
312
313 String[] parts = command.split("\\|");
314 if (parts.length == 2)
315 {
316
317 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
318 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
319 socketMap);
320 if (null == this.publisher)
321 {
322 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
323 System.err.println("No publisher for command " + command);
324 return true;
325 }
326 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
327 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
328 return true;
329 }
330 else
331 {
332 switch (command)
333 {
334 case "NEWSIMULATION":
335 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
336 && message[10] instanceof Duration && message[11] instanceof Long)
337 {
338 if (null != this.animationPanel)
339 {
340 for (Container container = this.animationPanel; container != null; container =
341 container.getParent())
342 {
343 if (container instanceof JFrame)
344 {
345 JFrame jFrame = (JFrame) container;
346 jFrame.dispose();
347 }
348 }
349 }
350
351 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
352 (Long) message[11]);
353 ackNack = null == resultMessage;
354 if (ackNack)
355 {
356 resultMessage = "OK";
357 }
358 }
359 else
360 {
361 resultMessage =
362 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
363 ackNack = false;
364 }
365 break;
366
367 case "DIE":
368 for (Container container = this.animationPanel; container != null; container =
369 container.getParent())
370 {
371
372 if (container instanceof JFrame)
373 {
374 JFrame jFrame = (JFrame) container;
375 jFrame.dispose();
376 }
377 }
378 return false;
379
380 case "SIMULATEUNTIL":
381 if (message.length == 9 && message[8] instanceof Time)
382 {
383 System.out.println("Simulating up to " + message[8]);
384 if (null == this.network)
385 {
386 resultMessage = "No network loaded";
387 ackNack = false;
388 break;
389 }
390 OTSSimulatorInterface simulator = this.network.getSimulator();
391 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
392 {
393 resultMessage = "Simulation is already at end of simulation time";
394 ackNack = false;
395 break;
396 }
397 if (simulator.isStartingOrRunning())
398 {
399 resultMessage = "Simulator is already running";
400 ackNack = false;
401 break;
402 }
403 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
404 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
405 returnWrapper.ack(resultMessage);
406 simulator.runUpTo((Time) message[8]);
407 int count = 0;
408 while (this.network.getSimulator().isStartingOrRunning())
409 {
410 System.out.print(".");
411 count++;
412 if (count > 1000)
413 {
414 System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
415 + this.network.getSimulator().getSimulatorTime());
416 this.network.getSimulator().stop();
417 Iterator<SimEventInterface<Duration>> elIt =
418 this.network.getSimulator().getEventList().iterator();
419 while (elIt.hasNext())
420 {
421 System.out.println("EVENTLIST: " + elIt.next());
422 }
423 }
424 try
425 {
426 Thread.sleep(10);
427 }
428 catch (InterruptedException e)
429 {
430 e.printStackTrace();
431 }
432 }
433
434 try
435 {
436 Thread.sleep(100);
437 }
438 catch (InterruptedException e)
439 {
440 e.printStackTrace();
441 }
442 return true;
443 }
444 else
445 {
446 resultMessage = "Bad or missing stop time";
447 ackNack = false;
448 }
449 break;
450
451 default:
452 IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
453 if (incomingDataHandler != null)
454 {
455 resultMessage = incomingDataHandler.handleIncomingData(message);
456 ackNack = resultMessage == null;
457 }
458 else
459 {
460 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
461 ackNack = false;
462 }
463 break;
464 }
465 }
466 if (resultMessage != null)
467 {
468 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
469 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
470 socketMap);
471 if (ackNack)
472 {
473 returnWrapper.ack(resultMessage);
474 }
475 else
476 {
477 returnWrapper.nack(resultMessage);
478 }
479 }
480 }
481 else
482 {
483
484 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
485 System.err.println(HexDumper.hexDumper(data));
486 return true;
487 }
488 }
489 catch (Sim0MQException | SerializationException | RemoteException e)
490 {
491 System.err.println("Publisher thread could not decode command");
492 e.printStackTrace();
493 }
494 return result;
495 }
496
497 }
498
499
500
501
502 class Sim0MQOTSModel extends AbstractOTSModel
503 {
504
505 private static final long serialVersionUID = 20170419L;
506
507
508 private final OTSRoadNetwork network;
509
510
511 private final String xml;
512
513
514
515
516
517
518 Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
519 {
520 super(network.getSimulator(), network.getId(), description);
521 this.network = network;
522 this.xml = xml;
523 }
524
525
526 @Override
527 public void constructModel() throws SimRuntimeException
528 {
529 try
530 {
531 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
532 false);
533 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
534 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
535 new LaneCombinationList(), new LaneCombinationList());
536 }
537 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
538 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
539 {
540 exception.printStackTrace();
541
542
543 throw new SimRuntimeException(exception);
544 }
545 }
546
547
548 @Override
549 public OTSNetwork getNetwork()
550 {
551 return this.network;
552 }
553
554
555 @Override
556 public Serializable getSourceId()
557 {
558 return "Sim0MQPublisherModel";
559 }
560
561 }