1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.atomic.AtomicInteger;
21
22 import javax.swing.JFrame;
23 import javax.swing.JPanel;
24 import javax.swing.JScrollPane;
25 import javax.xml.bind.JAXBException;
26 import javax.xml.parsers.ParserConfigurationException;
27
28 import org.djunits.value.vdouble.scalar.Duration;
29 import org.djunits.value.vdouble.scalar.Length;
30 import org.djunits.value.vdouble.scalar.Time;
31 import org.djutils.decoderdumper.HexDumper;
32 import org.djutils.immutablecollections.ImmutableMap;
33 import org.djutils.serialization.SerializationException;
34 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
35 import org.opentrafficsim.core.dsol.AbstractOTSModel;
36 import org.opentrafficsim.core.dsol.OTSAnimator;
37 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
38 import org.opentrafficsim.core.geometry.OTSGeometryException;
39 import org.opentrafficsim.core.gtu.GTUException;
40 import org.opentrafficsim.core.gtu.GTUType;
41 import org.opentrafficsim.core.network.NetworkException;
42 import org.opentrafficsim.core.network.OTSNetwork;
43 import org.opentrafficsim.core.object.InvisibleObjectInterface;
44 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
45 import org.opentrafficsim.road.network.OTSRoadNetwork;
46 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
47 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
48 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
49 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
50 import org.opentrafficsim.sim0mq.publisher.IncomingDataHandler;
51 import org.opentrafficsim.sim0mq.publisher.Publisher;
52 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
53 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
54 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
55 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
56 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
57 import org.opentrafficsim.swing.gui.OTSSwingApplication;
58 import org.opentrafficsim.swing.script.AbstractSimulationScript;
59 import org.opentrafficsim.trafficcontrol.TrafficControlException;
60 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
61 import org.sim0mq.Sim0MQException;
62 import org.sim0mq.message.Sim0MQMessage;
63 import org.xml.sax.SAXException;
64 import org.zeromq.SocketType;
65 import org.zeromq.ZContext;
66 import org.zeromq.ZMQ;
67
68 import nl.tudelft.simulation.dsol.SimRuntimeException;
69 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
70 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
71 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
72 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
73 import nl.tudelft.simulation.jstats.streams.StreamInterface;
74
75
76
77
78
79
80
81
82
83
84 public final class Sim0MQPublisher
85 {
86
87 private Publisher publisher = null;
88
89
90 private final ZContext zContext;
91
92
93 private Sim0MQOTSModel model = null;
94
95
96 private OTSRoadNetwork network = null;
97
98
99 private OTSAnimationPanel animationPanel = null;
100
101
102
103
104
105
106
107 public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
108 {
109 this.zContext = zContext;
110 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
111 controlSocket.bind("inproc://" + controlInput);
112 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
113 resultOutputQueue.connect("inproc://" + resultOutput);
114 pollingLoop(controlSocket, resultOutputQueue);
115 }
116
117
118
119
120
121 public Sim0MQPublisher(final int port)
122 {
123 this.zContext = new ZContext(5);
124 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
125 socket.bind("tcp://*:" + port);
126 pollingLoop(socket, socket);
127 }
128
129
130
131
132
133
134
135
136
137
138
139 public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
140 final List<SubscriptionHandler> additionalSubscriptionHandlers,
141 final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
142 {
143 this.zContext = new ZContext(5);
144 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
145 socket.bind("tcp://*:" + port);
146 this.network = preloadedSimulation.getNetwork();
147 this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, null);
148 this.publisher = new Publisher(this.network, additionalSubscriptionHandlers, incomingDataHandlers);
149 ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(Double.MAX_VALUE, true);
150 ((OTSAnimator) preloadedSimulation.getSimulator()).setSpeedFactor(1000.0, true);
151 pollingLoop(socket, socket);
152 System.exit(0);
153 }
154
155
156
157
158
159
160
161
162
163 public Sim0MQPublisher(final int port, final AbstractSimulationScript preloadedSimulation,
164 final List<SubscriptionHandler> additionalSubscriptionHandlers) throws RemoteException
165 {
166 this(port, preloadedSimulation, additionalSubscriptionHandlers, null);
167 }
168
169
170
171
172
173
174 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
175 {
176 System.out
177 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
178 resultOutputQueue.setHWM(100000);
179 AtomicInteger packetsSent = new AtomicInteger(0);
180 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
181 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
182 resultInputQueue.bind("inproc://simulationEvents");
183
184 ZMQ.Poller poller = this.zContext.createPoller(2);
185
186 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
187 poller.register(controlSocket, ZMQ.Poller.POLLIN);
188 while (!Thread.currentThread().isInterrupted())
189 {
190
191 poller.poll();
192 if (poller.pollin(0))
193 {
194 byte[] data = resultInputQueue.recv();
195
196 byte[] fixedData = data;
197 int number = -1;
198 try
199 {
200
201 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
202 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
203 number = packetsSent.addAndGet(1);
204 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
205 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
206
207
208 }
209 catch (Sim0MQException | SerializationException e)
210 {
211 e.printStackTrace();
212 }
213 resultOutputQueue.send(fixedData, 0);
214
215 continue;
216 }
217 if (poller.pollin(1))
218 {
219 byte[] data = controlSocket.recv();
220
221 if (!handleCommand(data, socketMap))
222 {
223 break;
224 }
225 }
226 }
227 System.out.println("Exiting publisher polling loop");
228 }
229
230
231
232
233
234
235
236
237
238 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
239 {
240 try
241 {
242 OTSAnimator animator = new OTSAnimator("OTS Animator");
243 this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
244 this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
245 Map<String, StreamInterface> map = new LinkedHashMap<>();
246 map.put("generation", new MersenneTwister(seed));
247 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
248 this.publisher = new Publisher(this.network);
249 this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
250 animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
251 new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
252 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
253 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
254 frame.setExtendedState(Frame.NORMAL);
255 frame.setSize(new Dimension(1100, 1000));
256 frame.setBounds(0, 25, 1100, 1000);
257 animator.setSpeedFactor(Double.MAX_VALUE, true);
258 animator.setSpeedFactor(1000.0, true);
259
260 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
261 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
262 {
263 if (ioi instanceof TrafCOD)
264 {
265 TrafCOD trafCOD = (TrafCOD) ioi;
266 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
267 if (null != controllerDisplayPanel)
268 {
269 JPanel wrapper = new JPanel(new BorderLayout());
270 wrapper.add(new JScrollPane(controllerDisplayPanel));
271 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
272 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
273 }
274 }
275 }
276 try
277 {
278 Thread.sleep(300);
279 }
280 catch (InterruptedException e)
281 {
282 e.printStackTrace();
283 }
284 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
285 }
286 catch (Exception e)
287 {
288 return e.getMessage();
289 }
290 return null;
291 }
292
293
294
295
296
297
298
299 @SuppressWarnings("checkstyle:methodlength")
300 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
301 {
302 boolean result = true;
303 try
304 {
305 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
306 String resultMessage = "OK";
307 Boolean ackNack = null;
308
309 if (message.length >= 8 && message[5] instanceof String)
310 {
311 String command = (String) message[5];
312 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
313
314 String[] parts = command.split("\\|");
315 if (parts.length == 2)
316 {
317
318 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
319 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
320 socketMap);
321 if (null == this.publisher)
322 {
323 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
324 System.err.println("No publisher for command " + command);
325 return true;
326 }
327 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
328 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
329 return true;
330 }
331 else
332 {
333 switch (command)
334 {
335 case "NEWSIMULATION":
336 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
337 && message[10] instanceof Duration && message[11] instanceof Long)
338 {
339 if (null != this.animationPanel)
340 {
341 for (Container container = this.animationPanel; container != null; container =
342 container.getParent())
343 {
344 if (container instanceof JFrame)
345 {
346 JFrame jFrame = (JFrame) container;
347 jFrame.dispose();
348 }
349 }
350 }
351
352 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
353 (Long) message[11]);
354 ackNack = null == resultMessage;
355 if (ackNack)
356 {
357 resultMessage = "OK";
358 }
359 }
360 else
361 {
362 resultMessage =
363 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
364 ackNack = false;
365 }
366 break;
367
368 case "DIE":
369 for (Container container = this.animationPanel; container != null; container =
370 container.getParent())
371 {
372
373 if (container instanceof JFrame)
374 {
375 JFrame jFrame = (JFrame) container;
376 jFrame.dispose();
377 }
378 }
379 return false;
380
381 case "SIMULATEUNTIL":
382 if (message.length == 9 && message[8] instanceof Time)
383 {
384 System.out.println("Simulating up to " + message[8]);
385 if (null == this.network)
386 {
387 resultMessage = "No network loaded";
388 ackNack = false;
389 break;
390 }
391 OTSSimulatorInterface simulator = this.network.getSimulator();
392 if (simulator.getSimulatorTime().ge(simulator.getReplication().getEndTime()))
393 {
394 resultMessage = "Simulation is already at end of simulation time";
395 ackNack = false;
396 break;
397 }
398 if (simulator.isStartingOrRunning())
399 {
400 resultMessage = "Simulator is already running";
401 ackNack = false;
402 break;
403 }
404 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
405 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
406 returnWrapper.ack(resultMessage);
407 simulator.runUpTo(new SimTimeDoubleUnit((Time) message[8]));
408 int count = 0;
409 while (this.network.getSimulator().isStartingOrRunning())
410 {
411 System.out.print(".");
412 count++;
413 if (count > 1000)
414 {
415 System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
416 + this.network.getSimulator().getSimulatorTime());
417 this.network.getSimulator().stop();
418 Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
419 this.network.getSimulator().getEventList().iterator();
420 while (elIt.hasNext())
421 {
422 System.out.println("EVENTLIST: " + elIt.next());
423 }
424 }
425 try
426 {
427 Thread.sleep(10);
428 }
429 catch (InterruptedException e)
430 {
431 e.printStackTrace();
432 }
433 }
434
435 try
436 {
437 Thread.sleep(100);
438 }
439 catch (InterruptedException e)
440 {
441 e.printStackTrace();
442 }
443 return true;
444 }
445 else
446 {
447 resultMessage = "Bad or missing stop time";
448 ackNack = false;
449 }
450 break;
451
452 default:
453 IncomingDataHandler incomingDataHandler = this.publisher.lookupIncomingDataHandler(command);
454 if (incomingDataHandler != null)
455 {
456 resultMessage = incomingDataHandler.handleIncomingData(message);
457 ackNack = resultMessage == null;
458 }
459 else
460 {
461 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
462 ackNack = false;
463 }
464 break;
465 }
466 }
467 if (resultMessage != null)
468 {
469 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
470 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0},
471 socketMap);
472 if (ackNack)
473 {
474 returnWrapper.ack(resultMessage);
475 }
476 else
477 {
478 returnWrapper.nack(resultMessage);
479 }
480 }
481 }
482 else
483 {
484
485 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
486 System.err.println(HexDumper.hexDumper(data));
487 return true;
488 }
489 }
490 catch (Sim0MQException | SerializationException | RemoteException e)
491 {
492 System.err.println("Publisher thread could not decode command");
493 e.printStackTrace();
494 }
495 return result;
496 }
497
498 }
499
500
501
502
503 class Sim0MQOTSModel extends AbstractOTSModel
504 {
505
506 private static final long serialVersionUID = 20170419L;
507
508
509 private final OTSRoadNetwork network;
510
511
512 private final String xml;
513
514
515
516
517
518
519 Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
520 {
521 super(network.getSimulator(), network.getId(), description);
522 this.network = network;
523 this.xml = xml;
524 }
525
526
527 @Override
528 public void constructModel() throws SimRuntimeException
529 {
530 try
531 {
532 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
533 false);
534 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
535 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
536 new LaneCombinationList(), new LaneCombinationList());
537 }
538 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
539 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
540 {
541 exception.printStackTrace();
542
543
544 throw new SimRuntimeException(exception);
545 }
546 }
547
548
549 @Override
550 public OTSNetwork getNetwork()
551 {
552 return this.network;
553 }
554
555
556 @Override
557 public Serializable getSourceId()
558 {
559 return "Sim0MQPublisherModel";
560 }
561
562 }