1 package org.opentrafficsim.core.dsol;
2
3 import java.io.Serializable;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import org.djunits.unit.DurationUnit;
10 import org.djunits.value.vdouble.scalar.Duration;
11 import org.opentrafficsim.core.gtu.Gtu;
12
13 import nl.tudelft.simulation.dsol.SimRuntimeException;
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16 import nl.tudelft.simulation.dsol.simulators.DevsRealTimeAnimator;
17 import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
18 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19
20 /**
21 * <p>
22 * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23 * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24 * </p>
25 * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
26 */
27 public class OtsDevsRealTimeParallelMove extends DevsRealTimeAnimator<Duration>
28 {
29 /** */
30 private static final long serialVersionUID = 20140909L;
31
32 /** number of threads to use for move(). */
33 private int moveThreads = 1;
34
35 /** the thread pool for parallel execution. */
36 private ExecutorService executor = null;
37
38 /**
39 * Create a new OTSRealTimeClock.
40 * @param moveThreads The number of move threads to use
41 * @param simulatorId the id of the simulator to use in remote communication
42 */
43 public OtsDevsRealTimeParallelMove(final int moveThreads, final Serializable simulatorId)
44 {
45 super(simulatorId);
46 setMoveThreads(moveThreads);
47 setEventList(new SynchronizedRedBlackTree<>());
48 }
49
50 /**
51 * Create a new OTSRealTimeClock.
52 * @param simulatorId the id of the simulator to use in remote communication
53 */
54 public OtsDevsRealTimeParallelMove(final Serializable simulatorId)
55 {
56 this(1, simulatorId);
57 }
58
59 /**
60 * @param moveThreads set moveThreads
61 */
62 public final void setMoveThreads(final int moveThreads)
63 {
64 this.moveThreads = moveThreads;
65 }
66
67 /**
68 * @return moveThreads
69 */
70 public final int getMoveThreads()
71 {
72 return this.moveThreads;
73 }
74
75 @Override
76 protected final Duration simulatorTimeForWallClockMillis(final double factor)
77 {
78 return new Duration(factor, DurationUnit.MILLISECOND);
79 }
80
81 @Override
82 public final String toString()
83 {
84 return "DevsRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
85 }
86
87 // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
88 @Override
89 @SuppressWarnings("checkstyle:designforextension")
90 public void run()
91 {
92 AnimationThread animationThread = new AnimationThread(this);
93 animationThread.start();
94
95 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
96 Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
97 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
98 double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
99 Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
100 // wall clock
101
102 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
103 && this.getSimulatorTime().le(this.replication.getEndTime()))
104 {
105 // check if speedFactor has changed. If yes: re-baseline.
106 if (factor != getSpeedFactor())
107 {
108 clockTime0 = System.currentTimeMillis();
109 simTime0 = this.simulatorTime;
110 factor = getSpeedFactor();
111 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
112 }
113
114 // check if we are behind; syncTime is the needed current time on the wall-clock
115 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
116 // delta is the time we might be behind
117 double simTime = this.simulatorTime.minus(simTime0).doubleValue();
118
119 if (syncTime > simTime)
120 {
121 // we are behind
122 if (!isCatchup())
123 {
124 // if no catch-up: re-baseline.
125 clockTime0 = System.currentTimeMillis();
126 simTime0 = this.simulatorTime;
127 }
128 else
129 {
130 // jump to the required wall-clock related time or to the time of the next event, whichever comes
131 // first
132 synchronized (super.semaphore)
133 {
134 Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
135 Duration absSyncTime = this.simulatorTime.plus(delta);
136 Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
137 if (absSyncTime.lt(eventTime))
138 {
139 this.simulatorTime = absSyncTime;
140 }
141 else
142 {
143 this.simulatorTime = eventTime;
144 }
145 }
146 }
147 }
148
149 // peek at the first event and determine the time difference relative to RT speed; that determines
150 // how long we have to wait.
151 SimEventInterface<Duration> event = this.eventList.first();
152 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
153
154 /*
155 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
156 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
157 * have to wait 10 times that amount. We might also be behind.
158 */
159 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
160 {
161 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
162 {
163 try
164 {
165 Thread.sleep(getUpdateMsec());
166
167 // check if speedFactor has changed. If yes: break out of this loop and execute event.
168 // this could cause a jump.
169 if (factor != getSpeedFactor())
170 {
171 simTimeDiffMillis = 0.0;
172 }
173
174 }
175 catch (InterruptedException ie)
176 {
177 // do nothing
178 ie = null;
179 }
180
181 // check if an event has been inserted. In a real-time situation this can be dome by other threads
182 if (!event.equals(this.eventList.first())) // event inserted by a thread...
183 {
184 event = this.eventList.first();
185 simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
186 }
187 else
188 {
189 // make a small time step for the animation during wallclock waiting.
190 // but never beyond the next event time.
191 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
192 {
193 synchronized (super.semaphore)
194 {
195 this.simulatorTime = this.simulatorTime.plus(rSim);
196 }
197 }
198 }
199 }
200 }
201
202 this.simulatorTime = event.getAbsoluteExecutionTime();
203 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
204
205 if (this.moveThreads <= 1)
206 {
207 synchronized (super.semaphore)
208 {
209 // carry out all events scheduled on this simulation time, as long as we are still running.
210 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
211 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
212 {
213 event = this.eventList.removeFirst();
214 try
215 {
216 event.execute();
217 }
218 catch (Exception exception)
219 {
220 getLogger().always().error(exception);
221 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
222 {
223 try
224 {
225 this.stop();
226 }
227 catch (SimRuntimeException exception1)
228 {
229 getLogger().always().error(exception1);
230 }
231 }
232 }
233 if (!this.eventList.isEmpty())
234 {
235 // peek at next event for while loop.
236 event = this.eventList.first();
237 }
238 }
239 }
240 }
241
242 else
243
244 {
245 // parallel execution of the move method
246 // first carry out all the non-move events and make a list of move events to be carried out in parallel
247 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
248 synchronized (super.semaphore)
249 {
250 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
251 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
252 {
253 event = this.eventList.removeFirst();
254 SimEvent<Duration> se = (SimEvent<Duration>) event;
255 if (se.getTarget() instanceof Gtu && se.getMethod().equals("move"))
256 {
257 moveEvents.add(event);
258 }
259 else
260 {
261 try
262 {
263 event.execute();
264 }
265 catch (Exception exception)
266 {
267 getLogger().always().error(exception);
268 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
269 {
270 try
271 {
272 this.stop();
273 }
274 catch (SimRuntimeException exception1)
275 {
276 getLogger().always().error(exception1);
277 }
278 }
279 }
280 }
281 if (!this.eventList.isEmpty())
282 {
283 // peek at next event for while loop.
284 event = this.eventList.first();
285 }
286 }
287 }
288
289 // then carry out the move events, based on a constant state at that time.
290 // first make sure that new events will be stored in a temporary event list...
291 this.executor = Executors.newFixedThreadPool(1);
292 for (int i = 0; i < moveEvents.size(); i++)
293 {
294 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
295 final SimEventInterface<Duration> moveEvent =
296 new SimEvent<>(this.simulatorTime, se.getTarget(), "movePrep", se.getArgs());
297 this.executor.execute(new Runnable()
298 {
299 @Override
300 public void run()
301 {
302 try
303 {
304 moveEvent.execute();
305 }
306 catch (Exception exception)
307 {
308 getLogger().always().error(exception);
309 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
310 {
311 try
312 {
313 OtsDevsRealTimeParallelMove.this.stop();
314 }
315 catch (SimRuntimeException exception1)
316 {
317 getLogger().always().error(exception1);
318 }
319 }
320 }
321 }
322 });
323 }
324 this.executor.shutdown();
325 try
326 {
327 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
328 }
329 catch (InterruptedException exception)
330 {
331 //
332 }
333
334 this.executor = Executors.newFixedThreadPool(1);
335 for (int i = 0; i < moveEvents.size(); i++)
336 {
337 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
338 final SimEventInterface<Duration> moveEvent =
339 new SimEvent<>(this.simulatorTime, se.getTarget(), "moveGenerate", se.getArgs());
340 this.executor.execute(new Runnable()
341 {
342 @Override
343 public void run()
344 {
345 try
346 {
347 moveEvent.execute();
348 }
349 catch (Exception exception)
350 {
351 getLogger().always().error(exception);
352 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
353 {
354 try
355 {
356 OtsDevsRealTimeParallelMove.this.stop();
357 }
358 catch (SimRuntimeException exception1)
359 {
360 getLogger().always().error(exception1);
361 }
362 }
363 }
364 }
365 });
366 }
367 this.executor.shutdown();
368 try
369 {
370 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
371 }
372 catch (InterruptedException exception)
373 {
374 //
375 }
376
377 this.executor = Executors.newFixedThreadPool(1);
378 for (int i = 0; i < moveEvents.size(); i++)
379 {
380 SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
381 final SimEventInterface<Duration> moveEvent =
382 new SimEvent<>(this.simulatorTime, se.getTarget(), "moveFinish", se.getArgs());
383 this.executor.execute(new Runnable()
384 {
385 @Override
386 public void run()
387 {
388 try
389 {
390 moveEvent.execute();
391 }
392 catch (Exception exception)
393 {
394 getLogger().always().error(exception);
395 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
396 {
397 try
398 {
399 OtsDevsRealTimeParallelMove.this.stop();
400 }
401 catch (SimRuntimeException exception1)
402 {
403 getLogger().always().error(exception1);
404 }
405 }
406 }
407 }
408 });
409 }
410 this.executor.shutdown();
411 try
412 {
413 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
414 }
415 catch (InterruptedException exception)
416 {
417 //
418 }
419
420 }
421 }
422 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
423
424 updateAnimation();
425 animationThread.stopAnimation();
426 }
427
428 }