View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.ExecutorService;
6   import java.util.concurrent.Executors;
7   
8   import org.djunits.unit.DurationUnit;
9   import org.djunits.value.vdouble.scalar.Duration;
10  import org.djunits.value.vdouble.scalar.Time;
11  import org.opentrafficsim.core.gtu.GTU;
12  
13  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
15  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
16  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
17  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
18  
19  /**
20   * <p>
21   * Copyright (c) 2013-2018 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
22   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
23   * <p>
24   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
25   *          initial version Aug 15, 2014 <br>
26   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
27   */
28  public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
29  {
30      /** */
31      private static final long serialVersionUID = 20140909L;
32  
33      /** number of threads to use for move(). */
34      private int moveThreads = 1;
35  
36      /** the thread pool for parallel execution. */
37      private ExecutorService executor = null;
38  
39      /**
40       * Create a new OTSRealTimeClock.
41       * @param moveThreads The number of move threads to use
42       */
43      public OTSDEVSRTParallelMove(final int moveThreads)
44      {
45          super();
46          setMoveThreads(moveThreads);
47          setEventList(new SynchronizedRedBlackTree<>());
48      }
49  
50      /**
51       * Create a new OTSRealTimeClock.
52       */
53      public OTSDEVSRTParallelMove()
54      {
55          this(1);
56      }
57  
58      /**
59       * @param moveThreads set moveThreads
60       */
61      public final void setMoveThreads(final int moveThreads)
62      {
63          this.moveThreads = moveThreads;
64      }
65  
66      /**
67       * @return moveThreads
68       */
69      public final int getMoveThreads()
70      {
71          return this.moveThreads;
72      }
73  
74      /** {@inheritDoc} */
75      @Override
76      protected final Duration relativeMillis(final double factor)
77      {
78          return new Duration(factor, DurationUnit.MILLISECOND);
79      }
80  
81      /** {@inheritDoc} */
82      @Override
83      public final String toString()
84      {
85          return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
86      }
87  
88      /** {@inheritDoc} */
89      @Override
90      @SuppressWarnings("checkstyle:designforextension")
91      public void run()
92      {
93          AnimationThread animationThread = new AnimationThread(this);
94          animationThread.start();
95  
96          long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
97          SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
98          double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
99          double msec1 = relativeMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
100         Duration rSim = this.relativeMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec' wall clock
101 
102         while (this.isRunning() && !this.eventList.isEmpty()
103                 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
104         {
105             // check if speedFactor has changed. If yes: re-baseline.
106             if (factor != getSpeedFactor())
107             {
108                 clockTime0 = System.currentTimeMillis();
109                 simTime0 = this.simulatorTime;
110                 factor = getSpeedFactor();
111                 rSim = this.relativeMillis(getUpdateMsec() * factor);
112             }
113 
114             // check if we are behind; syncTime is the needed current time on the wall-clock
115             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
116             // delta is the time we might be behind
117             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
118 
119             if (syncTime > simTime)
120             {
121                 // we are behind
122                 if (!isCatchup())
123                 {
124                     // if no catch-up: re-baseline.
125                     clockTime0 = System.currentTimeMillis();
126                     simTime0 = this.simulatorTime;
127                 }
128                 else
129                 {
130                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
131                     // first
132                     synchronized (super.semaphore)
133                     {
134                         Duration delta = relativeMillis((syncTime - simTime) / msec1);
135                         SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
136                         SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
137                         if (absSyncTime.lt(eventTime))
138                         {
139                             this.simulatorTime = absSyncTime;
140                         }
141                         else
142                         {
143                             this.simulatorTime = eventTime;
144                         }
145                     }
146                 }
147             }
148 
149             // peek at the first event and determine the time difference relative to RT speed; that determines
150             // how long we have to wait.
151             SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
152             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
153 
154             /*
155              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
156              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
157              * have to wait 10 times that amount. We might also be behind.
158              */
159             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
160             {
161                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
162                 {
163                     try
164                     {
165                         Thread.sleep(getUpdateMsec());
166 
167                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
168                         // this could cause a jump.
169                         if (factor != getSpeedFactor())
170                         {
171                             simTimeDiffMillis = 0.0;
172                         }
173 
174                     }
175                     catch (InterruptedException ie)
176                     {
177                         // do nothing
178                         ie = null;
179                     }
180 
181                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
182                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
183                     {
184                         event = this.eventList.first();
185                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
186                     }
187                     else
188                     {
189                         // make a small time step for the animation during wallclock waiting.
190                         // but never beyond the next event time.
191                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
192                         {
193                             synchronized (super.semaphore)
194                             {
195                                 this.simulatorTime.add(rSim);
196                             }
197                         }
198                     }
199                 }
200             }
201 
202             this.simulatorTime = event.getAbsoluteExecutionTime();
203             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
204 
205             if (this.moveThreads <= 1)
206             {
207                 synchronized (super.semaphore)
208                 {
209                     // carry out all events scheduled on this simulation time, as long as we are still running.
210                     while (this.isRunning() && !this.eventList.isEmpty()
211                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
212                     {
213                         event = this.eventList.removeFirst();
214                         try
215                         {
216                             event.execute();
217                         }
218                         catch (Exception exception)
219                         {
220                             exception.printStackTrace();
221                             if (this.isPauseOnError())
222                             {
223                                 this.stop();
224                             }
225                         }
226                         if (!this.eventList.isEmpty())
227                         {
228                             // peek at next event for while loop.
229                             event = this.eventList.first();
230                         }
231                     }
232                 }
233             }
234 
235             else
236 
237             {
238                 // parallel execution of the move method
239                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
240                 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
241                 synchronized (super.semaphore)
242                 {
243                     while (this.isRunning() && !this.eventList.isEmpty()
244                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
245                     {
246                         event = this.eventList.removeFirst();
247                         SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
248                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
249                         {
250                             moveEvents.add(event);
251                         }
252                         else
253                         {
254                             try
255                             {
256                                 event.execute();
257                             }
258                             catch (Exception exception)
259                             {
260                                 exception.printStackTrace();
261                                 if (this.isPauseOnError())
262                                 {
263                                     this.stop();
264                                 }
265                             }
266                         }
267                         if (!this.eventList.isEmpty())
268                         {
269                             // peek at next event for while loop.
270                             event = this.eventList.first();
271                         }
272                     }
273                 }
274 
275                 // then carry out the move events, based on a constant state at that time.
276                 // first make sure that new events will be stored in a temporary event list...
277                 this.executor = Executors.newFixedThreadPool(1);
278                 for (int i = 0; i < moveEvents.size(); i++)
279                 {
280                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
281                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
282                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
283                     this.executor.execute(new Runnable()
284                     {
285                         @Override
286                         public void run()
287                         {
288                             try
289                             {
290                                 moveEvent.execute();
291                             }
292                             catch (Exception exception)
293                             {
294                                 exception.printStackTrace();
295                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
296                                 {
297                                     OTSDEVSRTParallelMove.this.stop();
298                                 }
299                             }
300                         }
301                     });
302                 }
303                 this.executor.shutdown();
304                 try
305                 {
306                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
307                 }
308                 catch (InterruptedException exception)
309                 {
310                     //
311                 }
312 
313                 this.executor = Executors.newFixedThreadPool(1);
314                 for (int i = 0; i < moveEvents.size(); i++)
315                 {
316                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
317                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
318                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
319                     this.executor.execute(new Runnable()
320                     {
321                         @Override
322                         public void run()
323                         {
324                             try
325                             {
326                                 moveEvent.execute();
327                             }
328                             catch (Exception exception)
329                             {
330                                 exception.printStackTrace();
331                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
332                                 {
333                                     OTSDEVSRTParallelMove.this.stop();
334                                 }
335                             }
336                         }
337                     });
338                 }
339                 this.executor.shutdown();
340                 try
341                 {
342                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
343                 }
344                 catch (InterruptedException exception)
345                 {
346                     //
347                 }
348 
349                 this.executor = Executors.newFixedThreadPool(1);
350                 for (int i = 0; i < moveEvents.size(); i++)
351                 {
352                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
353                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
354                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
355                     this.executor.execute(new Runnable()
356                     {
357                         @Override
358                         public void run()
359                         {
360                             try
361                             {
362                                 moveEvent.execute();
363                             }
364                             catch (Exception exception)
365                             {
366                                 exception.printStackTrace();
367                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
368                                 {
369                                     OTSDEVSRTParallelMove.this.stop();
370                                 }
371                             }
372                         }
373                     });
374                 }
375                 this.executor.shutdown();
376                 try
377                 {
378                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
379                 }
380                 catch (InterruptedException exception)
381                 {
382                     //
383                 }
384 
385             }
386         }
387         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
388 
389         updateAnimation();
390         animationThread.stopAnimation();
391     }
392 
393 }