View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.io.Serializable;
4   import java.util.ArrayList;
5   import java.util.List;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   
9   import org.djunits.unit.DurationUnit;
10  import org.djunits.value.vdouble.scalar.Duration;
11  import org.opentrafficsim.core.gtu.Gtu;
12  
13  import nl.tudelft.simulation.dsol.SimRuntimeException;
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16  import nl.tudelft.simulation.dsol.simulators.DevsRealTimeAnimator;
17  import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
18  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19  
20  /**
21   * <p>
22   * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24   * </p>
25   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
26   */
27  public class OtsDevsRealTimeParallelMove extends DevsRealTimeAnimator<Duration>
28  {
29      /** */
30      private static final long serialVersionUID = 20140909L;
31  
32      /** number of threads to use for move(). */
33      private int moveThreads = 1;
34  
35      /** the thread pool for parallel execution. */
36      private ExecutorService executor = null;
37  
38      /**
39       * Create a new OTSRealTimeClock.
40       * @param moveThreads int; The number of move threads to use
41       * @param simulatorId the id of the simulator to use in remote communication
42       */
43      public OtsDevsRealTimeParallelMove(final int moveThreads, final Serializable simulatorId)
44      {
45          super(simulatorId);
46          setMoveThreads(moveThreads);
47          setEventList(new SynchronizedRedBlackTree<>());
48      }
49  
50      /**
51       * Create a new OTSRealTimeClock.
52       * @param simulatorId the id of the simulator to use in remote communication
53       */
54      public OtsDevsRealTimeParallelMove(final Serializable simulatorId)
55      {
56          this(1, simulatorId);
57      }
58  
59      /**
60       * @param moveThreads int; set moveThreads
61       */
62      public final void setMoveThreads(final int moveThreads)
63      {
64          this.moveThreads = moveThreads;
65      }
66  
67      /**
68       * @return moveThreads
69       */
70      public final int getMoveThreads()
71      {
72          return this.moveThreads;
73      }
74  
75      /** {@inheritDoc} */
76      @Override
77      protected final Duration simulatorTimeForWallClockMillis(final double factor)
78      {
79          return new Duration(factor, DurationUnit.MILLISECOND);
80      }
81  
82      /** {@inheritDoc} */
83      @Override
84      public final String toString()
85      {
86          return "DevsRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
87      }
88  
89      // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
90      /** {@inheritDoc} */
91      @Override
92      @SuppressWarnings("checkstyle:designforextension")
93      public void run()
94      {
95          AnimationThread animationThread = new AnimationThread(this);
96          animationThread.start();
97  
98          long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
99          Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
100         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
101         double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
102         Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
103                                                                                         // wall clock
104 
105         while (this.isStartingOrRunning() && !this.eventList.isEmpty()
106                 && this.getSimulatorTime().le(this.replication.getEndTime()))
107         {
108             // check if speedFactor has changed. If yes: re-baseline.
109             if (factor != getSpeedFactor())
110             {
111                 clockTime0 = System.currentTimeMillis();
112                 simTime0 = this.simulatorTime;
113                 factor = getSpeedFactor();
114                 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
115             }
116 
117             // check if we are behind; syncTime is the needed current time on the wall-clock
118             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
119             // delta is the time we might be behind
120             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
121 
122             if (syncTime > simTime)
123             {
124                 // we are behind
125                 if (!isCatchup())
126                 {
127                     // if no catch-up: re-baseline.
128                     clockTime0 = System.currentTimeMillis();
129                     simTime0 = this.simulatorTime;
130                 }
131                 else
132                 {
133                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
134                     // first
135                     synchronized (super.semaphore)
136                     {
137                         Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
138                         Duration absSyncTime = this.simulatorTime.plus(delta);
139                         Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
140                         if (absSyncTime.lt(eventTime))
141                         {
142                             this.simulatorTime = absSyncTime;
143                         }
144                         else
145                         {
146                             this.simulatorTime = eventTime;
147                         }
148                     }
149                 }
150             }
151 
152             // peek at the first event and determine the time difference relative to RT speed; that determines
153             // how long we have to wait.
154             SimEventInterface<Duration> event = this.eventList.first();
155             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
156 
157             /*
158              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
159              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
160              * have to wait 10 times that amount. We might also be behind.
161              */
162             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
163             {
164                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
165                 {
166                     try
167                     {
168                         Thread.sleep(getUpdateMsec());
169 
170                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
171                         // this could cause a jump.
172                         if (factor != getSpeedFactor())
173                         {
174                             simTimeDiffMillis = 0.0;
175                         }
176 
177                     }
178                     catch (InterruptedException ie)
179                     {
180                         // do nothing
181                         ie = null;
182                     }
183 
184                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
185                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
186                     {
187                         event = this.eventList.first();
188                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
189                     }
190                     else
191                     {
192                         // make a small time step for the animation during wallclock waiting.
193                         // but never beyond the next event time.
194                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
195                         {
196                             synchronized (super.semaphore)
197                             {
198                                 this.simulatorTime = this.simulatorTime.plus(rSim);
199                             }
200                         }
201                     }
202                 }
203             }
204 
205             this.simulatorTime = event.getAbsoluteExecutionTime();
206             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
207 
208             if (this.moveThreads <= 1)
209             {
210                 synchronized (super.semaphore)
211                 {
212                     // carry out all events scheduled on this simulation time, as long as we are still running.
213                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
214                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
215                     {
216                         event = this.eventList.removeFirst();
217                         try
218                         {
219                             event.execute();
220                         }
221                         catch (Exception exception)
222                         {
223                             getLogger().always().error(exception);
224                             if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
225                             {
226                                 try
227                                 {
228                                     this.stop();
229                                 }
230                                 catch (SimRuntimeException exception1)
231                                 {
232                                     getLogger().always().error(exception1);
233                                 }
234                             }
235                         }
236                         if (!this.eventList.isEmpty())
237                         {
238                             // peek at next event for while loop.
239                             event = this.eventList.first();
240                         }
241                     }
242                 }
243             }
244 
245             else
246 
247             {
248                 // parallel execution of the move method
249                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
250                 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
251                 synchronized (super.semaphore)
252                 {
253                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
254                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
255                     {
256                         event = this.eventList.removeFirst();
257                         SimEvent<Duration> se = (SimEvent<Duration>) event;
258                         if (se.getTarget() instanceof Gtu && se.getMethod().equals("move"))
259                         {
260                             moveEvents.add(event);
261                         }
262                         else
263                         {
264                             try
265                             {
266                                 event.execute();
267                             }
268                             catch (Exception exception)
269                             {
270                                 getLogger().always().error(exception);
271                                 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
272                                 {
273                                     try
274                                     {
275                                         this.stop();
276                                     }
277                                     catch (SimRuntimeException exception1)
278                                     {
279                                         getLogger().always().error(exception1);
280                                     }
281                                 }
282                             }
283                         }
284                         if (!this.eventList.isEmpty())
285                         {
286                             // peek at next event for while loop.
287                             event = this.eventList.first();
288                         }
289                     }
290                 }
291 
292                 // then carry out the move events, based on a constant state at that time.
293                 // first make sure that new events will be stored in a temporary event list...
294                 this.executor = Executors.newFixedThreadPool(1);
295                 for (int i = 0; i < moveEvents.size(); i++)
296                 {
297                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
298                     final SimEventInterface<Duration> moveEvent =
299                             new SimEvent<>(this.simulatorTime, se.getTarget(), "movePrep", se.getArgs());
300                     this.executor.execute(new Runnable()
301                     {
302                         @Override
303                         public void run()
304                         {
305                             try
306                             {
307                                 moveEvent.execute();
308                             }
309                             catch (Exception exception)
310                             {
311                                 getLogger().always().error(exception);
312                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
313                                 {
314                                     try
315                                     {
316                                         OtsDevsRealTimeParallelMove.this.stop();
317                                     }
318                                     catch (SimRuntimeException exception1)
319                                     {
320                                         getLogger().always().error(exception1);
321                                     }
322                                 }
323                             }
324                         }
325                     });
326                 }
327                 this.executor.shutdown();
328                 try
329                 {
330                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
331                 }
332                 catch (InterruptedException exception)
333                 {
334                     //
335                 }
336 
337                 this.executor = Executors.newFixedThreadPool(1);
338                 for (int i = 0; i < moveEvents.size(); i++)
339                 {
340                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
341                     final SimEventInterface<Duration> moveEvent =
342                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveGenerate", se.getArgs());
343                     this.executor.execute(new Runnable()
344                     {
345                         @Override
346                         public void run()
347                         {
348                             try
349                             {
350                                 moveEvent.execute();
351                             }
352                             catch (Exception exception)
353                             {
354                                 getLogger().always().error(exception);
355                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
356                                 {
357                                     try
358                                     {
359                                         OtsDevsRealTimeParallelMove.this.stop();
360                                     }
361                                     catch (SimRuntimeException exception1)
362                                     {
363                                         getLogger().always().error(exception1);
364                                     }
365                                 }
366                             }
367                         }
368                     });
369                 }
370                 this.executor.shutdown();
371                 try
372                 {
373                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
374                 }
375                 catch (InterruptedException exception)
376                 {
377                     //
378                 }
379 
380                 this.executor = Executors.newFixedThreadPool(1);
381                 for (int i = 0; i < moveEvents.size(); i++)
382                 {
383                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
384                     final SimEventInterface<Duration> moveEvent =
385                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveFinish", se.getArgs());
386                     this.executor.execute(new Runnable()
387                     {
388                         @Override
389                         public void run()
390                         {
391                             try
392                             {
393                                 moveEvent.execute();
394                             }
395                             catch (Exception exception)
396                             {
397                                 getLogger().always().error(exception);
398                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
399                                 {
400                                     try
401                                     {
402                                         OtsDevsRealTimeParallelMove.this.stop();
403                                     }
404                                     catch (SimRuntimeException exception1)
405                                     {
406                                         getLogger().always().error(exception1);
407                                     }
408                                 }
409                             }
410                         }
411                     });
412                 }
413                 this.executor.shutdown();
414                 try
415                 {
416                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
417                 }
418                 catch (InterruptedException exception)
419                 {
420                     //
421                 }
422 
423             }
424         }
425         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
426 
427         updateAnimation();
428         animationThread.stopAnimation();
429     }
430 
431 }