1 package org.opentrafficsim.core.dsol;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7
8 import org.djunits.unit.DurationUnit;
9 import org.djunits.value.vdouble.scalar.Duration;
10 import org.djunits.value.vdouble.scalar.Time;
11 import org.opentrafficsim.core.gtu.GTU;
12
13 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
14 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
15 import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
16 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
17 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
18
19 /**
20 * <p>
21 * Copyright (c) 2013-2018 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
22 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
23 * <p>
24 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
25 * initial version Aug 15, 2014 <br>
26 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
27 */
28 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
29 {
30 /** */
31 private static final long serialVersionUID = 20140909L;
32
33 /** number of threads to use for move(). */
34 private int moveThreads = 1;
35
36 /** the thread pool for parallel execution. */
37 private ExecutorService executor = null;
38
39 /**
40 * Create a new OTSRealTimeClock.
41 * @param moveThreads The number of move threads to use
42 */
43 public OTSDEVSRTParallelMove(final int moveThreads)
44 {
45 super();
46 setMoveThreads(moveThreads);
47 setEventList(new SynchronizedRedBlackTree<>());
48 }
49
50 /**
51 * Create a new OTSRealTimeClock.
52 */
53 public OTSDEVSRTParallelMove()
54 {
55 this(1);
56 }
57
58 /**
59 * @param moveThreads set moveThreads
60 */
61 public final void setMoveThreads(final int moveThreads)
62 {
63 this.moveThreads = moveThreads;
64 }
65
66 /**
67 * @return moveThreads
68 */
69 public final int getMoveThreads()
70 {
71 return this.moveThreads;
72 }
73
74 /** {@inheritDoc} */
75 @Override
76 protected final Duration relativeMillis(final double factor)
77 {
78 return new Duration(factor, DurationUnit.MILLISECOND);
79 }
80
81 /** {@inheritDoc} */
82 @Override
83 public final String toString()
84 {
85 return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
86 }
87
88 /** {@inheritDoc} */
89 @Override
90 @SuppressWarnings("checkstyle:designforextension")
91 public void run()
92 {
93 AnimationThread animationThread = new AnimationThread(this);
94 animationThread.start();
95
96 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
97 SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
98 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
99 double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
100 Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
101
102 while (this.isRunning() && !this.eventList.isEmpty()
103 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
104 {
105 // check if speedFactor has changed. If yes: re-baseline.
106 if (factor != getSpeedFactor())
107 {
108 clockTime0 = System.currentTimeMillis();
109 simTime0 = this.simulatorTime;
110 factor = getSpeedFactor();
111 rSim = this.relativeMillis(getUpdateMsec() * factor);
112 }
113
114 // check if we are behind; syncTime is the needed current time on the wall-clock
115 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
116 // delta is the time we might be behind
117 double simTime = this.simulatorTime.minus(simTime0).doubleValue();
118
119 if (syncTime > simTime)
120 {
121 // we are behind
122 if (!isCatchup())
123 {
124 // if no catch-up: re-baseline.
125 clockTime0 = System.currentTimeMillis();
126 simTime0 = this.simulatorTime;
127 }
128 else
129 {
130 // jump to the required wall-clock related time or to the time of the next event, whichever comes
131 // first
132 synchronized (super.semaphore)
133 {
134 Duration delta = relativeMillis((syncTime - simTime) / msec1);
135 SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
136 SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
137 if (absSyncTime.lt(eventTime))
138 {
139 this.simulatorTime = absSyncTime;
140 }
141 else
142 {
143 this.simulatorTime = eventTime;
144 }
145 }
146 }
147 }
148
149 // peek at the first event and determine the time difference relative to RT speed; that determines
150 // how long we have to wait.
151 SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
152 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
153
154 /*
155 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
156 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
157 * have to wait 10 times that amount. We might also be behind.
158 */
159 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
160 {
161 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
162 {
163 try
164 {
165 Thread.sleep(getUpdateMsec());
166
167 // check if speedFactor has changed. If yes: break out of this loop and execute event.
168 // this could cause a jump.
169 if (factor != getSpeedFactor())
170 {
171 simTimeDiffMillis = 0.0;
172 }
173
174 }
175 catch (InterruptedException ie)
176 {
177 // do nothing
178 ie = null;
179 }
180
181 // check if an event has been inserted. In a real-time situation this can be dome by other threads
182 if (!event.equals(this.eventList.first())) // event inserted by a thread...
183 {
184 event = this.eventList.first();
185 simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
186 }
187 else
188 {
189 // make a small time step for the animation during wallclock waiting.
190 // but never beyond the next event time.
191 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
192 {
193 synchronized (super.semaphore)
194 {
195 this.simulatorTime.add(rSim);
196 }
197 }
198 }
199 }
200 }
201
202 this.simulatorTime = event.getAbsoluteExecutionTime();
203 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
204
205 if (this.moveThreads <= 1)
206 {
207 synchronized (super.semaphore)
208 {
209 // carry out all events scheduled on this simulation time, as long as we are still running.
210 while (this.isRunning() && !this.eventList.isEmpty()
211 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
212 {
213 event = this.eventList.removeFirst();
214 try
215 {
216 event.execute();
217 }
218 catch (Exception exception)
219 {
220 exception.printStackTrace();
221 if (this.isPauseOnError())
222 {
223 this.stop();
224 }
225 }
226 if (!this.eventList.isEmpty())
227 {
228 // peek at next event for while loop.
229 event = this.eventList.first();
230 }
231 }
232 }
233 }
234
235 else
236
237 {
238 // parallel execution of the move method
239 // first carry out all the non-move events and make a list of move events to be carried out in parallel
240 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
241 synchronized (super.semaphore)
242 {
243 while (this.isRunning() && !this.eventList.isEmpty()
244 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
245 {
246 event = this.eventList.removeFirst();
247 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
248 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
249 {
250 moveEvents.add(event);
251 }
252 else
253 {
254 try
255 {
256 event.execute();
257 }
258 catch (Exception exception)
259 {
260 exception.printStackTrace();
261 if (this.isPauseOnError())
262 {
263 this.stop();
264 }
265 }
266 }
267 if (!this.eventList.isEmpty())
268 {
269 // peek at next event for while loop.
270 event = this.eventList.first();
271 }
272 }
273 }
274
275 // then carry out the move events, based on a constant state at that time.
276 // first make sure that new events will be stored in a temporary event list...
277 this.executor = Executors.newFixedThreadPool(1);
278 for (int i = 0; i < moveEvents.size(); i++)
279 {
280 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
281 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
282 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
283 this.executor.execute(new Runnable()
284 {
285 @Override
286 public void run()
287 {
288 try
289 {
290 moveEvent.execute();
291 }
292 catch (Exception exception)
293 {
294 exception.printStackTrace();
295 if (OTSDEVSRTParallelMove.this.isPauseOnError())
296 {
297 OTSDEVSRTParallelMove.this.stop();
298 }
299 }
300 }
301 });
302 }
303 this.executor.shutdown();
304 try
305 {
306 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
307 }
308 catch (InterruptedException exception)
309 {
310 //
311 }
312
313 this.executor = Executors.newFixedThreadPool(1);
314 for (int i = 0; i < moveEvents.size(); i++)
315 {
316 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
317 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
318 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
319 this.executor.execute(new Runnable()
320 {
321 @Override
322 public void run()
323 {
324 try
325 {
326 moveEvent.execute();
327 }
328 catch (Exception exception)
329 {
330 exception.printStackTrace();
331 if (OTSDEVSRTParallelMove.this.isPauseOnError())
332 {
333 OTSDEVSRTParallelMove.this.stop();
334 }
335 }
336 }
337 });
338 }
339 this.executor.shutdown();
340 try
341 {
342 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
343 }
344 catch (InterruptedException exception)
345 {
346 //
347 }
348
349 this.executor = Executors.newFixedThreadPool(1);
350 for (int i = 0; i < moveEvents.size(); i++)
351 {
352 SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
353 final SimEventInterface<SimTimeDoubleUnit> moveEvent =
354 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
355 this.executor.execute(new Runnable()
356 {
357 @Override
358 public void run()
359 {
360 try
361 {
362 moveEvent.execute();
363 }
364 catch (Exception exception)
365 {
366 exception.printStackTrace();
367 if (OTSDEVSRTParallelMove.this.isPauseOnError())
368 {
369 OTSDEVSRTParallelMove.this.stop();
370 }
371 }
372 }
373 });
374 }
375 this.executor.shutdown();
376 try
377 {
378 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
379 }
380 catch (InterruptedException exception)
381 {
382 //
383 }
384
385 }
386 }
387 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
388
389 updateAnimation();
390 animationThread.stopAnimation();
391 }
392
393 }