OtsDevsRealTimeParallelMove.java
package org.opentrafficsim.core.dsol;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.djunits.unit.DurationUnit;
import org.djunits.value.vdouble.scalar.Duration;
import org.opentrafficsim.core.gtu.Gtu;
import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEvent;
import nl.tudelft.simulation.dsol.formalisms.eventscheduling.SimEventInterface;
import nl.tudelft.simulation.dsol.simulators.DevsRealTimeAnimator;
import nl.tudelft.simulation.dsol.simulators.ErrorStrategy;
import nl.tudelft.simulation.dsol.simulators.SimulatorInterface;
/**
* <p>
* Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
* </p>
* @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
*/
public class OtsDevsRealTimeParallelMove extends DevsRealTimeAnimator<Duration>
{
/** */
private static final long serialVersionUID = 20140909L;
/** number of threads to use for move(). */
private int moveThreads = 1;
/** the thread pool for parallel execution. */
private ExecutorService executor = null;
/**
* Create a new OTSRealTimeClock.
* @param moveThreads int; The number of move threads to use
* @param simulatorId the id of the simulator to use in remote communication
*/
public OtsDevsRealTimeParallelMove(final int moveThreads, final Serializable simulatorId)
{
super(simulatorId);
setMoveThreads(moveThreads);
setEventList(new SynchronizedRedBlackTree<>());
}
/**
* Create a new OTSRealTimeClock.
* @param simulatorId the id of the simulator to use in remote communication
*/
public OtsDevsRealTimeParallelMove(final Serializable simulatorId)
{
this(1, simulatorId);
}
/**
* @param moveThreads int; set moveThreads
*/
public final void setMoveThreads(final int moveThreads)
{
this.moveThreads = moveThreads;
}
/**
* @return moveThreads
*/
public final int getMoveThreads()
{
return this.moveThreads;
}
/** {@inheritDoc} */
@Override
protected final Duration simulatorTimeForWallClockMillis(final double factor)
{
return new Duration(factor, DurationUnit.MILLISECOND);
}
/** {@inheritDoc} */
@Override
public final String toString()
{
return "DevsRealTimeAnimator.TimeDoubleUnit [time=" + getSimulatorTime() + "]";
}
// TODO: update the run() method of OTSDEVSRTParallelMove and adapt to the latest parent class version in DSOL 3.03.07
/** {@inheritDoc} */
@Override
@SuppressWarnings("checkstyle:designforextension")
public void run()
{
AnimationThread animationThread = new AnimationThread(this);
animationThread.start();
long clockTime0 = System.currentTimeMillis(); // _________ current zero for the wall clock
Duration simTime0 = this.simulatorTime; // _______ current zero for the sim clock
double factor = getSpeedFactor(); // _____________________ local copy of speed factor to detect change
double msec1 = simulatorTimeForWallClockMillis(1.0).doubleValue(); // _____ translation factor for 1 msec for sim clock
Duration rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor); // sim clock change for 'updateMsec'
// wall clock
while (this.isStartingOrRunning() && !this.eventList.isEmpty()
&& this.getSimulatorTime().le(this.replication.getEndTime()))
{
// check if speedFactor has changed. If yes: re-baseline.
if (factor != getSpeedFactor())
{
clockTime0 = System.currentTimeMillis();
simTime0 = this.simulatorTime;
factor = getSpeedFactor();
rSim = this.simulatorTimeForWallClockMillis(getUpdateMsec() * factor);
}
// check if we are behind; syncTime is the needed current time on the wall-clock
double syncTime = (System.currentTimeMillis() - clockTime0) * msec1 * factor;
// delta is the time we might be behind
double simTime = this.simulatorTime.minus(simTime0).doubleValue();
if (syncTime > simTime)
{
// we are behind
if (!isCatchup())
{
// if no catch-up: re-baseline.
clockTime0 = System.currentTimeMillis();
simTime0 = this.simulatorTime;
}
else
{
// jump to the required wall-clock related time or to the time of the next event, whichever comes
// first
synchronized (super.semaphore)
{
Duration delta = simulatorTimeForWallClockMillis((syncTime - simTime) / msec1);
Duration absSyncTime = this.simulatorTime.plus(delta);
Duration eventTime = this.eventList.first().getAbsoluteExecutionTime();
if (absSyncTime.lt(eventTime))
{
this.simulatorTime = absSyncTime;
}
else
{
this.simulatorTime = eventTime;
}
}
}
}
// peek at the first event and determine the time difference relative to RT speed; that determines
// how long we have to wait.
SimEventInterface<Duration> event = this.eventList.first();
double simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
/*
* simTimeDiff gives the number of milliseconds between the last event and this event. if speed == 1, this is the
* number of milliseconds we have to wait. if speed == 10, we have to wait 1/10 of that. If the speed == 0.1, we
* have to wait 10 times that amount. We might also be behind.
*/
if (simTimeDiffMillis >= (System.currentTimeMillis() - clockTime0))
{
while (simTimeDiffMillis > System.currentTimeMillis() - clockTime0)
{
try
{
Thread.sleep(getUpdateMsec());
// check if speedFactor has changed. If yes: break out of this loop and execute event.
// this could cause a jump.
if (factor != getSpeedFactor())
{
simTimeDiffMillis = 0.0;
}
}
catch (InterruptedException ie)
{
// do nothing
ie = null;
}
// check if an event has been inserted. In a real-time situation this can be dome by other threads
if (!event.equals(this.eventList.first())) // event inserted by a thread...
{
event = this.eventList.first();
simTimeDiffMillis = (event.getAbsoluteExecutionTime().minus(simTime0)).doubleValue() / (msec1 * factor);
}
else
{
// make a small time step for the animation during wallclock waiting.
// but never beyond the next event time.
if (this.simulatorTime.plus(rSim).lt(event.getAbsoluteExecutionTime()))
{
synchronized (super.semaphore)
{
this.simulatorTime = this.simulatorTime.plus(rSim);
}
}
}
}
}
this.simulatorTime = event.getAbsoluteExecutionTime();
this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, this.simulatorTime);
if (this.moveThreads <= 1)
{
synchronized (super.semaphore)
{
// carry out all events scheduled on this simulation time, as long as we are still running.
while (this.isStartingOrRunning() && !this.eventList.isEmpty()
&& event.getAbsoluteExecutionTime().eq(this.simulatorTime))
{
event = this.eventList.removeFirst();
try
{
event.execute();
}
catch (Exception exception)
{
getLogger().always().error(exception);
if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
{
try
{
this.stop();
}
catch (SimRuntimeException exception1)
{
getLogger().always().error(exception1);
}
}
}
if (!this.eventList.isEmpty())
{
// peek at next event for while loop.
event = this.eventList.first();
}
}
}
}
else
{
// parallel execution of the move method
// first carry out all the non-move events and make a list of move events to be carried out in parallel
List<SimEventInterface<Duration>> moveEvents = new ArrayList<>();
synchronized (super.semaphore)
{
while (this.isStartingOrRunning() && !this.eventList.isEmpty()
&& event.getAbsoluteExecutionTime().eq(this.simulatorTime))
{
event = this.eventList.removeFirst();
SimEvent<Duration> se = (SimEvent<Duration>) event;
if (se.getTarget() instanceof Gtu && se.getMethod().equals("move"))
{
moveEvents.add(event);
}
else
{
try
{
event.execute();
}
catch (Exception exception)
{
getLogger().always().error(exception);
if (this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
{
try
{
this.stop();
}
catch (SimRuntimeException exception1)
{
getLogger().always().error(exception1);
}
}
}
}
if (!this.eventList.isEmpty())
{
// peek at next event for while loop.
event = this.eventList.first();
}
}
}
// then carry out the move events, based on a constant state at that time.
// first make sure that new events will be stored in a temporary event list...
this.executor = Executors.newFixedThreadPool(1);
for (int i = 0; i < moveEvents.size(); i++)
{
SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
final SimEventInterface<Duration> moveEvent =
new SimEvent<>(this.simulatorTime, se.getTarget(), "movePrep", se.getArgs());
this.executor.execute(new Runnable()
{
@Override
public void run()
{
try
{
moveEvent.execute();
}
catch (Exception exception)
{
getLogger().always().error(exception);
if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
{
try
{
OtsDevsRealTimeParallelMove.this.stop();
}
catch (SimRuntimeException exception1)
{
getLogger().always().error(exception1);
}
}
}
}
});
}
this.executor.shutdown();
try
{
this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
}
catch (InterruptedException exception)
{
//
}
this.executor = Executors.newFixedThreadPool(1);
for (int i = 0; i < moveEvents.size(); i++)
{
SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
final SimEventInterface<Duration> moveEvent =
new SimEvent<>(this.simulatorTime, se.getTarget(), "moveGenerate", se.getArgs());
this.executor.execute(new Runnable()
{
@Override
public void run()
{
try
{
moveEvent.execute();
}
catch (Exception exception)
{
getLogger().always().error(exception);
if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
{
try
{
OtsDevsRealTimeParallelMove.this.stop();
}
catch (SimRuntimeException exception1)
{
getLogger().always().error(exception1);
}
}
}
}
});
}
this.executor.shutdown();
try
{
this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
}
catch (InterruptedException exception)
{
//
}
this.executor = Executors.newFixedThreadPool(1);
for (int i = 0; i < moveEvents.size(); i++)
{
SimEvent<Duration> se = (SimEvent<Duration>) moveEvents.get(i);
final SimEventInterface<Duration> moveEvent =
new SimEvent<>(this.simulatorTime, se.getTarget(), "moveFinish", se.getArgs());
this.executor.execute(new Runnable()
{
@Override
public void run()
{
try
{
moveEvent.execute();
}
catch (Exception exception)
{
getLogger().always().error(exception);
if (OtsDevsRealTimeParallelMove.this.getErrorStrategy().equals(ErrorStrategy.WARN_AND_PAUSE))
{
try
{
OtsDevsRealTimeParallelMove.this.stop();
}
catch (SimRuntimeException exception1)
{
getLogger().always().error(exception1);
}
}
}
}
});
}
this.executor.shutdown();
try
{
this.executor.awaitTermination(1L, java.util.concurrent.TimeUnit.HOURS);
}
catch (InterruptedException exception)
{
//
}
}
}
this.fireTimedEvent(SimulatorInterface.TIME_CHANGED_EVENT, null, /* this.simulatorTime, */ this.simulatorTime);
updateAnimation();
animationThread.stopAnimation();
}
}