View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.io.Serializable;
4   import java.util.ArrayList;
5   import java.util.List;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   
9   import org.djunits.unit.DurationUnit;
10  import org.djunits.value.vdouble.scalar.Duration;
11  import org.djunits.value.vdouble.scalar.Time;
12  import org.opentrafficsim.core.gtu.GTU;
13  
14  import nl.tudelft.simulation.dsol.SimRuntimeException;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
16  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
17  import nl.tudelft.simulation.dsol.simtime.SimTimeDoubleUnit;
18  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeClock;
19  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
20  
21  /**
22   * <p>
23   * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
24   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
25   * <p>
26   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
27   *          initial version Aug 15, 2014 <br>
28   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29   */
30  public class OTSDEVSRTParallelMove extends DEVSRealTimeClock<Time, Duration, SimTimeDoubleUnit>
31  {
32      /** */
33      private static final long serialVersionUID = 20140909L;
34  
35      /** number of threads to use for move(). */
36      private int moveThreads = 1;
37  
38      /** the thread pool for parallel execution. */
39      private ExecutorService executor = null;
40  
41      /**
42       * Create a new OTSRealTimeClock.
43       * @param moveThreads int; The number of move threads to use
44       * @param simulatorId the id of the simulator to use in remote communication
45       */
46      public OTSDEVSRTParallelMove(final int moveThreads, final Serializable simulatorId)
47      {
48          super(simulatorId);
49          setMoveThreads(moveThreads);
50          setEventList(new SynchronizedRedBlackTree<>());
51      }
52  
53      /**
54       * Create a new OTSRealTimeClock.
55       * @param simulatorId the id of the simulator to use in remote communication
56       */
57      public OTSDEVSRTParallelMove(final Serializable simulatorId)
58      {
59          this(1, simulatorId);
60      }
61  
62      /**
63       * @param moveThreads int; set moveThreads
64       */
65      public final void setMoveThreads(final int moveThreads)
66      {
67          this.moveThreads = moveThreads;
68      }
69  
70      /**
71       * @return moveThreads
72       */
73      public final int getMoveThreads()
74      {
75          return this.moveThreads;
76      }
77  
78      /** {@inheritDoc} */
79      @Override
80      protected final Duration simulatorTimeForWallClockMillis(final double factor)
81      {
82          return new Duration(factor, DurationUnit.MILLISECOND);
83      }
84  
85      /** {@inheritDoc} */
86      @Override
87      public final String toString()
88      {
89          return "DEVSRealTimeClock.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
90      }
91  
92      // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
93      /** {@inheritDoc} */
94      @Override
95      @SuppressWarnings("checkstyle:designforextension")
96      public void run()
97      {
98          AnimationThread animationThread = new AnimationThread(this);
99          animationThread.start();
100 
101         long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
102         SimTimeDoubleUnit simTime0 = this.simulatorTime; // _______ current zero for the sim clock
103         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
104         double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
105         Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
106                                                                                         // wall clock
107 
108         while (this.isStartingOrRunning() && !this.eventList.isEmpty()
109                 && this.getSimulatorTime().le(this.replication.getTreatment().getEndTime()))
110         {
111             // check if speedFactor has changed. If yes: re-baseline.
112             if (factor != getSpeedFactor())
113             {
114                 clockTime0 = System.currentTimeMillis();
115                 simTime0 = this.simulatorTime;
116                 factor = getSpeedFactor();
117                 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
118             }
119 
120             // check if we are behind; syncTime is the needed current time on the wall-clock
121             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
122             // delta is the time we might be behind
123             double simTime = this.simulatorTime.diff(simTime0).doubleValue();
124 
125             if (syncTime > simTime)
126             {
127                 // we are behind
128                 if (!isCatchup())
129                 {
130                     // if no catch-up: re-baseline.
131                     clockTime0 = System.currentTimeMillis();
132                     simTime0 = this.simulatorTime;
133                 }
134                 else
135                 {
136                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
137                     // first
138                     synchronized (super.semaphore)
139                     {
140                         Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
141                         SimTimeDoubleUnit absSyncTime = this.simulatorTime.plus(delta);
142                         SimTimeDoubleUnit eventTime = this.eventList.first().getAbsoluteExecutionTime();
143                         if (absSyncTime.lt(eventTime))
144                         {
145                             this.simulatorTime = absSyncTime;
146                         }
147                         else
148                         {
149                             this.simulatorTime = eventTime;
150                         }
151                     }
152                 }
153             }
154 
155             // peek at the first event and determine the time difference relative to RT speed; that determines
156             // how long we have to wait.
157             SimEventInterface<SimTimeDoubleUnit> event = this.eventList.first();
158             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
159 
160             /*
161              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
162              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
163              * have to wait 10 times that amount. We might also be behind.
164              */
165             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
166             {
167                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
168                 {
169                     try
170                     {
171                         Thread.sleep(getUpdateMsec());
172 
173                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
174                         // this could cause a jump.
175                         if (factor != getSpeedFactor())
176                         {
177                             simTimeDiffMillis = 0.0;
178                         }
179 
180                     }
181                     catch (InterruptedException ie)
182                     {
183                         // do nothing
184                         ie = null;
185                     }
186 
187                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
188                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
189                     {
190                         event = this.eventList.first();
191                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().diff(simTime0)).doubleValue() / (msec1 * factor);
192                     }
193                     else
194                     {
195                         // make a small time step for the animation during wallclock waiting.
196                         // but never beyond the next event time.
197                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
198                         {
199                             synchronized (super.semaphore)
200                             {
201                                 this.simulatorTime.add(rSim);
202                             }
203                         }
204                     }
205                 }
206             }
207 
208             this.simulatorTime = event.getAbsoluteExecutionTime();
209             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime.get());
210 
211             if (this.moveThreads <= 1)
212             {
213                 synchronized (super.semaphore)
214                 {
215                     // carry out all events scheduled on this simulation time, as long as we are still running.
216                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
217                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
218                     {
219                         event = this.eventList.removeFirst();
220                         try
221                         {
222                             event.execute();
223                         }
224                         catch (Exception exception)
225                         {
226                             getLogger().always().error(exception);
227                             if (this.isPauseOnError())
228                             {
229                                 try
230                                 {
231                                     this.stop();
232                                 }
233                                 catch (SimRuntimeException exception1)
234                                 {
235                                     getLogger().always().error(exception1);
236                                 }
237                             }
238                         }
239                         if (!this.eventList.isEmpty())
240                         {
241                             // peek at next event for while loop.
242                             event = this.eventList.first();
243                         }
244                     }
245                 }
246             }
247 
248             else
249 
250             {
251                 // parallel execution of the move method
252                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
253                 List<SimEventInterface<SimTimeDoubleUnit>> moveEvents = new ArrayList<>();
254                 synchronized (super.semaphore)
255                 {
256                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
257                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
258                     {
259                         event = this.eventList.removeFirst();
260                         SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) event;
261                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
262                         {
263                             moveEvents.add(event);
264                         }
265                         else
266                         {
267                             try
268                             {
269                                 event.execute();
270                             }
271                             catch (Exception exception)
272                             {
273                                 getLogger().always().error(exception);
274                                 if (this.isPauseOnError())
275                                 {
276                                     try
277                                     {
278                                         this.stop();
279                                     }
280                                     catch (SimRuntimeException exception1)
281                                     {
282                                         getLogger().always().error(exception1);
283                                     }
284                                 }
285                             }
286                         }
287                         if (!this.eventList.isEmpty())
288                         {
289                             // peek at next event for while loop.
290                             event = this.eventList.first();
291                         }
292                     }
293                 }
294 
295                 // then carry out the move events, based on a constant state at that time.
296                 // first make sure that new events will be stored in a temporary event list...
297                 this.executor = Executors.newFixedThreadPool(1);
298                 for (int i = 0; i < moveEvents.size(); i++)
299                 {
300                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
301                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
302                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
303                     this.executor.execute(new Runnable()
304                     {
305                         @Override
306                         public void run()
307                         {
308                             try
309                             {
310                                 moveEvent.execute();
311                             }
312                             catch (Exception exception)
313                             {
314                                 getLogger().always().error(exception);
315                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
316                                 {
317                                     try
318                                     {
319                                         OTSDEVSRTParallelMove.this.stop();
320                                     }
321                                     catch (SimRuntimeException exception1)
322                                     {
323                                         getLogger().always().error(exception1);
324                                     }
325                                 }
326                             }
327                         }
328                     });
329                 }
330                 this.executor.shutdown();
331                 try
332                 {
333                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
334                 }
335                 catch (InterruptedException exception)
336                 {
337                     //
338                 }
339 
340                 this.executor = Executors.newFixedThreadPool(1);
341                 for (int i = 0; i < moveEvents.size(); i++)
342                 {
343                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
344                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
345                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
346                     this.executor.execute(new Runnable()
347                     {
348                         @Override
349                         public void run()
350                         {
351                             try
352                             {
353                                 moveEvent.execute();
354                             }
355                             catch (Exception exception)
356                             {
357                                 getLogger().always().error(exception);
358                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
359                                 {
360                                     try
361                                     {
362                                         OTSDEVSRTParallelMove.this.stop();
363                                     }
364                                     catch (SimRuntimeException exception1)
365                                     {
366                                         getLogger().always().error(exception1);
367                                     }
368                                 }
369                             }
370                         }
371                     });
372                 }
373                 this.executor.shutdown();
374                 try
375                 {
376                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
377                 }
378                 catch (InterruptedException exception)
379                 {
380                     //
381                 }
382 
383                 this.executor = Executors.newFixedThreadPool(1);
384                 for (int i = 0; i < moveEvents.size(); i++)
385                 {
386                     SimEvent<SimTimeDoubleUnit> se = (SimEvent<SimTimeDoubleUnit>) moveEvents.get(i);
387                     final SimEventInterface<SimTimeDoubleUnit> moveEvent =
388                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
389                     this.executor.execute(new Runnable()
390                     {
391                         @Override
392                         public void run()
393                         {
394                             try
395                             {
396                                 moveEvent.execute();
397                             }
398                             catch (Exception exception)
399                             {
400                                 getLogger().always().error(exception);
401                                 if (OTSDEVSRTParallelMove.this.isPauseOnError())
402                                 {
403                                     try
404                                     {
405                                         OTSDEVSRTParallelMove.this.stop();
406                                     }
407                                     catch (SimRuntimeException exception1)
408                                     {
409                                         getLogger().always().error(exception1);
410                                     }
411                                 }
412                             }
413                         }
414                     });
415                 }
416                 this.executor.shutdown();
417                 try
418                 {
419                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
420                 }
421                 catch (InterruptedException exception)
422                 {
423                     //
424                 }
425 
426             }
427         }
428         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, this.simulatorTime, this.simulatorTime.get());
429 
430         updateAnimation();
431         animationThread.stopAnimation();
432     }
433 
434 }