1 package org.opentrafficsim.core.dsol;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7
8 import org.djunits.unit.DurationUnit;
9 import org.djunits.value.vdouble.scalar.Duration;
10 import org.djunits.value.vdouble.scalar.Time;
11 import org.opentrafficsim.core.gtu.GTU;
12
13 import nl.tudelft.simulation.dsol.SimRuntimeException;
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16 import nl.tudelft.simulation.dsol.logger.SimLogger;
17 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
18 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
19 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
20
21 /**
22 * <p>
23 * Copyright (c) 2013-2019 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
24 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
25 * <p>
26 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
27 * initial version Aug 15, 2014 <br>
28 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29 */
30 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
31 {
32 /** */
33 private static final long serialVersionUID = 20140909L;
34
35 /** number of threads to use for move(). */
36 private int moveThreads = 1;
37
38 /** the thread pool for parallel execution. */
39 private ExecutorService executor = null;
40
41 /**
42 * Create a new OTSRealTimeClock.
43 * @param moveThreads int; The number of move threads to use
44 */
45 public OTSDEVSRTParallelMove(final int moveThreads)
46 {
47 super();
48 setMoveThreads(moveThreads);
49 setEventList(new SynchronizedRedBlackTree<>());
50 }
51
52 /**
53 * Create a new OTSRealTimeClock.
54 */
55 public OTSDEVSRTParallelMove()
56 {
57 this(1);
58 }
59
60 /**
61 * @param moveThreads int; set moveThreads
62 */
63 public final void setMoveThreads(final int moveThreads)
64 {
65 this.moveThreads = moveThreads;
66 }
67
68 /**
69 * @return moveThreads
70 */
71 public final int getMoveThreads()
72 {
73 return this.moveThreads;
74 }
75
76 /** {@inheritDoc} */
77 @Override
78 protected final Duration simulatorTimeForWallClockMillis(final double factor)
79 {
80 return new Duration(factor, DurationUnit.MILLISECOND);
81 }
82
83 /** {@inheritDoc} */
84 @Override
85 public final String toString()
86 {
87 return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
88 }
89
90 // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
91 /** {@inheritDoc} */
92 @Override
93 @SuppressWarnings("checkstyle:designforextension")
94 public void run()
95 {
96 AnimationThread animationThread = new AnimationThread(this);
97 animationThread.start();
98
99 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
100 SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
101 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
102 double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
103 Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
104
105 while (this.isRunning() && !this.eventList.isEmpty()
106 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
107 {
108 // check if speedFactor has changed. If yes: re-baseline.
109 if (factor != getSpeedFactor())
110 {
111 clockTime0 = System.currentTimeMillis();
112 simTime0 = this.simulatorTime;
113 factor = getSpeedFactor();
114 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
115 }
116
117 // check if we are behind; syncTime is the needed current time on the wall-clock
118 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
119 // delta is the time we might be behind
120 double simTime = this.simulatorTime.diff(simTime0).doubleValue();
121
122 if (syncTime > simTime)
123 {
124 // we are behind
125 if (!isCatchup())
126 {
127 // if no catch-up: re-baseline.
128 clockTime0 = System.currentTimeMillis();
129 simTime0 = this.simulatorTime;
130 }
131 else
132 {
133 // jump to the required wall-clock related time or to the time of the next event, whichever comes
134 // first
135 synchronized (super.semaphore)
136 {
137 Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
138 SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
139 SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
140 if (absSyncTime.lt(eventTime))
141 {
142 this.simulatorTime = absSyncTime;
143 }
144 else
145 {
146 this.simulatorTime = eventTime;
147 }
148 }
149 }
150 }
151
152 // peek at the first event and determine the time difference relative to RT speed; that determines
153 // how long we have to wait.
154 SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
155 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
156
157 /*
158 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
159 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
160 * have to wait 10 times that amount. We might also be behind.
161 */
162 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
163 {
164 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
165 {
166 try
167 {
168 Thread.sleep(getUpdateMsec());
169
170 // check if speedFactor has changed. If yes: break out of this loop and execute event.
171 // this could cause a jump.
172 if (factor != getSpeedFactor())
173 {
174 simTimeDiffMillis = 0.0;
175 }
176
177 }
178 catch (InterruptedException ie)
179 {
180 // do nothing
181 ie = null;
182 }
183
184 // check if an event has been inserted. In a real-time situation this can be dome by other threads
185 if (!event.equals(this.eventList.first())) // event inserted by a thread...
186 {
187 event = this.eventList.first();
188 simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
189 }
190 else
191 {
192 // make a small time step for the animation during wallclock waiting.
193 // but never beyond the next event time.
194 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
195 {
196 synchronized (super.semaphore)
197 {
198 this.simulatorTime.add(rSim);
199 }
200 }
201 }
202 }
203 }
204
205 this.simulatorTime = event.getAbsoluteExecutionTime();
206 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
207
208 if (this.moveThreads <= 1)
209 {
210 synchronized (super.semaphore)
211 {
212 // carry out all events scheduled on this simulation time, as long as we are still running.
213 while (this.isRunning() && !this.eventList.isEmpty()
214 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
215 {
216 event = this.eventList.removeFirst();
217 try
218 {
219 event.execute();
220 }
221 catch (Exception exception)
222 {
223 SimLogger.always().error(exception);
224 if (this.isPauseOnError())
225 {
226 try
227 {
228 this.stop();
229 }
230 catch (SimRuntimeException exception1)
231 {
232 SimLogger.always().error(exception1);
233 }
234 }
235 }
236 if (!this.eventList.isEmpty())
237 {
238 // peek at next event for while loop.
239 event = this.eventList.first();
240 }
241 }
242 }
243 }
244
245 else
246
247 {
248 // parallel execution of the move method
249 // first carry out all the non-move events and make a list of move events to be carried out in parallel
250 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
251 synchronized (super.semaphore)
252 {
253 while (this.isRunning() && !this.eventList.isEmpty()
254 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
255 {
256 event = this.eventList.removeFirst();
257 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
258 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
259 {
260 moveEvents.add(event);
261 }
262 else
263 {
264 try
265 {
266 event.execute();
267 }
268 catch (Exception exception)
269 {
270 SimLogger.always().error(exception);
271 if (this.isPauseOnError())
272 {
273 try
274 {
275 this.stop();
276 }
277 catch (SimRuntimeException exception1)
278 {
279 SimLogger.always().error(exception1);
280 }
281 }
282 }
283 }
284 if (!this.eventList.isEmpty())
285 {
286 // peek at next event for while loop.
287 event = this.eventList.first();
288 }
289 }
290 }
291
292 // then carry out the move events, based on a constant state at that time.
293 // first make sure that new events will be stored in a temporary event list...
294 this.executor = Executors.newFixedThreadPool(1);
295 for (int i = 0; i < moveEvents.size(); i++)
296 {
297 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
298 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
299 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
300 this.executor.execute(new Runnable()
301 {
302 @Override
303 public void run()
304 {
305 try
306 {
307 moveEvent.execute();
308 }
309 catch (Exception exception)
310 {
311 SimLogger.always().error(exception);
312 if (OTSDEVSRTParallelMove.this.isPauseOnError())
313 {
314 try
315 {
316 OTSDEVSRTParallelMove.this.stop();
317 }
318 catch (SimRuntimeException exception1)
319 {
320 SimLogger.always().error(exception1);
321 }
322 }
323 }
324 }
325 });
326 }
327 this.executor.shutdown();
328 try
329 {
330 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
331 }
332 catch (InterruptedException exception)
333 {
334 //
335 }
336
337 this.executor = Executors.newFixedThreadPool(1);
338 for (int i = 0; i < moveEvents.size(); i++)
339 {
340 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
341 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
342 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
343 this.executor.execute(new Runnable()
344 {
345 @Override
346 public void run()
347 {
348 try
349 {
350 moveEvent.execute();
351 }
352 catch (Exception exception)
353 {
354 SimLogger.always().error(exception);
355 if (OTSDEVSRTParallelMove.this.isPauseOnError())
356 {
357 try
358 {
359 OTSDEVSRTParallelMove.this.stop();
360 }
361 catch (SimRuntimeException exception1)
362 {
363 SimLogger.always().error(exception1);
364 }
365 }
366 }
367 }
368 });
369 }
370 this.executor.shutdown();
371 try
372 {
373 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
374 }
375 catch (InterruptedException exception)
376 {
377 //
378 }
379
380 this.executor = Executors.newFixedThreadPool(1);
381 for (int i = 0; i < moveEvents.size(); i++)
382 {
383 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
384 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
385 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
386 this.executor.execute(new Runnable()
387 {
388 @Override
389 public void run()
390 {
391 try
392 {
393 moveEvent.execute();
394 }
395 catch (Exception exception)
396 {
397 SimLogger.always().error(exception);
398 if (OTSDEVSRTParallelMove.this.isPauseOnError())
399 {
400 try
401 {
402 OTSDEVSRTParallelMove.this.stop();
403 }
404 catch (SimRuntimeException exception1)
405 {
406 SimLogger.always().error(exception1);
407 }
408 }
409 }
410 }
411 });
412 }
413 this.executor.shutdown();
414 try
415 {
416 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
417 }
418 catch (InterruptedException exception)
419 {
420 //
421 }
422
423 }
424 }
425 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
426
427 updateAnimation();
428 animationThread.stopAnimation();
429 }
430
431 }