View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.rmi.RemoteException;
4   import java.util.ArrayList;
5   import java.util.List;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   
9   import org.djunits.unit.DurationUnit;
10  import org.djunits.value.vdouble.scalar.Duration;
11  import org.djunits.value.vdouble.scalar.Time;
12  import org.opentrafficsim.core.gtu.GTU;
13  
14  import nl.tudelft.simulation.dsol.SimRuntimeException;
15  import nl.tudelft.simulation.dsol.experiment.Replication;
16  import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
17  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
18  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
19  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
20  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
21  
22  /**
23   * <p>
24   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
25   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
26   * <p>
27   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
28   *          initial version Aug 15, 2014 <br>
29   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
30   */
31  public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, OTSSimTimeDouble>
32          implements OTSDEVSSimulatorInterface, OTSAnimatorInterface
33  {
34      /** */
35      private static final long serialVersionUID = 20140909L;
36  
37      /** number of threads to use for move(). */
38      private int moveThreads = 1;
39  
40      /** the thread pool for parallel execution. */
41      private ExecutorService executor = null;
42  
43      /**
44       * Create a new OTSRealTimeClock.
45       * @param moveThreads The number of move threads to use
46       */
47      public OTSDEVSRTParallelMove(final int moveThreads)
48      {
49          super();
50          setMoveThreads(moveThreads);
51          setEventList(new SynchronizedRedBlackTree<>());
52      }
53  
54      /**
55       * Create a new OTSRealTimeClock.
56       */
57      public OTSDEVSRTParallelMove()
58      {
59          this(1);
60      }
61  
62      /**
63       * @param moveThreads set moveThreads
64       */
65      public final void setMoveThreads(final int moveThreads)
66      {
67          this.moveThreads = moveThreads;
68      }
69  
70      /**
71       * @return moveThreads
72       */
73      public final int getMoveThreads()
74      {
75          return this.moveThreads;
76      }
77  
78      /** {@inheritDoc} */
79      @Override
80      public final void initialize(final Replication<Time, Duration, OTSSimTimeDouble> initReplication,
81              final ReplicationMode replicationMode) throws SimRuntimeException
82      {
83          try
84          {
85              super.initialize(initReplication, replicationMode);
86          }
87          catch (RemoteException exception)
88          {
89              throw new SimRuntimeException(exception);
90          }
91      }
92  
93      /** {@inheritDoc} */
94      @Override
95      public final void runUpTo(final Time when) throws SimRuntimeException
96      {
97          super.runUpTo(when);
98      }
99  
100     /** {@inheritDoc} */
101     @Override
102     protected final Duration relativeMillis(final double factor)
103     {
104         return new Duration(factor, DurationUnit.MILLISECOND);
105     }
106 
107     /** {@inheritDoc} */
108     @Override
109     public final String toString()
110     {
111         return "OTSDEVSRealTimeClock [time=" + getSimulatorTime().getTime() + "]";
112     }
113 
114     /** {@inheritDoc} */
115     @Override
116     @SuppressWarnings("checkstyle:designforextension")
117     public void run()
118     {
119         AnimationThread animationThread = new AnimationThread(this);
120         animationThread.start();
121 
122         long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
123         OTSSimTimeDouble simTime0 = this.simulatorTime; // _______ current zero for the sim clock
124         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
125         double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
126         Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
127 
128         while (this.isRunning() && !this.eventList.isEmpty()
129                 && this.simulatorTime.le(this.replication.getTreatment().getEndTime()))
130         {
131             // check if speedFactor has changed. If yes: re-baseline.
132             if (factor != getSpeedFactor())
133             {
134                 clockTime0 = System.currentTimeMillis();
135                 simTime0 = this.simulatorTime;
136                 factor = getSpeedFactor();
137                 rSim = this.relativeMillis(getUpdateMsec() * factor);
138             }
139 
140             // check if we are behind; syncTime is the needed current time on the wall-clock
141             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
142             // delta is the time we might be behind
143             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
144 
145             if (syncTime > simTime)
146             {
147                 // we are behind
148                 if (!isCatchup())
149                 {
150                     // if no catch-up: re-baseline.
151                     clockTime0 = System.currentTimeMillis();
152                     simTime0 = this.simulatorTime;
153                 }
154                 else
155                 {
156                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
157                     // first
158                     synchronized (super.semaphore)
159                     {
160                         Duration delta = relativeMillis((syncTime - simTime) / msec1);
161                         OTSSimTimeDouble absSyncTime = this.simulatorTime.plus(delta);
162                         OTSSimTimeDouble eventTime = this.eventList.first().getAbsoluteExecutionTime();
163                         if (absSyncTime.lt(eventTime))
164                         {
165                             this.simulatorTime = absSyncTime;
166                         }
167                         else
168                         {
169                             this.simulatorTime = eventTime;
170                         }
171                     }
172                 }
173             }
174 
175             // peek at the first event and determine the time difference relative to RT speed; that determines
176             // how long we have to wait.
177             SimEventInterface<OTSSimTimeDouble> event = this.eventList.first();
178             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
179 
180             /*
181              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
182              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
183              * have to wait 10 times that amount. We might also be behind.
184              */
185             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
186             {
187                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
188                 {
189                     try
190                     {
191                         Thread.sleep(getUpdateMsec());
192 
193                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
194                         // this could cause a jump.
195                         if (factor != getSpeedFactor())
196                         {
197                             simTimeDiffMillis = 0.0;
198                         }
199 
200                     }
201                     catch (InterruptedException ie)
202                     {
203                         // do nothing
204                         ie = null;
205                     }
206 
207                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
208                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
209                     {
210                         event = this.eventList.first();
211                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
212                     }
213                     else
214                     {
215                         // make a small time step for the animation during wallclock waiting.
216                         // but never beyond the next event time.
217                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
218                         {
219                             synchronized (super.semaphore)
220                             {
221                                 this.simulatorTime.add(rSim);
222                             }
223                         }
224                     }
225                 }
226             }
227 
228             this.simulatorTime = event.getAbsoluteExecutionTime();
229             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
230 
231             if (this.moveThreads <= 1)
232             {
233                 synchronized (super.semaphore)
234                 {
235                     // carry out all events scheduled on this simulation time, as long as we are still running.
236                     while (this.isRunning() && !this.eventList.isEmpty()
237                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
238                     {
239                         event = this.eventList.removeFirst();
240                         try
241                         {
242                             event.execute();
243                         }
244                         catch (Exception exception)
245                         {
246                             exception.printStackTrace();
247                             if (this.isPauseOnError())
248                             {
249                                 this.stop();
250                             }
251                         }
252                         if (!this.eventList.isEmpty())
253                         {
254                             // peek at next event for while loop.
255                             event = this.eventList.first();
256                         }
257                     }
258                 }
259             }
260 
261             else
262 
263             {
264                 // parallel execution of the move method
265                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
266                 List<SimEventInterface<OTSSimTimeDouble>> moveEvents = new ArrayList<>();
267                 synchronized (super.semaphore)
268                 {
269                     while (this.isRunning() && !this.eventList.isEmpty()
270                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
271                     {
272                         event = this.eventList.removeFirst();
273                         SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) event;
274                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
275                         {
276                             moveEvents.add(event);
277                         }
278                         else
279                         {
280                             try
281                             {
282                                 event.execute();
283                             }
284                             catch (Exception exception)
285                             {
286                                 exception.printStackTrace();
287                                 if (this.isPauseOnError())
288                                 {
289                                     this.stop();
290                                 }
291                             }
292                         }
293                         if (!this.eventList.isEmpty())
294                         {
295                             // peek at next event for while loop.
296                             event = this.eventList.first();
297                         }
298                     }
299                 }
300 
301                 // then carry out the move events, based on a constant state at that time.
302                 // first make sure that new events will be stored in a temporary event list...
303                 this.executor = Executors.newFixedThreadPool(1);
304                 for (int i = 0; i < moveEvents.size(); i++)
305                 {
306                     SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
307                     final SimEventInterface<OTSSimTimeDouble> moveEvent =
308                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
309                     this.executor.execute(new Runnable()
310                     {
311                         @Override
312                         public void run()
313                         {
314                             try
315                             {
316                                 moveEvent.execute();
317                             }
318                             catch (Exception exception)
319                             {
320                                 exception.printStackTrace();
321                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
322                                 {
323                                     OTSDEVSRTParallelMove.this.stop();
324                                 }
325                             }
326                         }
327                     });
328                 }
329                 this.executor.shutdown();
330                 try
331                 {
332                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
333                 }
334                 catch (InterruptedException exception)
335                 {
336                     //
337                 }
338 
339                 this.executor = Executors.newFixedThreadPool(1);
340                 for (int i = 0; i < moveEvents.size(); i++)
341                 {
342                     SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
343                     final SimEventInterface<OTSSimTimeDouble> moveEvent =
344                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
345                     this.executor.execute(new Runnable()
346                     {
347                         @Override
348                         public void run()
349                         {
350                             try
351                             {
352                                 moveEvent.execute();
353                             }
354                             catch (Exception exception)
355                             {
356                                 exception.printStackTrace();
357                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
358                                 {
359                                     OTSDEVSRTParallelMove.this.stop();
360                                 }
361                             }
362                         }
363                     });
364                 }
365                 this.executor.shutdown();
366                 try
367                 {
368                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
369                 }
370                 catch (InterruptedException exception)
371                 {
372                     //
373                 }
374 
375                 this.executor = Executors.newFixedThreadPool(1);
376                 for (int i = 0; i < moveEvents.size(); i++)
377                 {
378                     SimEvent<OTSSimTimeDouble> se = (SimEvent<OTSSimTimeDouble>) moveEvents.get(i);
379                     final SimEventInterface<OTSSimTimeDouble> moveEvent =
380                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
381                     this.executor.execute(new Runnable()
382                     {
383                         @Override
384                         public void run()
385                         {
386                             try
387                             {
388                                 moveEvent.execute();
389                             }
390                             catch (Exception exception)
391                             {
392                                 exception.printStackTrace();
393                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
394                                 {
395                                     OTSDEVSRTParallelMove.this.stop();
396                                 }
397                             }
398                         }
399                     });
400                 }
401                 this.executor.shutdown();
402                 try
403                 {
404                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
405                 }
406                 catch (InterruptedException exception)
407                 {
408                     //
409                 }
410 
411             }
412         }
413         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
414 
415         updateAnimation();
416         animationThread.stopAnimation();
417     }
418 
419 }