OtsDevsRealTimeParallelMove.java

  1. package org.opentrafficsim.core.dsol;

  2. import java.io.Serializable;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;

  7. import org.djunits.unit.DurationUnit;
  8. import org.djunits.value.vdouble.scalar.Duration;
  9. import org.opentrafficsim.core.gtu.Gtu;

  10. import nl.tudelft.simulation.dsol.SimRuntimeException;
  11. import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
  12. import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
  13. import nl.tudelft.simulation.dsol.simulators.DevsRealTimeAnimator;
  14. import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
  15. import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;

  16. /**
  17.  * <p>
  18.  * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
  19.  * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
  20.  * </p>
  21.  * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
  22.  */
  23. public class OtsDevsRealTimeParallelMove extends DevsRealTimeAnimator<Duration>
  24. {
  25.     /** */
  26.     private static final long serialVersionUID = 20140909L;

  27.     /** number of threads to use for move(). */
  28.     private int moveThreads = 1;

  29.     /** the thread pool for parallel execution. */
  30.     private ExecutorService executor = null;

  31.     /**
  32.      * Create a new OTSRealTimeClock.
  33.      * @param moveThreads int; The number of move threads to use
  34.      * @param simulatorId the id of the simulator to use in remote communication
  35.      */
  36.     public OtsDevsRealTimeParallelMove(final int moveThreads, final Serializable simulatorId)
  37.     {
  38.         super(simulatorId);
  39.         setMoveThreads(moveThreads);
  40.         setEventList(new SynchronizedRedBlackTree<>());
  41.     }

  42.     /**
  43.      * Create a new OTSRealTimeClock.
  44.      * @param simulatorId the id of the simulator to use in remote communication
  45.      */
  46.     public OtsDevsRealTimeParallelMove(final Serializable simulatorId)
  47.     {
  48.         this(1, simulatorId);
  49.     }

  50.     /**
  51.      * @param moveThreads int; set moveThreads
  52.      */
  53.     public final void setMoveThreads(final int moveThreads)
  54.     {
  55.         this.moveThreads = moveThreads;
  56.     }

  57.     /**
  58.      * @return moveThreads
  59.      */
  60.     public final int getMoveThreads()
  61.     {
  62.         return this.moveThreads;
  63.     }

  64.     /** {@inheritDoc} */
  65.     @Override
  66.     protected final Duration simulatorTimeForWallClockMillis(final double factor)
  67.     {
  68.         return new Duration(factor, DurationUnit.MILLISECOND);
  69.     }

  70.     /** {@inheritDoc} */
  71.     @Override
  72.     public final String toString()
  73.     {
  74.         return "DevsRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
  75.     }

  76.     // TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
  77.     /** {@inheritDoc} */
  78.     @Override
  79.     @SuppressWarnings("checkstyle:designforextension")
  80.     public void run()
  81.     {
  82.         AnimationThread animationThread = new AnimationThread(this);
  83.         animationThread.start();

  84.         long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
  85.         Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
  86.         double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
  87.         double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
  88.         Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
  89.                                                                                         // wall clock

  90.         while (this.isStartingOrRunning() && !this.eventList.isEmpty()
  91.                 && this.getSimulatorTime().le(this.replication.getEndTime()))
  92.         {
  93.             // check if speedFactor has changed. If yes: re-baseline.
  94.             if (factor != getSpeedFactor())
  95.             {
  96.                 clockTime0 = System.currentTimeMillis();
  97.                 simTime0 = this.simulatorTime;
  98.                 factor = getSpeedFactor();
  99.                 rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
  100.             }

  101.             // check if we are behind; syncTime is the needed current time on the wall-clock
  102.             double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
  103.             // delta is the time we might be behind
  104.             double simTime = this.simulatorTime.minus(simTime0).doubleValue();

  105.             if (syncTime > simTime)
  106.             {
  107.                 // we are behind
  108.                 if (!isCatchup())
  109.                 {
  110.                     // if no catch-up: re-baseline.
  111.                     clockTime0 = System.currentTimeMillis();
  112.                     simTime0 = this.simulatorTime;
  113.                 }
  114.                 else
  115.                 {
  116.                     // jump to the required wall-clock related time or to the time of the next event, whichever comes
  117.                     // first
  118.                     synchronized (super.semaphore)
  119.                     {
  120.                         Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
  121.                         Duration absSyncTime = this.simulatorTime.plus(delta);
  122.                         Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
  123.                         if (absSyncTime.lt(eventTime))
  124.                         {
  125.                             this.simulatorTime = absSyncTime;
  126.                         }
  127.                         else
  128.                         {
  129.                             this.simulatorTime = eventTime;
  130.                         }
  131.                     }
  132.                 }
  133.             }

  134.             // peek at the first event and determine the time difference relative to RT speed; that determines
  135.             // how long we have to wait.
  136.             SimEventInterface<Duration> event = this.eventList.first();
  137.             double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);

  138.             /*
  139.              * simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
  140.              * number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
  141.              * have to wait 10 times that amount. We might also be behind.
  142.              */
  143.             if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
  144.             {
  145.                 while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
  146.                 {
  147.                     try
  148.                     {
  149.                         Thread.sleep(getUpdateMsec());

  150.                         // check if speedFactor has changed. If yes: break out of this loop and execute event.
  151.                         // this could cause a jump.
  152.                         if (factor != getSpeedFactor())
  153.                         {
  154.                             simTimeDiffMillis = 0.0;
  155.                         }

  156.                     }
  157.                     catch (InterruptedException ie)
  158.                     {
  159.                         // do nothing
  160.                         ie = null;
  161.                     }

  162.                     // check if an event has been inserted. In a real-time situation this can be dome by other threads
  163.                     if (!event.equals(this.eventList.first())) // event inserted by a thread...
  164.                     {
  165.                         event = this.eventList.first();
  166.                         simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
  167.                     }
  168.                     else
  169.                     {
  170.                         // make a small time step for the animation during wallclock waiting.
  171.                         // but never beyond the next event time.
  172.                         if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
  173.                         {
  174.                             synchronized (super.semaphore)
  175.                             {
  176.                                 this.simulatorTime = this.simulatorTime.plus(rSim);
  177.                             }
  178.                         }
  179.                     }
  180.                 }
  181.             }

  182.             this.simulatorTime = event.getAbsoluteExecutionTime();
  183.             this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);

  184.             if (this.moveThreads <= 1)
  185.             {
  186.                 synchronized (super.semaphore)
  187.                 {
  188.                     // carry out all events scheduled on this simulation time, as long as we are still running.
  189.                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
  190.                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
  191.                     {
  192.                         event = this.eventList.removeFirst();
  193.                         try
  194.                         {
  195.                             event.execute();
  196.                         }
  197.                         catch (Exception exception)
  198.                         {
  199.                             getLogger().always().error(exception);
  200.                             if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
  201.                             {
  202.                                 try
  203.                                 {
  204.                                     this.stop();
  205.                                 }
  206.                                 catch (SimRuntimeException exception1)
  207.                                 {
  208.                                     getLogger().always().error(exception1);
  209.                                 }
  210.                             }
  211.                         }
  212.                         if (!this.eventList.isEmpty())
  213.                         {
  214.                             // peek at next event for while loop.
  215.                             event = this.eventList.first();
  216.                         }
  217.                     }
  218.                 }
  219.             }

  220.             else

  221.             {
  222.                 // parallel execution of the move method
  223.                 // first carry out all the non-move events and make a list of move events to be carried out in parallel
  224.                 List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
  225.                 synchronized (super.semaphore)
  226.                 {
  227.                     while (this.isStartingOrRunning() && !this.eventList.isEmpty()
  228.                             && event.getAbsoluteExecutionTime().eq(this.simulatorTime))
  229.                     {
  230.                         event = this.eventList.removeFirst();
  231.                         SimEvent<Duration> se = (SimEvent<Duration>) event;
  232.                         if (se.getTarget() instanceof Gtu && se.getMethod().equals("move"))
  233.                         {
  234.                             moveEvents.add(event);
  235.                         }
  236.                         else
  237.                         {
  238.                             try
  239.                             {
  240.                                 event.execute();
  241.                             }
  242.                             catch (Exception exception)
  243.                             {
  244.                                 getLogger().always().error(exception);
  245.                                 if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
  246.                                 {
  247.                                     try
  248.                                     {
  249.                                         this.stop();
  250.                                     }
  251.                                     catch (SimRuntimeException exception1)
  252.                                     {
  253.                                         getLogger().always().error(exception1);
  254.                                     }
  255.                                 }
  256.                             }
  257.                         }
  258.                         if (!this.eventList.isEmpty())
  259.                         {
  260.                             // peek at next event for while loop.
  261.                             event = this.eventList.first();
  262.                         }
  263.                     }
  264.                 }

  265.                 // then carry out the move events, based on a constant state at that time.
  266.                 // first make sure that new events will be stored in a temporary event list...
  267.                 this.executor = Executors.newFixedThreadPool(1);
  268.                 for (int i = 0; i < moveEvents.size(); i++)
  269.                 {
  270.                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
  271.                     final SimEventInterface<Duration> moveEvent =
  272.                             new SimEvent<>(this.simulatorTime, se.getTarget(), "movePrep", se.getArgs());
  273.                     this.executor.execute(new Runnable()
  274.                     {
  275.                         @Override
  276.                         public void run()
  277.                         {
  278.                             try
  279.                             {
  280.                                 moveEvent.execute();
  281.                             }
  282.                             catch (Exception exception)
  283.                             {
  284.                                 getLogger().always().error(exception);
  285.                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
  286.                                 {
  287.                                     try
  288.                                     {
  289.                                         OtsDevsRealTimeParallelMove.this.stop();
  290.                                     }
  291.                                     catch (SimRuntimeException exception1)
  292.                                     {
  293.                                         getLogger().always().error(exception1);
  294.                                     }
  295.                                 }
  296.                             }
  297.                         }
  298.                     });
  299.                 }
  300.                 this.executor.shutdown();
  301.                 try
  302.                 {
  303.                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
  304.                 }
  305.                 catch (InterruptedException exception)
  306.                 {
  307.                     //
  308.                 }

  309.                 this.executor = Executors.newFixedThreadPool(1);
  310.                 for (int i = 0; i < moveEvents.size(); i++)
  311.                 {
  312.                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
  313.                     final SimEventInterface<Duration> moveEvent =
  314.                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveGenerate", se.getArgs());
  315.                     this.executor.execute(new Runnable()
  316.                     {
  317.                         @Override
  318.                         public void run()
  319.                         {
  320.                             try
  321.                             {
  322.                                 moveEvent.execute();
  323.                             }
  324.                             catch (Exception exception)
  325.                             {
  326.                                 getLogger().always().error(exception);
  327.                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
  328.                                 {
  329.                                     try
  330.                                     {
  331.                                         OtsDevsRealTimeParallelMove.this.stop();
  332.                                     }
  333.                                     catch (SimRuntimeException exception1)
  334.                                     {
  335.                                         getLogger().always().error(exception1);
  336.                                     }
  337.                                 }
  338.                             }
  339.                         }
  340.                     });
  341.                 }
  342.                 this.executor.shutdown();
  343.                 try
  344.                 {
  345.                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
  346.                 }
  347.                 catch (InterruptedException exception)
  348.                 {
  349.                     //
  350.                 }

  351.                 this.executor = Executors.newFixedThreadPool(1);
  352.                 for (int i = 0; i < moveEvents.size(); i++)
  353.                 {
  354.                     SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
  355.                     final SimEventInterface<Duration> moveEvent =
  356.                             new SimEvent<>(this.simulatorTime, se.getTarget(), "moveFinish", se.getArgs());
  357.                     this.executor.execute(new Runnable()
  358.                     {
  359.                         @Override
  360.                         public void run()
  361.                         {
  362.                             try
  363.                             {
  364.                                 moveEvent.execute();
  365.                             }
  366.                             catch (Exception exception)
  367.                             {
  368.                                 getLogger().always().error(exception);
  369.                                 if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
  370.                                 {
  371.                                     try
  372.                                     {
  373.                                         OtsDevsRealTimeParallelMove.this.stop();
  374.                                     }
  375.                                     catch (SimRuntimeException exception1)
  376.                                     {
  377.                                         getLogger().always().error(exception1);
  378.                                     }
  379.                                 }
  380.                             }
  381.                         }
  382.                     });
  383.                 }
  384.                 this.executor.shutdown();
  385.                 try
  386.                 {
  387.                     this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
  388.                 }
  389.                 catch (InterruptedException exception)
  390.                 {
  391.                     //
  392.                 }

  393.             }
  394.         }
  395.         this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);

  396.         updateAnimation();
  397.         animationThread.stopAnimation();
  398.     }

  399. }