1 package org.opentrafficsim.core.dsol;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7
8 import org.djunits.unit.DurationUnit;
9 import org.djunits.value.vdouble.scalar.Duration;
10 import org.djunits.value.vdouble.scalar.Time;
11 import org.opentrafficsim.core.gtu.GTU;
12 import org.opentrafficsim.core.logger.SimLogger;
13
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
17 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
18 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19
20 /**
21 * <p>
22 * Copyright (c) 2013-2018 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24 * <p>
25 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
26 * initial version Aug 15, 2014 <br>
27 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
28 */
29 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
30 {
31 /** */
32 private static final long serialVersionUID = 20140909L;
33
34 /** number of threads to use for move(). */
35 private int moveThreads = 1;
36
37 /** the thread pool for parallel execution. */
38 private ExecutorService executor = null;
39
40 /**
41 * Create a new OTSRealTimeClock.
42 * @param moveThreads The number of move threads to use
43 */
44 public OTSDEVSRTParallelMove(final int moveThreads)
45 {
46 super();
47 setMoveThreads(moveThreads);
48 setEventList(new SynchronizedRedBlackTree<>());
49 }
50
51 /**
52 * Create a new OTSRealTimeClock.
53 */
54 public OTSDEVSRTParallelMove()
55 {
56 this(1);
57 }
58
59 /**
60 * @param moveThreads set moveThreads
61 */
62 public final void setMoveThreads(final int moveThreads)
63 {
64 this.moveThreads = moveThreads;
65 }
66
67 /**
68 * @return moveThreads
69 */
70 public final int getMoveThreads()
71 {
72 return this.moveThreads;
73 }
74
75 /** {@inheritDoc} */
76 @Override
77 protected final Duration relativeMillis(final double factor)
78 {
79 return new Duration(factor, DurationUnit.MILLISECOND);
80 }
81
82 /** {@inheritDoc} */
83 @Override
84 public final String toString()
85 {
86 return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
87 }
88
89 /** {@inheritDoc} */
90 @Override
91 @SuppressWarnings("checkstyle:designforextension")
92 public void run()
93 {
94 AnimationThread animationThread = new AnimationThread(this);
95 animationThread.start();
96
97 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
98 SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
99 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
100 double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
101 Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
102
103 while (this.isRunning() && !this.eventList.isEmpty()
104 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
105 {
106 // check if speedFactor has changed. If yes: re-baseline.
107 if (factor != getSpeedFactor())
108 {
109 clockTime0 = System.currentTimeMillis();
110 simTime0 = this.simulatorTime;
111 factor = getSpeedFactor();
112 rSim = this.relativeMillis(getUpdateMsec() * factor);
113 }
114
115 // check if we are behind; syncTime is the needed current time on the wall-clock
116 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
117 // delta is the time we might be behind
118 double simTime = this.simulatorTime.minus(simTime0).doubleValue();
119
120 if (syncTime > simTime)
121 {
122 // we are behind
123 if (!isCatchup())
124 {
125 // if no catch-up: re-baseline.
126 clockTime0 = System.currentTimeMillis();
127 simTime0 = this.simulatorTime;
128 }
129 else
130 {
131 // jump to the required wall-clock related time or to the time of the next event, whichever comes
132 // first
133 synchronized (super.semaphore)
134 {
135 Duration delta = relativeMillis((syncTime - simTime) / msec1);
136 SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
137 SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
138 if (absSyncTime.lt(eventTime))
139 {
140 this.simulatorTime = absSyncTime;
141 }
142 else
143 {
144 this.simulatorTime = eventTime;
145 }
146 }
147 }
148 }
149
150 // peek at the first event and determine the time difference relative to RT speed; that determines
151 // how long we have to wait.
152 SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
153 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
154
155 /*
156 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
157 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
158 * have to wait 10 times that amount. We might also be behind.
159 */
160 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
161 {
162 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
163 {
164 try
165 {
166 Thread.sleep(getUpdateMsec());
167
168 // check if speedFactor has changed. If yes: break out of this loop and execute event.
169 // this could cause a jump.
170 if (factor != getSpeedFactor())
171 {
172 simTimeDiffMillis = 0.0;
173 }
174
175 }
176 catch (InterruptedException ie)
177 {
178 // do nothing
179 ie = null;
180 }
181
182 // check if an event has been inserted. In a real-time situation this can be dome by other threads
183 if (!event.equals(this.eventList.first())) // event inserted by a thread...
184 {
185 event = this.eventList.first();
186 simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
187 }
188 else
189 {
190 // make a small time step for the animation during wallclock waiting.
191 // but never beyond the next event time.
192 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
193 {
194 synchronized (super.semaphore)
195 {
196 this.simulatorTime.add(rSim);
197 }
198 }
199 }
200 }
201 }
202
203 this.simulatorTime = event.getAbsoluteExecutionTime();
204 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
205
206 if (this.moveThreads <= 1)
207 {
208 synchronized (super.semaphore)
209 {
210 // carry out all events scheduled on this simulation time, as long as we are still running.
211 while (this.isRunning() && !this.eventList.isEmpty()
212 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
213 {
214 event = this.eventList.removeFirst();
215 try
216 {
217 event.execute();
218 }
219 catch (Exception exception)
220 {
221 SimLogger.always().error(exception);
222 if (this.isPauseOnError())
223 {
224 this.stop();
225 }
226 }
227 if (!this.eventList.isEmpty())
228 {
229 // peek at next event for while loop.
230 event = this.eventList.first();
231 }
232 }
233 }
234 }
235
236 else
237
238 {
239 // parallel execution of the move method
240 // first carry out all the non-move events and make a list of move events to be carried out in parallel
241 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
242 synchronized (super.semaphore)
243 {
244 while (this.isRunning() && !this.eventList.isEmpty()
245 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
246 {
247 event = this.eventList.removeFirst();
248 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
249 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
250 {
251 moveEvents.add(event);
252 }
253 else
254 {
255 try
256 {
257 event.execute();
258 }
259 catch (Exception exception)
260 {
261 SimLogger.always().error(exception);
262 if (this.isPauseOnError())
263 {
264 this.stop();
265 }
266 }
267 }
268 if (!this.eventList.isEmpty())
269 {
270 // peek at next event for while loop.
271 event = this.eventList.first();
272 }
273 }
274 }
275
276 // then carry out the move events, based on a constant state at that time.
277 // first make sure that new events will be stored in a temporary event list...
278 this.executor = Executors.newFixedThreadPool(1);
279 for (int i = 0; i < moveEvents.size(); i++)
280 {
281 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
282 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
283 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
284 this.executor.execute(new Runnable()
285 {
286 @Override
287 public void run()
288 {
289 try
290 {
291 moveEvent.execute();
292 }
293 catch (Exception exception)
294 {
295 SimLogger.always().error(exception);
296 if (OTSDEVSRTParallelMove.this.isPauseOnError())
297 {
298 OTSDEVSRTParallelMove.this.stop();
299 }
300 }
301 }
302 });
303 }
304 this.executor.shutdown();
305 try
306 {
307 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
308 }
309 catch (InterruptedException exception)
310 {
311 //
312 }
313
314 this.executor = Executors.newFixedThreadPool(1);
315 for (int i = 0; i < moveEvents.size(); i++)
316 {
317 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
318 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
319 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
320 this.executor.execute(new Runnable()
321 {
322 @Override
323 public void run()
324 {
325 try
326 {
327 moveEvent.execute();
328 }
329 catch (Exception exception)
330 {
331 SimLogger.always().error(exception);
332 if (OTSDEVSRTParallelMove.this.isPauseOnError())
333 {
334 OTSDEVSRTParallelMove.this.stop();
335 }
336 }
337 }
338 });
339 }
340 this.executor.shutdown();
341 try
342 {
343 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
344 }
345 catch (InterruptedException exception)
346 {
347 //
348 }
349
350 this.executor = Executors.newFixedThreadPool(1);
351 for (int i = 0; i < moveEvents.size(); i++)
352 {
353 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
354 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
355 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
356 this.executor.execute(new Runnable()
357 {
358 @Override
359 public void run()
360 {
361 try
362 {
363 moveEvent.execute();
364 }
365 catch (Exception exception)
366 {
367 SimLogger.always().error(exception);
368 if (OTSDEVSRTParallelMove.this.isPauseOnError())
369 {
370 OTSDEVSRTParallelMove.this.stop();
371 }
372 }
373 }
374 });
375 }
376 this.executor.shutdown();
377 try
378 {
379 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
380 }
381 catch (InterruptedException exception)
382 {
383 //
384 }
385
386 }
387 }
388 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
389
390 updateAnimation();
391 animationThread.stopAnimation();
392 }
393
394 }