View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.ExecutorService;
6   import java.util.concurrent.Executors;
7   
8   import org.djunits.unit.DurationUnit;
9   import org.djunits.value.vdouble.scalar.Duration;
10  import org.djunits.value.vdouble.scalar.Time;
11  import org.opentrafficsim.core.gtu.GTU;
12  
13  import nl.tudelft.simulation.dsol.SimRuntimeException;
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16  import nl.tudelft.simulation.dsol.logger.SimLogger;
17  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
18  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
19  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
20  
21  /**
22   * <p>
23   * Copyright (c) 2013-2019 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
24   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
25   * <p>
26   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
27   *          initial version Aug 15, 2014 <br>
28   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29   */
30  public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
31  {
32      /** */
33      private static final long serialVersionUID = 20140909L;
34  
35      /** number of threads to use for move(). */
36      private int moveThreads = 1;
37  
38      /** the thread pool for parallel execution. */
39      private ExecutorService executor = null;
40  
41      /**
42       * Create a new OTSRealTimeClock.
43       * @param moveThreads int; The number of move threads to use
44       */
45      public OTSDEVSRTParallelMove(final int moveThreads)
46      {
47          super();
48          setMoveThreads(moveThreads);
49          setEventList(new SynchronizedRedBlackTree<>());
50      }
51  
52      /**
53       * Create a new OTSRealTimeClock.
54       */
55      public OTSDEVSRTParallelMove()
56      {
57          this(1);
58      }
59  
60      /**
61       * @param moveThreads int; set moveThreads
62       */
63      public final void setMoveThreads(final int moveThreads)
64      {
65          this.moveThreads = moveThreads;
66      }
67  
68      /**
69       * @return moveThreads
70       */
71      public final int getMoveThreads()
72      {
73          return this.moveThreads;
74      }
75  
76      /** {@inheritDoc} */
77      @Override
78      protected final Duration relativeMillis(final double factor)
79      {
80          return new Duration(factor, DurationUnit.MILLISECOND);
81      }
82  
83      /** {@inheritDoc} */
84      @Override
85      public final String toString()
86      {
87          return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
88      }
89  
90      /** {@inheritDoc} */
91      @Override
92      @SuppressWarnings("checkstyle:designforextension")
93      public void run()
94      {
95          AnimationThread animationThread = new AnimationThread(this);
96          animationThread.start();
97  
98          long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
99          SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
100         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
101         double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
102         Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
103 
104         while (this.isRunning() && !this.eventList.isEmpty()
105                 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
106         {
107             // check if speedFactor has changed. If yes: re-baseline.
108             if (factor != getSpeedFactor())
109             {
110                 clockTime0 = System.currentTimeMillis();
111                 simTime0 = this.simulatorTime;
112                 factor = getSpeedFactor();
113                 rSim = this.relativeMillis(getUpdateMsec() * factor);
114             }
115 
116             // check if we are behind; syncTime is the needed current time on the wall-clock
117             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
118             // delta is the time we might be behind
119             double simTime = this.simulatorTime.diff(simTime0).doubleValue();
120 
121             if (syncTime > simTime)
122             {
123                 // we are behind
124                 if (!isCatchup())
125                 {
126                     // if no catch-up: re-baseline.
127                     clockTime0 = System.currentTimeMillis();
128                     simTime0 = this.simulatorTime;
129                 }
130                 else
131                 {
132                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
133                     // first
134                     synchronized (super.semaphore)
135                     {
136                         Duration delta = relativeMillis((syncTime - simTime) / msec1);
137                         SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
138                         SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
139                         if (absSyncTime.lt(eventTime))
140                         {
141                             this.simulatorTime = absSyncTime;
142                         }
143                         else
144                         {
145                             this.simulatorTime = eventTime;
146                         }
147                     }
148                 }
149             }
150 
151             // peek at the first event and determine the time difference relative to RT speed; that determines
152             // how long we have to wait.
153             SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
154             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
155 
156             /*
157              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
158              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
159              * have to wait 10 times that amount. We might also be behind.
160              */
161             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
162             {
163                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
164                 {
165                     try
166                     {
167                         Thread.sleep(getUpdateMsec());
168 
169                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
170                         // this could cause a jump.
171                         if (factor != getSpeedFactor())
172                         {
173                             simTimeDiffMillis = 0.0;
174                         }
175 
176                     }
177                     catch (InterruptedException ie)
178                     {
179                         // do nothing
180                         ie = null;
181                     }
182 
183                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
184                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
185                     {
186                         event = this.eventList.first();
187                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
188                     }
189                     else
190                     {
191                         // make a small time step for the animation during wallclock waiting.
192                         // but never beyond the next event time.
193                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
194                         {
195                             synchronized (super.semaphore)
196                             {
197                                 this.simulatorTime.add(rSim);
198                             }
199                         }
200                     }
201                 }
202             }
203 
204             this.simulatorTime = event.getAbsoluteExecutionTime();
205             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
206 
207             if (this.moveThreads <= 1)
208             {
209                 synchronized (super.semaphore)
210                 {
211                     // carry out all events scheduled on this simulation time, as long as we are still running.
212                     while (this.isRunning() && !this.eventList.isEmpty()
213                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
214                     {
215                         event = this.eventList.removeFirst();
216                         try
217                         {
218                             event.execute();
219                         }
220                         catch (Exception exception)
221                         {
222                             SimLogger.always().error(exception);
223                             if (this.isPauseOnError())
224                             {
225                                 try
226                                 {
227                                     this.stop();
228                                 }
229                                 catch (SimRuntimeException exception1)
230                                 {
231                                     SimLogger.always().error(exception1);
232                                 }
233                             }
234                         }
235                         if (!this.eventList.isEmpty())
236                         {
237                             // peek at next event for while loop.
238                             event = this.eventList.first();
239                         }
240                     }
241                 }
242             }
243 
244             else
245 
246             {
247                 // parallel execution of the move method
248                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
249                 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
250                 synchronized (super.semaphore)
251                 {
252                     while (this.isRunning() && !this.eventList.isEmpty()
253                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
254                     {
255                         event = this.eventList.removeFirst();
256                         SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
257                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
258                         {
259                             moveEvents.add(event);
260                         }
261                         else
262                         {
263                             try
264                             {
265                                 event.execute();
266                             }
267                             catch (Exception exception)
268                             {
269                                 SimLogger.always().error(exception);
270                                 if (this.isPauseOnError())
271                                 {
272                                     try
273                                     {
274                                         this.stop();
275                                     }
276                                     catch (SimRuntimeException exception1)
277                                     {
278                                         SimLogger.always().error(exception1);
279                                     }
280                                 }
281                             }
282                         }
283                         if (!this.eventList.isEmpty())
284                         {
285                             // peek at next event for while loop.
286                             event = this.eventList.first();
287                         }
288                     }
289                 }
290 
291                 // then carry out the move events, based on a constant state at that time.
292                 // first make sure that new events will be stored in a temporary event list...
293                 this.executor = Executors.newFixedThreadPool(1);
294                 for (int i = 0; i < moveEvents.size(); i++)
295                 {
296                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
297                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
298                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
299                     this.executor.execute(new Runnable()
300                     {
301                         @Override
302                         public void run()
303                         {
304                             try
305                             {
306                                 moveEvent.execute();
307                             }
308                             catch (Exception exception)
309                             {
310                                 SimLogger.always().error(exception);
311                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
312                                 {
313                                     try
314                                     {
315                                         OTSDEVSRTParallelMove.this.stop();
316                                     }
317                                     catch (SimRuntimeException exception1)
318                                     {
319                                         SimLogger.always().error(exception1);
320                                     }
321                                 }
322                             }
323                         }
324                     });
325                 }
326                 this.executor.shutdown();
327                 try
328                 {
329                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
330                 }
331                 catch (InterruptedException exception)
332                 {
333                     //
334                 }
335 
336                 this.executor = Executors.newFixedThreadPool(1);
337                 for (int i = 0; i < moveEvents.size(); i++)
338                 {
339                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
340                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
341                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
342                     this.executor.execute(new Runnable()
343                     {
344                         @Override
345                         public void run()
346                         {
347                             try
348                             {
349                                 moveEvent.execute();
350                             }
351                             catch (Exception exception)
352                             {
353                                 SimLogger.always().error(exception);
354                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
355                                 {
356                                     try
357                                     {
358                                         OTSDEVSRTParallelMove.this.stop();
359                                     }
360                                     catch (SimRuntimeException exception1)
361                                     {
362                                         SimLogger.always().error(exception1);
363                                     }
364                                 }
365                             }
366                         }
367                     });
368                 }
369                 this.executor.shutdown();
370                 try
371                 {
372                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
373                 }
374                 catch (InterruptedException exception)
375                 {
376                     //
377                 }
378 
379                 this.executor = Executors.newFixedThreadPool(1);
380                 for (int i = 0; i < moveEvents.size(); i++)
381                 {
382                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
383                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
384                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
385                     this.executor.execute(new Runnable()
386                     {
387                         @Override
388                         public void run()
389                         {
390                             try
391                             {
392                                 moveEvent.execute();
393                             }
394                             catch (Exception exception)
395                             {
396                                 SimLogger.always().error(exception);
397                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
398                                 {
399                                     try
400                                     {
401                                         OTSDEVSRTParallelMove.this.stop();
402                                     }
403                                     catch (SimRuntimeException exception1)
404                                     {
405                                         SimLogger.always().error(exception1);
406                                     }
407                                 }
408                             }
409                         }
410                     });
411                 }
412                 this.executor.shutdown();
413                 try
414                 {
415                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
416                 }
417                 catch (InterruptedException exception)
418                 {
419                     //
420                 }
421 
422             }
423         }
424         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
425 
426         updateAnimation();
427         animationThread.stopAnimation();
428     }
429 
430 }