View Javadoc
1   package org.opentrafficsim.core.dsol;
2   
3   import java.io.Serializable;
4   import java.util.ArrayList;
5   import java.util.List;
6   import java.util.concurrent.ExecutorService;
7   import java.util.concurrent.Executors;
8   
9   import org.djunits.unit.DurationUnit;
10  import org.djunits.value.vdouble.scalar.Duration;
11  import org.opentrafficsim.core.gtu.GTU;
12  
13  import nl.tudelft.simulation.dsol.SimRuntimeException;
14  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
15  import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
16  import nl.tudelft.simulation.dsol.simulators.DEVSRealTimeAnimator;
17  import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
18  import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
19  
20  /**
21   * <p>
22   * Copyright (c) 2013-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
24   * <p>
25   * @version $Revision: 2386 $, $LastChangedDate: 2016-10-16 14:55:54 +0200 (Sun, 16 Oct 2016) $, by $Author: averbraeck $,
26   *          initial version Aug 15, 2014 <br>
27   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
28   */
29  public class OTSDEVSRTParallelMove extends DEVSRealTimeAnimator<Duration>
30  {
31      /** */
32      private static final long serialVersionUID = 20140909L;
33  
34      /** number of threads to use for move(). */
35      private int moveThreads = 1;
36  
37      /** the thread pool for parallel execution. */
38      private ExecutorService executor = null;
39  
40      /**
41       * Create a new OTSRealTimeClock.
42       * @param moveThreads int; The number of move threads to use
43       * @param simulatorId the id of the simulator to use in remote communication
44       */
45      public OTSDEVSRTParallelMove(final int moveThreads, final Serializable simulatorId)
46      {
47          super(simulatorId);
48          setMoveThreads(moveThreads);
49          setEventList(new SynchronizedRedBlackTree<>());
50      }
51  
52      /**
53       * Create a new OTSRealTimeClock.
54       * @param simulatorId the id of the simulator to use in remote communication
55       */
56      public OTSDEVSRTParallelMove(final Serializable simulatorId)
57      {
58          this(1, simulatorId);
59      }
60  
61      /**
62       * @param moveThreads int; set moveThreads
63       */
64      public final void setMoveThreads(final int moveThreads)
65      {
66          this.moveThreads = moveThreads;
67      }
68  
69      /**
70       * @return moveThreads
71       */
72      public final int getMoveThreads()
73      {
74          return this.moveThreads;
75      }
76  
77      /** {@inheritDoc} */
78      @Override
79      protected final Duration simulatorTimeForWallClockMillis(final double factor)
80      {
81          return new Duration(factor, DurationUnit.MILLISECOND);
82      }
83  
84      /** {@inheritDoc} */
85      @Override
86      public final String toString()
87      {
88          return "DEVSRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
89      }
90  
91      // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
92      /** {@inheritDoc} */
93      @Override
94      @SuppressWarnings("checkstyle:designforextension")
95      public void run()
96      {
97          AnimationThread animationThread = new AnimationThread(this);
98          animationThread.start();
99  
100         long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
101         Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
102         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
103         double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
104         Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
105                                                                                         // wall clock
106 
107         while (this.isStartingOrRunning() && !this.eventList.isEmpty()
108                 && this.getSimulatorTime().le(this.replication.getEndTime()))
109         {
110             // check if speedFactor has changed. If yes: re-baseline.
111             if (factor != getSpeedFactor())
112             {
113                 clockTime0 = System.currentTimeMillis();
114                 simTime0 = this.simulatorTime;
115                 factor = getSpeedFactor();
116                 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
117             }
118 
119             // check if we are behind; syncTime is the needed current time on the wall-clock
120             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
121             // delta is the time we might be behind
122             double simTime = this.simulatorTime.minus(simTime0).doubleValue();
123 
124             if (syncTime > simTime)
125             {
126                 // we are behind
127                 if (!isCatchup())
128                 {
129                     // if no catch-up: re-baseline.
130                     clockTime0 = System.currentTimeMillis();
131                     simTime0 = this.simulatorTime;
132                 }
133                 else
134                 {
135                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
136                     // first
137                     synchronized (super.semaphore)
138                     {
139                         Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
140                         Duration absSyncTime = this.simulatorTime.plus(delta);
141                         Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
142                         if (absSyncTime.lt(eventTime))
143                         {
144                             this.simulatorTime = absSyncTime;
145                         }
146                         else
147                         {
148                             this.simulatorTime = eventTime;
149                         }
150                     }
151                 }
152             }
153 
154             // peek at the first event and determine the time difference relative to RT speed; that determines
155             // how long we have to wait.
156             SimEventInterface<Duration> event = this.eventList.first();
157             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
158 
159             /*
160              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
161              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
162              * have to wait 10 times that amount. We might also be behind.
163              */
164             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
165             {
166                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
167                 {
168                     try
169                     {
170                         Thread.sleep(getUpdateMsec());
171 
172                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
173                         // this could cause a jump.
174                         if (factor != getSpeedFactor())
175                         {
176                             simTimeDiffMillis = 0.0;
177                         }
178 
179                     }
180                     catch (InterruptedException ie)
181                     {
182                         // do nothing
183                         ie = null;
184                     }
185 
186                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
187                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
188                     {
189                         event = this.eventList.first();
190                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
191                     }
192                     else
193                     {
194                         // make a small time step for the animation during wallclock waiting.
195                         // but never beyond the next event time.
196                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
197                         {
198                             synchronized (super.semaphore)
199                             {
200                                 this.simulatorTime = this.simulatorTime.plus(rSim);
201                             }
202                         }
203                     }
204                 }
205             }
206 
207             this.simulatorTime = event.getAbsoluteExecutionTime();
208             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
209 
210             if (this.moveThreads <= 1)
211             {
212                 synchronized (super.semaphore)
213                 {
214                     // carry out all events scheduled on this simulation time, as long as we are still running.
215                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
216                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
217                     {
218                         event = this.eventList.removeFirst();
219                         try
220                         {
221                             event.execute();
222                         }
223                         catch (Exception exception)
224                         {
225                             getLogger().always().error(exception);
226                             if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
227                             {
228                                 try
229                                 {
230                                     this.stop();
231                                 }
232                                 catch (SimRuntimeException exception1)
233                                 {
234                                     getLogger().always().error(exception1);
235                                 }
236                             }
237                         }
238                         if (!this.eventList.isEmpty())
239                         {
240                             // peek at next event for while loop.
241                             event = this.eventList.first();
242                         }
243                     }
244                 }
245             }
246 
247             else
248 
249             {
250                 // parallel execution of the move method
251                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
252                 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
253                 synchronized (super.semaphore)
254                 {
255                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
256                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
257                     {
258                         event = this.eventList.removeFirst();
259                         SimEvent<Duration> se = (SimEvent<Duration>) event;
260                         if (se.getTarget() instanceof GTU && se.getMethod().equals("move"))
261                         {
262                             moveEvents.add(event);
263                         }
264                         else
265                         {
266                             try
267                             {
268                                 event.execute();
269                             }
270                             catch (Exception exception)
271                             {
272                                 getLogger().always().error(exception);
273                                 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
274                                 {
275                                     try
276                                     {
277                                         this.stop();
278                                     }
279                                     catch (SimRuntimeException exception1)
280                                     {
281                                         getLogger().always().error(exception1);
282                                     }
283                                 }
284                             }
285                         }
286                         if (!this.eventList.isEmpty())
287                         {
288                             // peek at next event for while loop.
289                             event = this.eventList.first();
290                         }
291                     }
292                 }
293 
294                 // then carry out the move events, based on a constant state at that time.
295                 // first make sure that new events will be stored in a temporary event list...
296                 this.executor = Executors.newFixedThreadPool(1);
297                 for (int i = 0; i < moveEvents.size(); i++)
298                 {
299                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
300                     final SimEventInterface<Duration> moveEvent =
301                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "movePrep", se.getArgs());
302                     this.executor.execute(new Runnable()
303                     {
304                         @Override
305                         public void run()
306                         {
307                             try
308                             {
309                                 moveEvent.execute();
310                             }
311                             catch (Exception exception)
312                             {
313                                 getLogger().always().error(exception);
314                                 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
315                                 {
316                                     try
317                                     {
318                                         OTSDEVSRTParallelMove.this.stop();
319                                     }
320                                     catch (SimRuntimeException exception1)
321                                     {
322                                         getLogger().always().error(exception1);
323                                     }
324                                 }
325                             }
326                         }
327                     });
328                 }
329                 this.executor.shutdown();
330                 try
331                 {
332                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
333                 }
334                 catch (InterruptedException exception)
335                 {
336                     //
337                 }
338 
339                 this.executor = Executors.newFixedThreadPool(1);
340                 for (int i = 0; i < moveEvents.size(); i++)
341                 {
342                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
343                     final SimEventInterface<Duration> moveEvent =
344                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveGenerate", se.getArgs());
345                     this.executor.execute(new Runnable()
346                     {
347                         @Override
348                         public void run()
349                         {
350                             try
351                             {
352                                 moveEvent.execute();
353                             }
354                             catch (Exception exception)
355                             {
356                                 getLogger().always().error(exception);
357                                 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
358                                 {
359                                     try
360                                     {
361                                         OTSDEVSRTParallelMove.this.stop();
362                                     }
363                                     catch (SimRuntimeException exception1)
364                                     {
365                                         getLogger().always().error(exception1);
366                                     }
367                                 }
368                             }
369                         }
370                     });
371                 }
372                 this.executor.shutdown();
373                 try
374                 {
375                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
376                 }
377                 catch (InterruptedException exception)
378                 {
379                     //
380                 }
381 
382                 this.executor = Executors.newFixedThreadPool(1);
383                 for (int i = 0; i < moveEvents.size(); i++)
384                 {
385                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
386                     final SimEventInterface<Duration> moveEvent =
387                             new SimEvent<>(this.simulatorTime, se.getSource(), se.getTarget(), "moveFinish", se.getArgs());
388                     this.executor.execute(new Runnable()
389                     {
390                         @Override
391                         public void run()
392                         {
393                             try
394                             {
395                                 moveEvent.execute();
396                             }
397                             catch (Exception exception)
398                             {
399                                 getLogger().always().error(exception);
400                                 if (OTSDEVSRTParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
401                                 {
402                                     try
403                                     {
404                                         OTSDEVSRTParallelMove.this.stop();
405                                     }
406                                     catch (SimRuntimeException exception1)
407                                     {
408                                         getLogger().always().error(exception1);
409                                     }
410                                 }
411                             }
412                         }
413                     });
414                 }
415                 this.executor.shutdown();
416                 try
417                 {
418                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
419                 }
420                 catch (InterruptedException exception)
421                 {
422                     //
423                 }
424 
425             }
426         }
427         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
428 
429         updateAnimation();
430         animationThread.stopAnimation();
431     }
432 
433 }