View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.ExecutorService;
6   import java.util.concurrent.Executors;
7   
8   import org.djunits.unit.DurationUnit;
9   import org.djunits.value.vdouble.scalar.Duration;
10  import org.djunits.value.vdouble.scalar.Time;
11  import org.opentrafficsim.core.gtu.GTU;
12  import org.opentrafficsim.core.logger.SimLogger;
13  
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
17  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
18  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19  
20  /**
21   * <p>
22   * Copyright (c) 2013-2018 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24   * <p>
25   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
26   *          initial version Aug 15, 2014 <br>
27   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
28   */
29  public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
30  {
31      /** */
32      private static final long serialVersionUID = 20140909L;
33  
34      /** number of threads to use for move(). */
35      private int moveThreads = 1;
36  
37      /** the thread pool for parallel execution. */
38      private ExecutorService executor = null;
39  
40      /**
41       * Create a new OTSRealTimeClock.
42       * @param moveThreads The number of move threads to use
43       */
44      public OTSDEVSRTParallelMove(final int moveThreads)
45      {
46          super();
47          setMoveThreads(moveThreads);
48          setEventList(new SynchronizedRedBlackTree<>());
49      }
50  
51      /**
52       * Create a new OTSRealTimeClock.
53       */
54      public OTSDEVSRTParallelMove()
55      {
56          this(1);
57      }
58  
59      /**
60       * @param moveThreads set moveThreads
61       */
62      public final void setMoveThreads(final int moveThreads)
63      {
64          this.moveThreads = moveThreads;
65      }
66  
67      /**
68       * @return moveThreads
69       */
70      public final int getMoveThreads()
71      {
72          return this.moveThreads;
73      }
74  
75      /** {@inheritDoc} */
76      @Override
77      protected final Duration relativeMillis(final double factor)
78      {
79          return new Duration(factor, DurationUnit.MILLISECOND);
80      }
81  
82      /** {@inheritDoc} */
83      @Override
84      public final String toString()
85      {
86          return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
87      }
88  
89      /** {@inheritDoc} */
90      @Override
91      @SuppressWarnings("checkstyle:designforextension")
92      public void run()
93      {
94          AnimationThread animationThread = new AnimationThread(this);
95          animationThread.start();
96  
97          long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
98          SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
99          double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
100         double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
101         Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
102 
103         while (this.isRunning() && !this.eventList.isEmpty()
104                 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
105         {
106             // check if speedFactor has changed. If yes: re-baseline.
107             if (factor != getSpeedFactor())
108             {
109                 clockTime0 = System.currentTimeMillis();
110                 simTime0 = this.simulatorTime;
111                 factor = getSpeedFactor();
112                 rSim = this.relativeMillis(getUpdateMsec() * factor);
113             }
114 
115             // check if we are behind; syncTime is the needed current time on the wall-clock
116             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
117             // delta is the time we might be behind
118             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
119 
120             if (syncTime > simTime)
121             {
122                 // we are behind
123                 if (!isCatchup())
124                 {
125                     // if no catch-up: re-baseline.
126                     clockTime0 = System.currentTimeMillis();
127                     simTime0 = this.simulatorTime;
128                 }
129                 else
130                 {
131                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
132                     // first
133                     synchronized (super.semaphore)
134                     {
135                         Duration delta = relativeMillis((syncTime - simTime) / msec1);
136                         SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
137                         SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
138                         if (absSyncTime.lt(eventTime))
139                         {
140                             this.simulatorTime = absSyncTime;
141                         }
142                         else
143                         {
144                             this.simulatorTime = eventTime;
145                         }
146                     }
147                 }
148             }
149 
150             // peek at the first event and determine the time difference relative to RT speed; that determines
151             // how long we have to wait.
152             SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
153             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
154 
155             /*
156              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
157              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
158              * have to wait 10 times that amount. We might also be behind.
159              */
160             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
161             {
162                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
163                 {
164                     try
165                     {
166                         Thread.sleep(getUpdateMsec());
167 
168                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
169                         // this could cause a jump.
170                         if (factor != getSpeedFactor())
171                         {
172                             simTimeDiffMillis = 0.0;
173                         }
174 
175                     }
176                     catch (InterruptedException ie)
177                     {
178                         // do nothing
179                         ie = null;
180                     }
181 
182                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
183                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
184                     {
185                         event = this.eventList.first();
186                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
187                     }
188                     else
189                     {
190                         // make a small time step for the animation during wallclock waiting.
191                         // but never beyond the next event time.
192                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
193                         {
194                             synchronized (super.semaphore)
195                             {
196                                 this.simulatorTime.add(rSim);
197                             }
198                         }
199                     }
200                 }
201             }
202 
203             this.simulatorTime = event.getAbsoluteExecutionTime();
204             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
205 
206             if (this.moveThreads <= 1)
207             {
208                 synchronized (super.semaphore)
209                 {
210                     // carry out all events scheduled on this simulation time, as long as we are still running.
211                     while (this.isRunning() && !this.eventList.isEmpty()
212                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
213                     {
214                         event = this.eventList.removeFirst();
215                         try
216                         {
217                             event.execute();
218                         }
219                         catch (Exception exception)
220                         {
221                             SimLogger.always().error(exception);
222                             if (this.isPauseOnError())
223                             {
224                                 this.stop();
225                             }
226                         }
227                         if (!this.eventList.isEmpty())
228                         {
229                             // peek at next event for while loop.
230                             event = this.eventList.first();
231                         }
232                     }
233                 }
234             }
235 
236             else
237 
238             {
239                 // parallel execution of the move method
240                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
241                 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
242                 synchronized (super.semaphore)
243                 {
244                     while (this.isRunning() && !this.eventList.isEmpty()
245                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
246                     {
247                         event = this.eventList.removeFirst();
248                         SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
249                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
250                         {
251                             moveEvents.add(event);
252                         }
253                         else
254                         {
255                             try
256                             {
257                                 event.execute();
258                             }
259                             catch (Exception exception)
260                             {
261                                 SimLogger.always().error(exception);
262                                 if (this.isPauseOnError())
263                                 {
264                                     this.stop();
265                                 }
266                             }
267                         }
268                         if (!this.eventList.isEmpty())
269                         {
270                             // peek at next event for while loop.
271                             event = this.eventList.first();
272                         }
273                     }
274                 }
275 
276                 // then carry out the move events, based on a constant state at that time.
277                 // first make sure that new events will be stored in a temporary event list...
278                 this.executor = Executors.newFixedThreadPool(1);
279                 for (int i = 0; i < moveEvents.size(); i++)
280                 {
281                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
282                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
283                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
284                     this.executor.execute(new Runnable()
285                     {
286                         @Override
287                         public void run()
288                         {
289                             try
290                             {
291                                 moveEvent.execute();
292                             }
293                             catch (Exception exception)
294                             {
295                                 SimLogger.always().error(exception);
296                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
297                                 {
298                                     OTSDEVSRTParallelMove.this.stop();
299                                 }
300                             }
301                         }
302                     });
303                 }
304                 this.executor.shutdown();
305                 try
306                 {
307                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
308                 }
309                 catch (InterruptedException exception)
310                 {
311                     //
312                 }
313 
314                 this.executor = Executors.newFixedThreadPool(1);
315                 for (int i = 0; i < moveEvents.size(); i++)
316                 {
317                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
318                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
319                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
320                     this.executor.execute(new Runnable()
321                     {
322                         @Override
323                         public void run()
324                         {
325                             try
326                             {
327                                 moveEvent.execute();
328                             }
329                             catch (Exception exception)
330                             {
331                                 SimLogger.always().error(exception);
332                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
333                                 {
334                                     OTSDEVSRTParallelMove.this.stop();
335                                 }
336                             }
337                         }
338                     });
339                 }
340                 this.executor.shutdown();
341                 try
342                 {
343                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
344                 }
345                 catch (InterruptedException exception)
346                 {
347                     //
348                 }
349 
350                 this.executor = Executors.newFixedThreadPool(1);
351                 for (int i = 0; i < moveEvents.size(); i++)
352                 {
353                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
354                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
355                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
356                     this.executor.execute(new Runnable()
357                     {
358                         @Override
359                         public void run()
360                         {
361                             try
362                             {
363                                 moveEvent.execute();
364                             }
365                             catch (Exception exception)
366                             {
367                                 SimLogger.always().error(exception);
368                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
369                                 {
370                                     OTSDEVSRTParallelMove.this.stop();
371                                 }
372                             }
373                         }
374                     });
375                 }
376                 this.executor.shutdown();
377                 try
378                 {
379                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
380                 }
381                 catch (InterruptedException exception)
382                 {
383                     //
384                 }
385 
386             }
387         }
388         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
389 
390         updateAnimation();
391         animationThread.stopAnimation();
392     }
393 
394 }