1 package org.opentrafficsim.core.dsol;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7
8 import org.djunits.unit.DurationUnit;
9 import org.djunits.value.vdouble.scalar.Duration;
10 import org.djunits.value.vdouble.scalar.Time;
11 import org.opentrafficsim.core.gtu.GTU;
12
13 import nl.tudelft.simulation.dsol.SimRuntimeException;
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16 import nl.tudelft.simulation.dsol.logger.SimLogger;
17 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
18 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
19 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
20
21 /**
22 * <p>
23 * Copyright (c) 2013-2019 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
24 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
25 * <p>
26 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
27 * initial version Aug 15, 2014 <br>
28 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29 */
30 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
31 {
32 /** */
33 private static final long serialVersionUID = 20140909L;
34
35 /** number of threads to use for move(). */
36 private int moveThreads = 1;
37
38 /** the thread pool for parallel execution. */
39 private ExecutorService executor = null;
40
41 /**
42 * Create a new OTSRealTimeClock.
43 * @param moveThreads int; The number of move threads to use
44 */
45 public OTSDEVSRTParallelMove(final int moveThreads)
46 {
47 super();
48 setMoveThreads(moveThreads);
49 setEventList(new SynchronizedRedBlackTree<>());
50 }
51
52 /**
53 * Create a new OTSRealTimeClock.
54 */
55 public OTSDEVSRTParallelMove()
56 {
57 this(1);
58 }
59
60 /**
61 * @param moveThreads int; set moveThreads
62 */
63 public final void setMoveThreads(final int moveThreads)
64 {
65 this.moveThreads = moveThreads;
66 }
67
68 /**
69 * @return moveThreads
70 */
71 public final int getMoveThreads()
72 {
73 return this.moveThreads;
74 }
75
76 /** {@inheritDoc} */
77 @Override
78 protected final Duration relativeMillis(final double factor)
79 {
80 return new Duration(factor, DurationUnit.MILLISECOND);
81 }
82
83 /** {@inheritDoc} */
84 @Override
85 public final String toString()
86 {
87 return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
88 }
89
90 /** {@inheritDoc} */
91 @Override
92 @SuppressWarnings("checkstyle:designforextension")
93 public void run()
94 {
95 AnimationThread animationThread = new AnimationThread(this);
96 animationThread.start();
97
98 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
99 SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
100 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
101 double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
102 Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
103
104 while (this.isRunning() && !this.eventList.isEmpty()
105 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
106 {
107 // check if speedFactor has changed. If yes: re-baseline.
108 if (factor != getSpeedFactor())
109 {
110 clockTime0 = System.currentTimeMillis();
111 simTime0 = this.simulatorTime;
112 factor = getSpeedFactor();
113 rSim = this.relativeMillis(getUpdateMsec() * factor);
114 }
115
116 // check if we are behind; syncTime is the needed current time on the wall-clock
117 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
118 // delta is the time we might be behind
119 double simTime = this.simulatorTime.diff(simTime0).doubleValue();
120
121 if (syncTime > simTime)
122 {
123 // we are behind
124 if (!isCatchup())
125 {
126 // if no catch-up: re-baseline.
127 clockTime0 = System.currentTimeMillis();
128 simTime0 = this.simulatorTime;
129 }
130 else
131 {
132 // jump to the required wall-clock related time or to the time of the next event, whichever comes
133 // first
134 synchronized (super.semaphore)
135 {
136 Duration delta = relativeMillis((syncTime - simTime) / msec1);
137 SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
138 SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
139 if (absSyncTime.lt(eventTime))
140 {
141 this.simulatorTime = absSyncTime;
142 }
143 else
144 {
145 this.simulatorTime = eventTime;
146 }
147 }
148 }
149 }
150
151 // peek at the first event and determine the time difference relative to RT speed; that determines
152 // how long we have to wait.
153 SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
154 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
155
156 /*
157 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
158 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
159 * have to wait 10 times that amount. We might also be behind.
160 */
161 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
162 {
163 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
164 {
165 try
166 {
167 Thread.sleep(getUpdateMsec());
168
169 // check if speedFactor has changed. If yes: break out of this loop and execute event.
170 // this could cause a jump.
171 if (factor != getSpeedFactor())
172 {
173 simTimeDiffMillis = 0.0;
174 }
175
176 }
177 catch (InterruptedException ie)
178 {
179 // do nothing
180 ie = null;
181 }
182
183 // check if an event has been inserted. In a real-time situation this can be dome by other threads
184 if (!event.equals(this.eventList.first())) // event inserted by a thread...
185 {
186 event = this.eventList.first();
187 simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
188 }
189 else
190 {
191 // make a small time step for the animation during wallclock waiting.
192 // but never beyond the next event time.
193 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
194 {
195 synchronized (super.semaphore)
196 {
197 this.simulatorTime.add(rSim);
198 }
199 }
200 }
201 }
202 }
203
204 this.simulatorTime = event.getAbsoluteExecutionTime();
205 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
206
207 if (this.moveThreads <= 1)
208 {
209 synchronized (super.semaphore)
210 {
211 // carry out all events scheduled on this simulation time, as long as we are still running.
212 while (this.isRunning() && !this.eventList.isEmpty()
213 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
214 {
215 event = this.eventList.removeFirst();
216 try
217 {
218 event.execute();
219 }
220 catch (Exception exception)
221 {
222 SimLogger.always().error(exception);
223 if (this.isPauseOnError())
224 {
225 try
226 {
227 this.stop();
228 }
229 catch (SimRuntimeException exception1)
230 {
231 SimLogger.always().error(exception1);
232 }
233 }
234 }
235 if (!this.eventList.isEmpty())
236 {
237 // peek at next event for while loop.
238 event = this.eventList.first();
239 }
240 }
241 }
242 }
243
244 else
245
246 {
247 // parallel execution of the move method
248 // first carry out all the non-move events and make a list of move events to be carried out in parallel
249 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
250 synchronized (super.semaphore)
251 {
252 while (this.isRunning() && !this.eventList.isEmpty()
253 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
254 {
255 event = this.eventList.removeFirst();
256 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
257 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
258 {
259 moveEvents.add(event);
260 }
261 else
262 {
263 try
264 {
265 event.execute();
266 }
267 catch (Exception exception)
268 {
269 SimLogger.always().error(exception);
270 if (this.isPauseOnError())
271 {
272 try
273 {
274 this.stop();
275 }
276 catch (SimRuntimeException exception1)
277 {
278 SimLogger.always().error(exception1);
279 }
280 }
281 }
282 }
283 if (!this.eventList.isEmpty())
284 {
285 // peek at next event for while loop.
286 event = this.eventList.first();
287 }
288 }
289 }
290
291 // then carry out the move events, based on a constant state at that time.
292 // first make sure that new events will be stored in a temporary event list...
293 this.executor = Executors.newFixedThreadPool(1);
294 for (int i = 0; i < moveEvents.size(); i++)
295 {
296 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
297 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
298 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
299 this.executor.execute(new Runnable()
300 {
301 @Override
302 public void run()
303 {
304 try
305 {
306 moveEvent.execute();
307 }
308 catch (Exception exception)
309 {
310 SimLogger.always().error(exception);
311 if (OTSDEVSRTParallelMove.this.isPauseOnError())
312 {
313 try
314 {
315 OTSDEVSRTParallelMove.this.stop();
316 }
317 catch (SimRuntimeException exception1)
318 {
319 SimLogger.always().error(exception1);
320 }
321 }
322 }
323 }
324 });
325 }
326 this.executor.shutdown();
327 try
328 {
329 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
330 }
331 catch (InterruptedException exception)
332 {
333 //
334 }
335
336 this.executor = Executors.newFixedThreadPool(1);
337 for (int i = 0; i < moveEvents.size(); i++)
338 {
339 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
340 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
341 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
342 this.executor.execute(new Runnable()
343 {
344 @Override
345 public void run()
346 {
347 try
348 {
349 moveEvent.execute();
350 }
351 catch (Exception exception)
352 {
353 SimLogger.always().error(exception);
354 if (OTSDEVSRTParallelMove.this.isPauseOnError())
355 {
356 try
357 {
358 OTSDEVSRTParallelMove.this.stop();
359 }
360 catch (SimRuntimeException exception1)
361 {
362 SimLogger.always().error(exception1);
363 }
364 }
365 }
366 }
367 });
368 }
369 this.executor.shutdown();
370 try
371 {
372 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
373 }
374 catch (InterruptedException exception)
375 {
376 //
377 }
378
379 this.executor = Executors.newFixedThreadPool(1);
380 for (int i = 0; i < moveEvents.size(); i++)
381 {
382 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
383 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
384 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
385 this.executor.execute(new Runnable()
386 {
387 @Override
388 public void run()
389 {
390 try
391 {
392 moveEvent.execute();
393 }
394 catch (Exception exception)
395 {
396 SimLogger.always().error(exception);
397 if (OTSDEVSRTParallelMove.this.isPauseOnError())
398 {
399 try
400 {
401 OTSDEVSRTParallelMove.this.stop();
402 }
403 catch (SimRuntimeException exception1)
404 {
405 SimLogger.always().error(exception1);
406 }
407 }
408 }
409 }
410 });
411 }
412 this.executor.shutdown();
413 try
414 {
415 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
416 }
417 catch (InterruptedException exception)
418 {
419 //
420 }
421
422 }
423 }
424 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
425
426 updateAnimation();
427 animationThread.stopAnimation();
428 }
429
430 }