1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.awt.BorderLayout;
4 import java.awt.Container;
5 import java.awt.Dimension;
6 import java.awt.Frame;
7 import java.awt.event.ActionEvent;
8 import java.io.ByteArrayInputStream;
9 import java.io.IOException;
10 import java.io.Serializable;
11 import java.net.URISyntaxException;
12 import java.nio.charset.StandardCharsets;
13 import java.rmi.RemoteException;
14 import java.util.Arrays;
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.LinkedHashMap;
18 import java.util.Map;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import javax.swing.JFrame;
22 import javax.swing.JPanel;
23 import javax.swing.JScrollPane;
24 import javax.xml.bind.JAXBException;
25 import javax.xml.parsers.ParserConfigurationException;
26
27 import org.djunits.value.vdouble.scalar.Duration;
28 import org.djunits.value.vdouble.scalar.Length;
29 import org.djunits.value.vdouble.scalar.Time;
30 import org.djutils.decoderdumper.HexDumper;
31 import org.djutils.immutablecollections.ImmutableMap;
32 import org.djutils.serialization.SerializationException;
33 import org.opentrafficsim.core.animation.gtu.colorer.DefaultSwitchableGTUColorer;
34 import org.opentrafficsim.core.dsol.AbstractOTSModel;
35 import org.opentrafficsim.core.dsol.OTSAnimator;
36 import org.opentrafficsim.core.dsol.OTSSimulatorInterface;
37 import org.opentrafficsim.core.geometry.OTSGeometryException;
38 import org.opentrafficsim.core.gtu.GTUException;
39 import org.opentrafficsim.core.gtu.GTUType;
40 import org.opentrafficsim.core.network.NetworkException;
41 import org.opentrafficsim.core.network.OTSNetwork;
42 import org.opentrafficsim.core.object.InvisibleObjectInterface;
43 import org.opentrafficsim.draw.factory.DefaultAnimationFactory;
44 import org.opentrafficsim.road.network.OTSRoadNetwork;
45 import org.opentrafficsim.road.network.factory.xml.XmlParserException;
46 import org.opentrafficsim.road.network.factory.xml.parser.XmlNetworkLaneParser;
47 import org.opentrafficsim.road.network.lane.conflict.ConflictBuilder;
48 import org.opentrafficsim.road.network.lane.conflict.LaneCombinationList;
49 import org.opentrafficsim.sim0mq.publisher.Publisher;
50 import org.opentrafficsim.sim0mq.publisher.ReturnWrapper;
51 import org.opentrafficsim.sim0mq.publisher.ReturnWrapperImpl;
52 import org.opentrafficsim.swing.gui.OTSAnimationPanel;
53 import org.opentrafficsim.swing.gui.OTSSimulationApplication;
54 import org.opentrafficsim.swing.gui.OTSSwingApplication;
55 import org.opentrafficsim.trafficcontrol.TrafficControlException;
56 import org.opentrafficsim.trafficcontrol.trafcod.TrafCOD;
57 import org.sim0mq.Sim0MQException;
58 import org.sim0mq.message.Sim0MQMessage;
59 import org.xml.sax.SAXException;
60 import org.zeromq.SocketType;
61 import org.zeromq.ZContext;
62 import org.zeromq.ZMQ;
63
64 import nl.tudelft.simulation.dsol.SimRuntimeException;
65 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
66 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
67 import nl.tudelft.simulation.dsol.swing.gui.TabbedContentPane;
68 import nl.tudelft.simulation.jstats.streams.MersenneTwister;
69 import nl.tudelft.simulation.jstats.streams.StreamInterface;
70
71
72
73
74
75
76
77
78
79
80 public final class Sim0MQPublisher
81 {
82
83 private Publisher publisher = null;
84
85
86 private final ZContext zContext;
87
88
89 private Sim0MQOTSModel model = null;
90
91
92 private OTSRoadNetwork network = null;
93
94
95 private OTSAnimationPanel animationPanel = null;
96
97
98
99
100
101
102
103 public Sim0MQPublisher(final ZContext zContext, final String controlInput, final String resultOutput)
104 {
105 this.zContext = zContext;
106 ZMQ.Socket controlSocket = zContext.createSocket(SocketType.PULL);
107 controlSocket.bind("inproc://" + controlInput);
108 ZMQ.Socket resultOutputQueue = zContext.createSocket(SocketType.PUSH);
109 resultOutputQueue.connect("inproc://" + resultOutput);
110 pollingLoop(controlSocket, resultOutputQueue);
111 }
112
113
114
115
116
117 public Sim0MQPublisher(final int port)
118 {
119 this.zContext = new ZContext(5);
120 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PAIR);
121 socket.bind("tcp://*:" + port);
122 pollingLoop(socket, socket);
123 }
124
125
126
127
128
129
130 private void pollingLoop(final ZMQ.Socket controlSocket, final ZMQ.Socket resultOutputQueue)
131 {
132 System.out
133 .println("Publisher communication relay and simulation control thread id is " + Thread.currentThread().getId());
134 resultOutputQueue.setHWM(100000);
135 AtomicInteger packetsSent = new AtomicInteger(0);
136 Map<Long, ZMQ.Socket> socketMap = new HashMap<>();
137 ZMQ.Socket resultInputQueue = this.zContext.createSocket(SocketType.PULL);
138 resultInputQueue.bind("inproc://simulationEvents");
139
140 ZMQ.Poller poller = this.zContext.createPoller(2);
141
142 poller.register(resultInputQueue, ZMQ.Poller.POLLIN);
143 poller.register(controlSocket, ZMQ.Poller.POLLIN);
144 while (!Thread.currentThread().isInterrupted())
145 {
146
147 poller.poll();
148 if (poller.pollin(0))
149 {
150 byte[] data = resultInputQueue.recv();
151
152 byte[] fixedData = data;
153 int number = -1;
154 try
155 {
156
157 Object[] messageFields = Sim0MQMessage.decode(data).createObjectArray();
158 Object[] newMessageFields = Arrays.copyOfRange(messageFields, 8, messageFields.length);
159 number = packetsSent.addAndGet(1);
160 fixedData = Sim0MQMessage.encodeUTF8(true, messageFields[2], String.format("slave_%05d", number),
161 messageFields[4], messageFields[5], messageFields[6], newMessageFields);
162
163
164 }
165 catch (Sim0MQException | SerializationException e)
166 {
167 e.printStackTrace();
168 }
169 resultOutputQueue.send(fixedData, 0);
170
171 continue;
172 }
173 if (poller.pollin(1))
174 {
175 byte[] data = controlSocket.recv();
176
177 if (!handleCommand(data, socketMap))
178 {
179 break;
180 }
181 }
182 }
183 System.out.println("Exiting publisher polling loop");
184 }
185
186
187
188
189
190
191
192
193
194 private String loadNetwork(final String xml, final Duration simulationDuration, final Duration warmupTime, final Long seed)
195 {
196 try
197 {
198 OTSAnimator animator = new OTSAnimator("OTS Animator");
199 this.network = new OTSRoadNetwork("OTS model for Sim0MQPublisher", true, animator);
200 this.model = new Sim0MQOTSModel("Remotely controlled OTS model", this.network, xml);
201 Map<String, StreamInterface> map = new LinkedHashMap<>();
202 map.put("generation", new MersenneTwister(seed));
203 animator.initialize(Time.ZERO, warmupTime, simulationDuration, this.model, map);
204 this.publisher = new Publisher(this.network);
205 this.animationPanel = new OTSAnimationPanel(this.model.getNetwork().getExtent(), new Dimension(1100, 1000),
206 animator, this.model, OTSSwingApplication.DEFAULT_COLORER, this.model.getNetwork());
207 new OTSSimulationApplication<Sim0MQOTSModel>(this.model, this.animationPanel);
208 DefaultAnimationFactory.animateXmlNetwork(this.model.getNetwork(), new DefaultSwitchableGTUColorer());
209 JFrame frame = (JFrame) this.animationPanel.getParent().getParent().getParent();
210 frame.setExtendedState(Frame.NORMAL);
211 frame.setSize(new Dimension(1100, 1000));
212 frame.setBounds(0, 25, 1100, 1000);
213 animator.setSpeedFactor(Double.MAX_VALUE, true);
214 animator.setSpeedFactor(1000.0, true);
215
216 ImmutableMap<String, InvisibleObjectInterface> invisibleObjectMap = this.model.getNetwork().getInvisibleObjectMap();
217 for (InvisibleObjectInterface ioi : invisibleObjectMap.values())
218 {
219 if (ioi instanceof TrafCOD)
220 {
221 TrafCOD trafCOD = (TrafCOD) ioi;
222 Container controllerDisplayPanel = trafCOD.getDisplayContainer();
223 if (null != controllerDisplayPanel)
224 {
225 JPanel wrapper = new JPanel(new BorderLayout());
226 wrapper.add(new JScrollPane(controllerDisplayPanel));
227 TabbedContentPane tabbedPane = this.animationPanel.getTabbedPane();
228 tabbedPane.addTab(tabbedPane.getTabCount() - 1, trafCOD.getId(), wrapper);
229 }
230 }
231 }
232 try
233 {
234 Thread.sleep(300);
235 }
236 catch (InterruptedException e)
237 {
238 e.printStackTrace();
239 }
240 this.animationPanel.actionPerformed(new ActionEvent(this, 0, "ZoomAll"));
241 }
242 catch (Exception e)
243 {
244 return e.getMessage();
245 }
246 return null;
247 }
248
249
250
251
252
253
254
255 private boolean handleCommand(final byte[] data, final Map<Long, ZMQ.Socket> socketMap)
256 {
257 boolean result = true;
258 try
259 {
260 Object[] message = Sim0MQMessage.decode(data).createObjectArray();
261 String resultMessage = "OK";
262 Boolean ackNack = null;
263
264 if (message.length >= 8 && message[5] instanceof String)
265 {
266 String command = (String) message[5];
267 System.out.println("Publisher thread decoded Sim0MQ command: " + command);
268
269 String[] parts = command.split("\\|");
270 if (parts.length == 2)
271 {
272
273 ReturnWrapperImpl returnWrapper = new ReturnWrapperImpl(this.zContext,
274 new Object[] {"SIM01", true, message[2], message[3], message[4], parts[0], message[6], 0},
275 socketMap);
276 if (null == this.publisher)
277 {
278 returnWrapper.nack("No simulation loaded; cannot execute command " + command);
279 System.err.println("No publisher for command " + command);
280 return true;
281 }
282 Object[] payload = Arrays.copyOfRange(message, 8, message.length);
283 this.publisher.executeCommand(parts[0], parts[1], payload, returnWrapper);
284 return true;
285 }
286 else
287 {
288 switch (command)
289 {
290 case "NEWSIMULATION":
291 if (message.length == 12 && message[8] instanceof String && message[9] instanceof Duration
292 && message[10] instanceof Duration && message[11] instanceof Long)
293 {
294 if (null != this.animationPanel)
295 {
296 for (Container container = this.animationPanel; container != null; container =
297 container.getParent())
298 {
299 if (container instanceof JFrame)
300 {
301 JFrame jFrame = (JFrame) container;
302 jFrame.dispose();
303 }
304 }
305 }
306
307 resultMessage = loadNetwork((String) message[8], (Duration) message[9], (Duration) message[10],
308 (Long) message[11]);
309 ackNack = null == resultMessage;
310 if (ackNack)
311 {
312 resultMessage = "OK";
313 }
314 }
315 else
316 {
317 resultMessage =
318 "No network, warmupTime and/or runTime, or seed provided with NEWSIMULATION command";
319 ackNack = false;
320 }
321 break;
322
323 case "DIE":
324 for (Container container = this.animationPanel; container != null; container =
325 container.getParent())
326 {
327
328 if (container instanceof JFrame)
329 {
330 JFrame jFrame = (JFrame) container;
331 jFrame.dispose();
332 }
333 }
334 return false;
335
336 case "SIMULATEUNTIL":
337 if (message.length == 9 && message[8] instanceof Time)
338 {
339 System.out.println("Simulating up to " + message[8]);
340 if (null == this.network)
341 {
342 resultMessage = "No network loaded";
343 ackNack = false;
344 break;
345 }
346 OTSSimulatorInterface simulator = this.network.getSimulator();
347 if (simulator.getSimulatorTime()
348 .ge(simulator.getReplication().getExperiment().getTreatment().getEndTime()))
349 {
350 resultMessage = "Simulation is already at end of simulation time";
351 ackNack = false;
352 break;
353 }
354 if (simulator.isStartingOrRunning())
355 {
356 resultMessage = "Simulator is already running";
357 ackNack = false;
358 break;
359 }
360 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext, new Object[] {"SIM01", true,
361 message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
362 returnWrapper.ack(resultMessage);
363 simulator.runUpTo((Time) message[8]);
364 int count = 0;
365 while (this.network.getSimulator().isStartingOrRunning())
366 {
367 System.out.print(".");
368 count++;
369 if (count > 1000)
370 {
371 System.out.println("TIMEOUT - STOPPING SIMULATOR. TIME = "
372 + this.network.getSimulator().getSimulatorTime());
373 this.network.getSimulator().stop();
374 Iterator<SimEventInterface<SimTimeDoubleUnit>> elIt =
375 this.network.getSimulator().getEventList().iterator();
376 while (elIt.hasNext())
377 {
378 System.out.println("EVENTLIST: " + elIt.next());
379 }
380 }
381 try
382 {
383 Thread.sleep(10);
384 }
385 catch (InterruptedException e)
386 {
387 e.printStackTrace();
388 }
389 }
390
391 try
392 {
393 Thread.sleep(100);
394 }
395 catch (InterruptedException e)
396 {
397 e.printStackTrace();
398 }
399 return true;
400 }
401 else
402 {
403 resultMessage = "Bad or missing stop time";
404 ackNack = false;
405 }
406 break;
407
408 default:
409 resultMessage = "Don't know how to handle message:\n" + Sim0MQMessage.print(message);
410 ackNack = false;
411 break;
412 }
413 }
414 ReturnWrapper returnWrapper = new ReturnWrapperImpl(this.zContext,
415 new Object[] {"SIM01", true, message[2], message[3], message[4], message[5], message[6], 0}, socketMap);
416 if (ackNack)
417 {
418 returnWrapper.ack(resultMessage);
419 }
420 else
421 {
422 returnWrapper.nack(resultMessage);
423 }
424 }
425 else
426 {
427
428 System.err.println("Publisher expected Sim0MQ command but is has too few fields:");
429 System.err.println(HexDumper.hexDumper(data));
430 return true;
431 }
432 }
433 catch (Sim0MQException | SerializationException | RemoteException e)
434 {
435 System.err.println("Publisher thread could not decode command");
436 e.printStackTrace();
437 }
438 return result;
439 }
440
441 }
442
443
444
445
446 class Sim0MQOTSModel extends AbstractOTSModel
447 {
448
449 private static final long serialVersionUID = 20170419L;
450
451
452 private final OTSRoadNetwork network;
453
454
455 private final String xml;
456
457
458
459
460
461
462 Sim0MQOTSModel(final String description, final OTSRoadNetwork network, final String xml)
463 {
464 super(network.getSimulator(), network.getId(), description);
465 this.network = network;
466 this.xml = xml;
467 }
468
469
470 @Override
471 public void constructModel() throws SimRuntimeException
472 {
473 try
474 {
475 XmlNetworkLaneParser.build(new ByteArrayInputStream(this.xml.getBytes(StandardCharsets.UTF_8)), this.network,
476 false);
477 ConflictBuilder.buildConflictsParallel(this.network, this.network.getGtuType(GTUType.DEFAULTS.VEHICLE),
478 getSimulator(), new ConflictBuilder.FixedWidthGenerator(Length.instantiateSI(2.0)),
479 new LaneCombinationList(), new LaneCombinationList());
480 }
481 catch (NetworkException | OTSGeometryException | JAXBException | URISyntaxException | XmlParserException | SAXException
482 | ParserConfigurationException | GTUException | IOException | TrafficControlException exception)
483 {
484 exception.printStackTrace();
485
486
487 throw new SimRuntimeException(exception);
488 }
489 }
490
491
492 @Override
493 public OTSNetwork getNetwork()
494 {
495 return this.network;
496 }
497
498
499 @Override
500 public Serializable getSourceId()
501 {
502 return "Sim0MQPublisherModel";
503 }
504
505 }