1 package org.opentrafficsim.core.dsol;
2
3 import java.io.Serializable;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import org.djunits.unit.DurationUnit;
10 import org.djunits.value.vdouble.scalar.Duration;
11 import org.djunits.value.vdouble.scalar.Time;
12 import org.opentrafficsim.core.gtu.GTU;
13
14 import nl.tudelft.simulation.dsol.SimRuntimeException;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
16 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
17 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
18 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
19 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
20
21 /**
22 * <p>
23 * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
24 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
25 * <p>
26 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
27 * initial version Aug 15, 2014 <br>
28 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29 */
30 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
31 {
32 /** */
33 private static final long serialVersionUID = 20140909L;
34
35 /** number of threads to use for move(). */
36 private int moveThreads = 1;
37
38 /** the thread pool for parallel execution. */
39 private ExecutorService executor = null;
40
41 /**
42 * Create a new OTSRealTimeClock.
43 * @param moveThreads int; The number of move threads to use
44 * @param simulatorId the id of the simulator to use in remote communication
45 */
46 public OTSDEVSRTParallelMove(final int moveThreads, final Serializable simulatorId)
47 {
48 super(simulatorId);
49 setMoveThreads(moveThreads);
50 setEventList(new SynchronizedRedBlackTree<>());
51 }
52
53 /**
54 * Create a new OTSRealTimeClock.
55 * @param simulatorId the id of the simulator to use in remote communication
56 */
57 public OTSDEVSRTParallelMove(final Serializable simulatorId)
58 {
59 this(1, simulatorId);
60 }
61
62 /**
63 * @param moveThreads int; set moveThreads
64 */
65 public final void setMoveThreads(final int moveThreads)
66 {
67 this.moveThreads = moveThreads;
68 }
69
70 /**
71 * @return moveThreads
72 */
73 public final int getMoveThreads()
74 {
75 return this.moveThreads;
76 }
77
78 /** {@inheritDoc} */
79 @Override
80 protected final Duration simulatorTimeForWallClockMillis(final double factor)
81 {
82 return new Duration(factor, DurationUnit.MILLISECOND);
83 }
84
85 /** {@inheritDoc} */
86 @Override
87 public final String toString()
88 {
89 return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
90 }
91
92 // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
93 /** {@inheritDoc} */
94 @Override
95 @SuppressWarnings("checkstyle:designforextension")
96 public void run()
97 {
98 AnimationThread animationThread = new AnimationThread(this);
99 animationThread.start();
100
101 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
102 SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
103 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
104 double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
105 Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
106 // wall clock
107
108 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
109 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
110 {
111 // check if speedFactor has changed. If yes: re-baseline.
112 if (factor != getSpeedFactor())
113 {
114 clockTime0 = System.currentTimeMillis();
115 simTime0 = this.simulatorTime;
116 factor = getSpeedFactor();
117 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
118 }
119
120 // check if we are behind; syncTime is the needed current time on the wall-clock
121 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
122 // delta is the time we might be behind
123 double simTime = this.simulatorTime.diff(simTime0).doubleValue();
124
125 if (syncTime > simTime)
126 {
127 // we are behind
128 if (!isCatchup())
129 {
130 // if no catch-up: re-baseline.
131 clockTime0 = System.currentTimeMillis();
132 simTime0 = this.simulatorTime;
133 }
134 else
135 {
136 // jump to the required wall-clock related time or to the time of the next event, whichever comes
137 // first
138 synchronized (super.semaphore)
139 {
140 Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
141 SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
142 SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
143 if (absSyncTime.lt(eventTime))
144 {
145 this.simulatorTime = absSyncTime;
146 }
147 else
148 {
149 this.simulatorTime = eventTime;
150 }
151 }
152 }
153 }
154
155 // peek at the first event and determine the time difference relative to RT speed; that determines
156 // how long we have to wait.
157 SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
158 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
159
160 /*
161 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
162 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
163 * have to wait 10 times that amount. We might also be behind.
164 */
165 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
166 {
167 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
168 {
169 try
170 {
171 Thread.sleep(getUpdateMsec());
172
173 // check if speedFactor has changed. If yes: break out of this loop and execute event.
174 // this could cause a jump.
175 if (factor != getSpeedFactor())
176 {
177 simTimeDiffMillis = 0.0;
178 }
179
180 }
181 catch (InterruptedException ie)
182 {
183 // do nothing
184 ie = null;
185 }
186
187 // check if an event has been inserted. In a real-time situation this can be dome by other threads
188 if (!event.equals(this.eventList.first())) // event inserted by a thread...
189 {
190 event = this.eventList.first();
191 simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
192 }
193 else
194 {
195 // make a small time step for the animation during wallclock waiting.
196 // but never beyond the next event time.
197 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
198 {
199 synchronized (super.semaphore)
200 {
201 this.simulatorTime.add(rSim);
202 }
203 }
204 }
205 }
206 }
207
208 this.simulatorTime = event.getAbsoluteExecutionTime();
209 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime.get());
210
211 if (this.moveThreads <= 1)
212 {
213 synchronized (super.semaphore)
214 {
215 // carry out all events scheduled on this simulation time, as long as we are still running.
216 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
217 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
218 {
219 event = this.eventList.removeFirst();
220 try
221 {
222 event.execute();
223 }
224 catch (Exception exception)
225 {
226 getLogger().always().error(exception);
227 if (this.isPauseOnError())
228 {
229 try
230 {
231 this.stop();
232 }
233 catch (SimRuntimeException exception1)
234 {
235 getLogger().always().error(exception1);
236 }
237 }
238 }
239 if (!this.eventList.isEmpty())
240 {
241 // peek at next event for while loop.
242 event = this.eventList.first();
243 }
244 }
245 }
246 }
247
248 else
249
250 {
251 // parallel execution of the move method
252 // first carry out all the non-move events and make a list of move events to be carried out in parallel
253 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
254 synchronized (super.semaphore)
255 {
256 while (this.isStartingOrRunning() && !this.eventList.isEmpty()
257 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
258 {
259 event = this.eventList.removeFirst();
260 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
261 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
262 {
263 moveEvents.add(event);
264 }
265 else
266 {
267 try
268 {
269 event.execute();
270 }
271 catch (Exception exception)
272 {
273 getLogger().always().error(exception);
274 if (this.isPauseOnError())
275 {
276 try
277 {
278 this.stop();
279 }
280 catch (SimRuntimeException exception1)
281 {
282 getLogger().always().error(exception1);
283 }
284 }
285 }
286 }
287 if (!this.eventList.isEmpty())
288 {
289 // peek at next event for while loop.
290 event = this.eventList.first();
291 }
292 }
293 }
294
295 // then carry out the move events, based on a constant state at that time.
296 // first make sure that new events will be stored in a temporary event list...
297 this.executor = Executors.newFixedThreadPool(1);
298 for (int i = 0; i < moveEvents.size(); i++)
299 {
300 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
301 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
302 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
303 this.executor.execute(new Runnable()
304 {
305 @Override
306 public void run()
307 {
308 try
309 {
310 moveEvent.execute();
311 }
312 catch (Exception exception)
313 {
314 getLogger().always().error(exception);
315 if (OTSDEVSRTParallelMove.this.isPauseOnError())
316 {
317 try
318 {
319 OTSDEVSRTParallelMove.this.stop();
320 }
321 catch (SimRuntimeException exception1)
322 {
323 getLogger().always().error(exception1);
324 }
325 }
326 }
327 }
328 });
329 }
330 this.executor.shutdown();
331 try
332 {
333 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
334 }
335 catch (InterruptedException exception)
336 {
337 //
338 }
339
340 this.executor = Executors.newFixedThreadPool(1);
341 for (int i = 0; i < moveEvents.size(); i++)
342 {
343 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
344 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
345 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
346 this.executor.execute(new Runnable()
347 {
348 @Override
349 public void run()
350 {
351 try
352 {
353 moveEvent.execute();
354 }
355 catch (Exception exception)
356 {
357 getLogger().always().error(exception);
358 if (OTSDEVSRTParallelMove.this.isPauseOnError())
359 {
360 try
361 {
362 OTSDEVSRTParallelMove.this.stop();
363 }
364 catch (SimRuntimeException exception1)
365 {
366 getLogger().always().error(exception1);
367 }
368 }
369 }
370 }
371 });
372 }
373 this.executor.shutdown();
374 try
375 {
376 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
377 }
378 catch (InterruptedException exception)
379 {
380 //
381 }
382
383 this.executor = Executors.newFixedThreadPool(1);
384 for (int i = 0; i < moveEvents.size(); i++)
385 {
386 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
387 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
388 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
389 this.executor.execute(new Runnable()
390 {
391 @Override
392 public void run()
393 {
394 try
395 {
396 moveEvent.execute();
397 }
398 catch (Exception exception)
399 {
400 getLogger().always().error(exception);
401 if (OTSDEVSRTParallelMove.this.isPauseOnError())
402 {
403 try
404 {
405 OTSDEVSRTParallelMove.this.stop();
406 }
407 catch (SimRuntimeException exception1)
408 {
409 getLogger().always().error(exception1);
410 }
411 }
412 }
413 }
414 });
415 }
416 this.executor.shutdown();
417 try
418 {
419 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
420 }
421 catch (InterruptedException exception)
422 {
423 //
424 }
425
426 }
427 }
428 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
429
430 updateAnimation();
431 animationThread.stopAnimation();
432 }
433
434 }