1 package org.opentrafficsim.core.dsol;
2
3 import java.rmi.RemoteException;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.Executors;
8
9 import org.djunits.unit.DurationUnit;
10 import org.djunits.value.vdouble.scalar.Duration;
11 import org.djunits.value.vdouble.scalar.Time;
12 import org.opentrafficsim.core.gtu.GTU;
13
14 import nl.tudelft.simulation.dsol.SimRuntimeException;
15 import nl.tudelft.simulation.dsol.experiment.Replication;
16 import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
17 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
18 import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
19 import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
20 import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
21
22 /**
23 * <p>
24 * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
25 * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
26 * <p>
27 * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
28 * initial version Aug 15, 2014 <br>
29 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
30 */
31 public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, OTSSimTimeDouble>
32 implements OTSDEVSSimulatorInterface, OTSAnimatorInterface
33 {
34 /** */
35 private static final long serialVersionUID = 20140909L;
36
37 /** number of threads to use for move(). */
38 private int moveThreads = 1;
39
40 /** the thread pool for parallel execution. */
41 private ExecutorService executor = null;
42
43 /**
44 * Create a new OTSRealTimeClock.
45 * @param moveThreads The number of move threads to use
46 */
47 public OTSDEVSRTParallelMove(final int moveThreads)
48 {
49 super();
50 setMoveThreads(moveThreads);
51 setEventList(new SynchronizedRedBlackTree<>());
52 }
53
54 /**
55 * Create a new OTSRealTimeClock.
56 */
57 public OTSDEVSRTParallelMove()
58 {
59 this(1);
60 }
61
62 /**
63 * @param moveThreads set moveThreads
64 */
65 public final void setMoveThreads(final int moveThreads)
66 {
67 this.moveThreads = moveThreads;
68 }
69
70 /**
71 * @return moveThreads
72 */
73 public final int getMoveThreads()
74 {
75 return this.moveThreads;
76 }
77
78 /** {@inheritDoc} */
79 @Override
80 public final void initialize(final Replication<Time, Duration, OTSSimTimeDouble> initReplication,
81 final ReplicationMode replicationMode) throws SimRuntimeException
82 {
83 try
84 {
85 super.initialize(initReplication, replicationMode);
86 }
87 catch (RemoteException exception)
88 {
89 throw new SimRuntimeException(exception);
90 }
91 }
92
93 /** {@inheritDoc} */
94 @Override
95 public final void runUpTo(final Time when) throws SimRuntimeException
96 {
97 super.runUpTo(when);
98 }
99
100 /** {@inheritDoc} */
101 @Override
102 protected final Duration relativeMillis(final double factor)
103 {
104 return new Duration(factor, DurationUnit.MILLISECOND);
105 }
106
107 /** {@inheritDoc} */
108 @Override
109 public final String toString()
110 {
111 return "OTSDEVSRealTimeClock [time=" + getSimulatorTime().getTime() + "]";
112 }
113
114 /** {@inheritDoc} */
115 @Override
116 @SuppressWarnings("checkstyle:designforextension")
117 public void run()
118 {
119 AnimationThread animationThread = new AnimationThread(this);
120 animationThread.start();
121
122 long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
123 OTSSimTimeDouble simTime0 = this.simulatorTime; // _______ current zero for the sim clock
124 double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
125 double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
126 Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
127
128 while (this.isRunning() && !this.eventList.isEmpty()
129 && this.simulatorTime.le(this.replication.getTreatment().getEndTime()))
130 {
131 // check if speedFactor has changed. If yes: re-baseline.
132 if (factor != getSpeedFactor())
133 {
134 clockTime0 = System.currentTimeMillis();
135 simTime0 = this.simulatorTime;
136 factor = getSpeedFactor();
137 rSim = this.relativeMillis(getUpdateMsec() * factor);
138 }
139
140 // check if we are behind; syncTime is the needed current time on the wall-clock
141 double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
142 // delta is the time we might be behind
143 double simTime = this.simulatorTime.minus(simTime0).doubleValue();
144
145 if (syncTime > simTime)
146 {
147 // we are behind
148 if (!isCatchup())
149 {
150 // if no catch-up: re-baseline.
151 clockTime0 = System.currentTimeMillis();
152 simTime0 = this.simulatorTime;
153 }
154 else
155 {
156 // jump to the required wall-clock related time or to the time of the next event, whichever comes
157 // first
158 synchronized (super.semaphore)
159 {
160 Duration delta = relativeMillis((syncTime - simTime) / msec1);
161 OTSSimTimeDouble absSyncTime = this.simulatorTime.plus(delta);
162 OTSSimTimeDouble eventTime = this.eventList.first().getAbsoluteExecutionTime();
163 if (absSyncTime.lt(eventTime))
164 {
165 this.simulatorTime = absSyncTime;
166 }
167 else
168 {
169 this.simulatorTime = eventTime;
170 }
171 }
172 }
173 }
174
175 // peek at the first event and determine the time difference relative to RT speed; that determines
176 // how long we have to wait.
177 SimEventInterface<OTSSimTimeDouble> event = this.eventList.first();
178 double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
179
180 /*
181 * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
182 * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
183 * have to wait 10 times that amount. We might also be behind.
184 */
185 if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
186 {
187 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
188 {
189 try
190 {
191 Thread.sleep(getUpdateMsec());
192
193 // check if speedFactor has changed. If yes: break out of this loop and execute event.
194 // this could cause a jump.
195 if (factor != getSpeedFactor())
196 {
197 simTimeDiffMillis = 0.0;
198 }
199
200 }
201 catch (InterruptedException ie)
202 {
203 // do nothing
204 ie = null;
205 }
206
207 // check if an event has been inserted. In a real-time situation this can be dome by other threads
208 if (!event.equals(this.eventList.first())) // event inserted by a thread...
209 {
210 event = this.eventList.first();
211 simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
212 }
213 else
214 {
215 // make a small time step for the animation during wallclock waiting.
216 // but never beyond the next event time.
217 if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
218 {
219 synchronized (super.semaphore)
220 {
221 this.simulatorTime.add(rSim);
222 }
223 }
224 }
225 }
226 }
227
228 this.simulatorTime = event.getAbsoluteExecutionTime();
229 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
230
231 if (this.moveThreads <= 1)
232 {
233 synchronized (super.semaphore)
234 {
235 // carry out all events scheduled on this simulation time, as long as we are still running.
236 while (this.isRunning() && !this.eventList.isEmpty()
237 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
238 {
239 event = this.eventList.removeFirst();
240 try
241 {
242 event.execute();
243 }
244 catch (Exception exception)
245 {
246 exception.printStackTrace();
247 if (this.isPauseOnError())
248 {
249 this.stop();
250 }
251 }
252 if (!this.eventList.isEmpty())
253 {
254 // peek at next event for while loop.
255 event = this.eventList.first();
256 }
257 }
258 }
259 }
260
261 else
262
263 {
264 // parallel execution of the move method
265 // first carry out all the non-move events and make a list of move events to be carried out in parallel
266 List<SimEventInterface<OTSSimTimeDouble>> moveEvents = new ArrayList<>();
267 synchronized (super.semaphore)
268 {
269 while (this.isRunning() && !this.eventList.isEmpty()
270 && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
271 {
272 event = this.eventList.removeFirst();
273 SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) event;
274 if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
275 {
276 moveEvents.add(event);
277 }
278 else
279 {
280 try
281 {
282 event.execute();
283 }
284 catch (Exception exception)
285 {
286 exception.printStackTrace();
287 if (this.isPauseOnError())
288 {
289 this.stop();
290 }
291 }
292 }
293 if (!this.eventList.isEmpty())
294 {
295 // peek at next event for while loop.
296 event = this.eventList.first();
297 }
298 }
299 }
300
301 // then carry out the move events, based on a constant state at that time.
302 // first make sure that new events will be stored in a temporary event list...
303 this.executor = Executors.newFixedThreadPool(1);
304 for (int i = 0; i < moveEvents.size(); i++)
305 {
306 SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
307 final SimEventInterface<OTSSimTimeDouble> moveEvent =
308 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
309 this.executor.execute(new Runnable()
310 {
311 @Override
312 public void run()
313 {
314 try
315 {
316 moveEvent.execute();
317 }
318 catch (Exception exception)
319 {
320 exception.printStackTrace();
321 if (OTSDEVSRTParallelMove.this.isPauseOnError())
322 {
323 OTSDEVSRTParallelMove.this.stop();
324 }
325 }
326 }
327 });
328 }
329 this.executor.shutdown();
330 try
331 {
332 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
333 }
334 catch (InterruptedException exception)
335 {
336 //
337 }
338
339 this.executor = Executors.newFixedThreadPool(1);
340 for (int i = 0; i < moveEvents.size(); i++)
341 {
342 SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
343 final SimEventInterface<OTSSimTimeDouble> moveEvent =
344 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
345 this.executor.execute(new Runnable()
346 {
347 @Override
348 public void run()
349 {
350 try
351 {
352 moveEvent.execute();
353 }
354 catch (Exception exception)
355 {
356 exception.printStackTrace();
357 if (OTSDEVSRTParallelMove.this.isPauseOnError())
358 {
359 OTSDEVSRTParallelMove.this.stop();
360 }
361 }
362 }
363 });
364 }
365 this.executor.shutdown();
366 try
367 {
368 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
369 }
370 catch (InterruptedException exception)
371 {
372 //
373 }
374
375 this.executor = Executors.newFixedThreadPool(1);
376 for (int i = 0; i < moveEvents.size(); i++)
377 {
378 SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
379 final SimEventInterface<OTSSimTimeDouble> moveEvent =
380 new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
381 this.executor.execute(new Runnable()
382 {
383 @Override
384 public void run()
385 {
386 try
387 {
388 moveEvent.execute();
389 }
390 catch (Exception exception)
391 {
392 exception.printStackTrace();
393 if (OTSDEVSRTParallelMove.this.isPauseOnError())
394 {
395 OTSDEVSRTParallelMove.this.stop();
396 }
397 }
398 }
399 });
400 }
401 this.executor.shutdown();
402 try
403 {
404 this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
405 }
406 catch (InterruptedException exception)
407 {
408 //
409 }
410
411 }
412 }
413 this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
414
415 updateAnimation();
416 animationThread.stopAnimation();
417 }
418
419 }