1 package org.opentrafficsim.core.dsol;
2
3 import java.io.Serializable;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import org.djunits.unit.DurationUnit;
10 import org.djunits.value.vdouble.scalar.Duration;
11 import org.opentrafficsim.core.gtu.GTU;
12
13 import nl.tudelft.simulation.dsol.SimRuntimeException;
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeAnimator;
17 import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
18 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19
20 /**
21 * <p>
22 * Copyright (c) 2013-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24 * <p>
25 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
26 * initial version Aug 15, 2014 <br>
27 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
28 */
29 public class OTSDEVSRTParallelMove extends DEVSRealTimeAnimator<Duration>
30 {
31 /** */
32 private static final long serialVersionUID = 20140909L;
33
34 /** number of threads to use for move(). */
35 private int moveThreads = 1;
36
37 /** the thread pool for parallel execution. */
38 private ExecutorService executor = null;
39
40 /**
41 * Create a new OTSRealTimeClock.
42 * @param moveThreads int; The number of move threads to use
43 * @param simulatorId the id of the simulator to use in remote communication
44 */
45 public OTSDEVSRTParallelMove(final int moveThreads, final Serializable simulatorId)
46 {
47 super(simulatorId);
48 setMoveThreads(moveThreads);
49 setEventList(new SynchronizedRedBlackTree<>());
50 }
51
52 /**
53 * Create a new OTSRealTimeClock.
54 * @param simulatorId the id of the simulator to use in remote communication
55 */
56 public OTSDEVSRTParallelMove(final Serializable simulatorId)
57 {
58 this(1, simulatorId);
59 }
60
61 /**
62 * @param moveThreads int; set moveThreads
63 */
64 public final void setMoveThreads(final int moveThreads)
65 {
66 this.moveThreads = moveThreads;
67 }
68
69 /**
70 * @return moveThreads
71 */
72 public final int getMoveThreads()
73 {
74 return this.moveThreads;
75 }
76
77 /** {@inheritDoc} */
78 @Override
79 protected final Duration simulatorTimeForWallClockMillis(final double factor)
80 {
81 return new Duration(factor, DurationUnit.MILLISECOND);
82 }
83
84 /** {@inheritDoc} */
85 @Override
86 public final String toString()
87 {
88 return "DEVSRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
89 }
90
91 // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
92 /** {@inheritDoc} */
93 @Override
94 @SuppressWarnings("checkstyle:designforextension")
95 public void run()
96 {
97 AnimationThread animationThread = new AnimationThread(this);
98 animationThread.start();
99
100 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
101 Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
102 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
103 double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
104 Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
105 // wall clock
106
107 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
108 && this.getSimulatorTime().le(this.replication.getEndTime()))
109 {
110 // check if speedFactor has changed. If yes: re-baseline.
111 if (factor != getSpeedFactor())
112 {
113 clockTime0 = System.currentTimeMillis();
114 simTime0 = this.simulatorTime;
115 factor = getSpeedFactor();
116 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
117 }
118
119 // check if we are behind; syncTime is the needed current time on the wall-clock
120 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
121 // delta is the time we might be behind
122 double simTime = this.simulatorTime.minus(simTime0).doubleValue();
123
124 if (syncTime > simTime)
125 {
126 // we are behind
127 if (!isCatchup())
128 {
129 // if no catch-up: re-baseline.
130 clockTime0 = System.currentTimeMillis();
131 simTime0 = this.simulatorTime;
132 }
133 else
134 {
135 // jump to the required wall-clock related time or to the time of the next event, whichever comes
136 // first
137 synchronized (super.semaphore)
138 {
139 Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
140 Duration absSyncTime = this.simulatorTime.plus(delta);
141 Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
142 if (absSyncTime.lt(eventTime))
143 {
144 this.simulatorTime = absSyncTime;
145 }
146 else
147 {
148 this.simulatorTime = eventTime;
149 }
150 }
151 }
152 }
153
154 // peek at the first event and determine the time difference relative to RT speed; that determines
155 // how long we have to wait.
156 SimEventInterface<Duration> event = this.eventList.first();
157 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
158
159 /*
160 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
161 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
162 * have to wait 10 times that amount. We might also be behind.
163 */
164 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
165 {
166 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
167 {
168 try
169 {
170 Thread.sleep(getUpdateMsec());
171
172 // check if speedFactor has changed. If yes: break out of this loop and execute event.
173 // this could cause a jump.
174 if (factor != getSpeedFactor())
175 {
176 simTimeDiffMillis = 0.0;
177 }
178
179 }
180 catch (InterruptedException ie)
181 {
182 // do nothing
183 ie = null;
184 }
185
186 // check if an event has been inserted. In a real-time situation this can be dome by other threads
187 if (!event.equals(this.eventList.first())) // event inserted by a thread...
188 {
189 event = this.eventList.first();
190 simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
191 }
192 else
193 {
194 // make a small time step for the animation during wallclock waiting.
195 // but never beyond the next event time.
196 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
197 {
198 synchronized (super.semaphore)
199 {
200 this.simulatorTime = this.simulatorTime.plus(rSim);
201 }
202 }
203 }
204 }
205 }
206
207 this.simulatorTime = event.getAbsoluteExecutionTime();
208 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
209
210 if (this.moveThreads <= 1)
211 {
212 synchronized (super.semaphore)
213 {
214 // carry out all events scheduled on this simulation time, as long as we are still running.
215 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
216 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
217 {
218 event = this.eventList.removeFirst();
219 try
220 {
221 event.execute();
222 }
223 catch (Exception exception)
224 {
225 getLogger().always().error(exception);
226 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
227 {
228 try
229 {
230 this.stop();
231 }
232 catch (SimRuntimeException exception1)
233 {
234 getLogger().always().error(exception1);
235 }
236 }
237 }
238 if (!this.eventList.isEmpty())
239 {
240 // peek at next event for while loop.
241 event = this.eventList.first();
242 }
243 }
244 }
245 }
246
247 else
248
249 {
250 // parallel execution of the move method
251 // first carry out all the non-move events and make a list of move events to be carried out in parallel
252 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
253 synchronized (super.semaphore)
254 {
255 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
256 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
257 {
258 event = this.eventList.removeFirst();
259 SimEvent<Duration> se = (SimEvent<Duration>) event;
260 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
261 {
262 moveEvents.add(event);
263 }
264 else
265 {
266 try
267 {
268 event.execute();
269 }
270 catch (Exception exception)
271 {
272 getLogger().always().error(exception);
273 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
274 {
275 try
276 {
277 this.stop();
278 }
279 catch (SimRuntimeException exception1)
280 {
281 getLogger().always().error(exception1);
282 }
283 }
284 }
285 }
286 if (!this.eventList.isEmpty())
287 {
288 // peek at next event for while loop.
289 event = this.eventList.first();
290 }
291 }
292 }
293
294 // then carry out the move events, based on a constant state at that time.
295 // first make sure that new events will be stored in a temporary event list...
296 this.executor = Executors.newFixedThreadPool(1);
297 for (int i = 0; i < moveEvents.size(); i++)
298 {
299 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
300 final SimEventInterface<Duration> moveEvent =
301 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
302 this.executor.execute(new Runnable()
303 {
304 @Override
305 public void run()
306 {
307 try
308 {
309 moveEvent.execute();
310 }
311 catch (Exception exception)
312 {
313 getLogger().always().error(exception);
314 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
315 {
316 try
317 {
318 OTSDEVSRTParallelMove.this.stop();
319 }
320 catch (SimRuntimeException exception1)
321 {
322 getLogger().always().error(exception1);
323 }
324 }
325 }
326 }
327 });
328 }
329 this.executor.shutdown();
330 try
331 {
332 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
333 }
334 catch (InterruptedException exception)
335 {
336 //
337 }
338
339 this.executor = Executors.newFixedThreadPool(1);
340 for (int i = 0; i < moveEvents.size(); i++)
341 {
342 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
343 final SimEventInterface<Duration> moveEvent =
344 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
345 this.executor.execute(new Runnable()
346 {
347 @Override
348 public void run()
349 {
350 try
351 {
352 moveEvent.execute();
353 }
354 catch (Exception exception)
355 {
356 getLogger().always().error(exception);
357 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
358 {
359 try
360 {
361 OTSDEVSRTParallelMove.this.stop();
362 }
363 catch (SimRuntimeException exception1)
364 {
365 getLogger().always().error(exception1);
366 }
367 }
368 }
369 }
370 });
371 }
372 this.executor.shutdown();
373 try
374 {
375 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
376 }
377 catch (InterruptedException exception)
378 {
379 //
380 }
381
382 this.executor = Executors.newFixedThreadPool(1);
383 for (int i = 0; i < moveEvents.size(); i++)
384 {
385 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
386 final SimEventInterface<Duration> moveEvent =
387 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
388 this.executor.execute(new Runnable()
389 {
390 @Override
391 public void run()
392 {
393 try
394 {
395 moveEvent.execute();
396 }
397 catch (Exception exception)
398 {
399 getLogger().always().error(exception);
400 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
401 {
402 try
403 {
404 OTSDEVSRTParallelMove.this.stop();
405 }
406 catch (SimRuntimeException exception1)
407 {
408 getLogger().always().error(exception1);
409 }
410 }
411 }
412 }
413 });
414 }
415 this.executor.shutdown();
416 try
417 {
418 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
419 }
420 catch (InterruptedException exception)
421 {
422 //
423 }
424
425 }
426 }
427 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
428
429 updateAnimation();
430 animationThread.stopAnimation();
431 }
432
433 }