View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.io.Serializable;
4   import java.util.ArrayList;
5   import java.util.List;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   
9   import org.djunits.unit.DurationUnit;
10  import org.djunits.value.vdouble.scalar.Duration;
11  import org.opentrafficsim.core.gtu.Gtu;
12  
13  import nl.tudelft.simulation.dsol.SimRuntimeException;
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16  import nl.tudelft.simulation.dsol.simulators.DevsRealTimeAnimator;
17  import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
18  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19  
20  /**
21   * <p>
22   * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24   * </p>
25   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
26   */
27  public class OtsDevsRealTimeParallelMove extends DevsRealTimeAnimator<Duration>
28  {
29      /** */
30      private static final long serialVersionUID = 20140909L;
31  
32      /** number of threads to use for move(). */
33      private int moveThreads = 1;
34  
35      /** the thread pool for parallel execution. */
36      private ExecutorService executor = null;
37  
38      /**
39       * Create a new OTSRealTimeClock.
40       * @param moveThreads The number of move threads to use
41       * @param simulatorId the id of the simulator to use in remote communication
42       */
43      public OtsDevsRealTimeParallelMove(final int moveThreads, final Serializable simulatorId)
44      {
45          super(simulatorId);
46          setMoveThreads(moveThreads);
47          setEventList(new SynchronizedRedBlackTree<>());
48      }
49  
50      /**
51       * Create a new OTSRealTimeClock.
52       * @param simulatorId the id of the simulator to use in remote communication
53       */
54      public OtsDevsRealTimeParallelMove(final Serializable simulatorId)
55      {
56          this(1, simulatorId);
57      }
58  
59      /**
60       * @param moveThreads set moveThreads
61       */
62      public final void setMoveThreads(final int moveThreads)
63      {
64          this.moveThreads = moveThreads;
65      }
66  
67      /**
68       * @return moveThreads
69       */
70      public final int getMoveThreads()
71      {
72          return this.moveThreads;
73      }
74  
75      @Override
76      protected final Duration simulatorTimeForWallClockMillis(final double factor)
77      {
78          return new Duration(factor, DurationUnit.MILLISECOND);
79      }
80  
81      @Override
82      public final String toString()
83      {
84          return "DevsRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
85      }
86  
87      // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
88      @Override
89      @SuppressWarnings("checkstyle:designforextension")
90      public void run()
91      {
92          AnimationThread animationThread = new AnimationThread(this);
93          animationThread.start();
94  
95          long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
96          Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
97          double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
98          double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
99          Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
100                                                                                         // wall clock
101 
102         while (this.isStartingOrRunning() && !this.eventList.isEmpty()
103                 && this.getSimulatorTime().le(this.replication.getEndTime()))
104         {
105             // check if speedFactor has changed. If yes: re-baseline.
106             if (factor != getSpeedFactor())
107             {
108                 clockTime0 = System.currentTimeMillis();
109                 simTime0 = this.simulatorTime;
110                 factor = getSpeedFactor();
111                 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
112             }
113 
114             // check if we are behind; syncTime is the needed current time on the wall-clock
115             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
116             // delta is the time we might be behind
117             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
118 
119             if (syncTime > simTime)
120             {
121                 // we are behind
122                 if (!isCatchup())
123                 {
124                     // if no catch-up: re-baseline.
125                     clockTime0 = System.currentTimeMillis();
126                     simTime0 = this.simulatorTime;
127                 }
128                 else
129                 {
130                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
131                     // first
132                     synchronized (super.semaphore)
133                     {
134                         Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
135                         Duration absSyncTime = this.simulatorTime.plus(delta);
136                         Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
137                         if (absSyncTime.lt(eventTime))
138                         {
139                             this.simulatorTime = absSyncTime;
140                         }
141                         else
142                         {
143                             this.simulatorTime = eventTime;
144                         }
145                     }
146                 }
147             }
148 
149             // peek at the first event and determine the time difference relative to RT speed; that determines
150             // how long we have to wait.
151             SimEventInterface<Duration> event = this.eventList.first();
152             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
153 
154             /*
155              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
156              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
157              * have to wait 10 times that amount. We might also be behind.
158              */
159             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
160             {
161                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
162                 {
163                     try
164                     {
165                         Thread.sleep(getUpdateMsec());
166 
167                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
168                         // this could cause a jump.
169                         if (factor != getSpeedFactor())
170                         {
171                             simTimeDiffMillis = 0.0;
172                         }
173 
174                     }
175                     catch (InterruptedException ie)
176                     {
177                         // do nothing
178                         ie = null;
179                     }
180 
181                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
182                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
183                     {
184                         event = this.eventList.first();
185                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
186                     }
187                     else
188                     {
189                         // make a small time step for the animation during wallclock waiting.
190                         // but never beyond the next event time.
191                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
192                         {
193                             synchronized (super.semaphore)
194                             {
195                                 this.simulatorTime = this.simulatorTime.plus(rSim);
196                             }
197                         }
198                     }
199                 }
200             }
201 
202             this.simulatorTime = event.getAbsoluteExecutionTime();
203             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
204 
205             if (this.moveThreads <= 1)
206             {
207                 synchronized (super.semaphore)
208                 {
209                     // carry out all events scheduled on this simulation time, as long as we are still running.
210                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
211                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
212                     {
213                         event = this.eventList.removeFirst();
214                         try
215                         {
216                             event.execute();
217                         }
218                         catch (Exception exception)
219                         {
220                             getLogger().always().error(exception);
221                             if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
222                             {
223                                 try
224                                 {
225                                     this.stop();
226                                 }
227                                 catch (SimRuntimeException exception1)
228                                 {
229                                     getLogger().always().error(exception1);
230                                 }
231                             }
232                         }
233                         if (!this.eventList.isEmpty())
234                         {
235                             // peek at next event for while loop.
236                             event = this.eventList.first();
237                         }
238                     }
239                 }
240             }
241 
242             else
243 
244             {
245                 // parallel execution of the move method
246                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
247                 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
248                 synchronized (super.semaphore)
249                 {
250                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
251                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
252                     {
253                         event = this.eventList.removeFirst();
254                         SimEvent<Duration> se = (SimEvent<Duration>) event;
255                         if (se.getTarget() instanceof Gtu && se.getMethod().equals("move"))
256                         {
257                             moveEvents.add(event);
258                         }
259                         else
260                         {
261                             try
262                             {
263                                 event.execute();
264                             }
265                             catch (Exception exception)
266                             {
267                                 getLogger().always().error(exception);
268                                 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
269                                 {
270                                     try
271                                     {
272                                         this.stop();
273                                     }
274                                     catch (SimRuntimeException exception1)
275                                     {
276                                         getLogger().always().error(exception1);
277                                     }
278                                 }
279                             }
280                         }
281                         if (!this.eventList.isEmpty())
282                         {
283                             // peek at next event for while loop.
284                             event = this.eventList.first();
285                         }
286                     }
287                 }
288 
289                 // then carry out the move events, based on a constant state at that time.
290                 // first make sure that new events will be stored in a temporary event list...
291                 this.executor = Executors.newFixedThreadPool(1);
292                 for (int i = 0; i < moveEvents.size(); i++)
293                 {
294                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
295                     final SimEventInterface<Duration> moveEvent =
296                             new SimEvent<>(this.simulatorTime, se.getTarget(), "movePrep", se.getArgs());
297                     this.executor.execute(new Runnable()
298                     {
299                         @Override
300                         public void run()
301                         {
302                             try
303                             {
304                                 moveEvent.execute();
305                             }
306                             catch (Exception exception)
307                             {
308                                 getLogger().always().error(exception);
309                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
310                                 {
311                                     try
312                                     {
313                                         OtsDevsRealTimeParallelMove.this.stop();
314                                     }
315                                     catch (SimRuntimeException exception1)
316                                     {
317                                         getLogger().always().error(exception1);
318                                     }
319                                 }
320                             }
321                         }
322                     });
323                 }
324                 this.executor.shutdown();
325                 try
326                 {
327                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
328                 }
329                 catch (InterruptedException exception)
330                 {
331                     //
332                 }
333 
334                 this.executor = Executors.newFixedThreadPool(1);
335                 for (int i = 0; i < moveEvents.size(); i++)
336                 {
337                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
338                     final SimEventInterface<Duration> moveEvent =
339                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveGenerate", se.getArgs());
340                     this.executor.execute(new Runnable()
341                     {
342                         @Override
343                         public void run()
344                         {
345                             try
346                             {
347                                 moveEvent.execute();
348                             }
349                             catch (Exception exception)
350                             {
351                                 getLogger().always().error(exception);
352                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
353                                 {
354                                     try
355                                     {
356                                         OtsDevsRealTimeParallelMove.this.stop();
357                                     }
358                                     catch (SimRuntimeException exception1)
359                                     {
360                                         getLogger().always().error(exception1);
361                                     }
362                                 }
363                             }
364                         }
365                     });
366                 }
367                 this.executor.shutdown();
368                 try
369                 {
370                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
371                 }
372                 catch (InterruptedException exception)
373                 {
374                     //
375                 }
376 
377                 this.executor = Executors.newFixedThreadPool(1);
378                 for (int i = 0; i < moveEvents.size(); i++)
379                 {
380                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
381                     final SimEventInterface<Duration> moveEvent =
382                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveFinish", se.getArgs());
383                     this.executor.execute(new Runnable()
384                     {
385                         @Override
386                         public void run()
387                         {
388                             try
389                             {
390                                 moveEvent.execute();
391                             }
392                             catch (Exception exception)
393                             {
394                                 getLogger().always().error(exception);
395                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
396                                 {
397                                     try
398                                     {
399                                         OtsDevsRealTimeParallelMove.this.stop();
400                                     }
401                                     catch (SimRuntimeException exception1)
402                                     {
403                                         getLogger().always().error(exception1);
404                                     }
405                                 }
406                             }
407                         }
408                     });
409                 }
410                 this.executor.shutdown();
411                 try
412                 {
413                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
414                 }
415                 catch (InterruptedException exception)
416                 {
417                     //
418                 }
419 
420             }
421         }
422         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
423 
424         updateAnimation();
425         animationThread.stopAnimation();
426     }
427 
428 }