EGTF.java
- package org.opentrafficsim.core.egtf;
- import java.util.LinkedHashMap;
- import java.util.LinkedHashSet;
- import java.util.Map;
- import java.util.NavigableMap;
- import java.util.Objects;
- import java.util.Set;
- import java.util.SortedMap;
- import java.util.TreeMap;
- import java.util.stream.IntStream;
- /**
- * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
- * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
- * implementation also supports multiple data sources.
- * <p>
- * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
- * <p>
- * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
- * to provide several data streams.
- * <p>
- * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
- * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
- * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
- * thus ultimately the weight of the data in the estimation of the quantity.
- * <p>
- * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
- * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
- * {@code DENSITY_SI}, all under {@code Quantity}.
- * <p>
- * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
- * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
- * <p>
- * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
- * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
- * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
- * reliability is undefined.
- * <p>
- * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
- * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
- * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
- * Gaussian kernel is also available, while any other shape could be also be implemented.
- * <p>
- * Parameters from the EGTF are found in the following places:
- * <ul>
- * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
- * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
- * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
- * time.</li>
- * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
- * data in free flow and congestion from a particular data stream.</li>
- * </ul>
- * References:
- * <ul>
- * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
- * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
- * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
- * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
- * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
- * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
- * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
- * </ul>
- * <p>
- * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
- * BSD-style license. See <a href="http://opentrafficsim.org/node/13">OpenTrafficSim License</a>.
- * <p>
- * @version $Revision$, $LastChangedDate$, by $Author$, initial version 3 okt. 2018 <br>
- * @author <a href="http://www.transport.citg.tudelft.nl">Wouter Schakel</a>
- */
- public class EGTF
- {
- /** Default sigma value. */
- private static final double DEFAULT_SIGMA = 300.0;
- /** Default tau value. */
- private static final double DEFAULT_TAU = 30.0;
- /** Filter kernel. */
- private Kernel kernel;
- /** Shock wave speed in congestion. */
- private final double cCong;
- /** Shock wave speed in free flow. */
- private final double cFree;
- /** Speed range between congestion and free flow. */
- private final double deltaV;
- /** Flip-over speed below which we have congestion. */
- private final double vc;
- /** Data sources by label so we can return the same instances upon repeated request. */
- private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
- /** Default data source for cases where a single data source is used. */
- private DataSource defaultDataSource = null;
- /** Default data streams for cases where a single data source is used. */
- private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;
- /** True if data is currently being added using a quantity, in which case a check should not occur. */
- private boolean addingByQuantity;
- /** All point data sorted by space and time, and per data stream. */
- private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();
- /** Whether the calculation was interrupted. */
- private boolean interrupted = false;
- /** Listeners. */
- private Set<EgtfListener> listeners = new LinkedHashSet<>();
- /**
- * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
- */
- public EGTF()
- {
- this(-18.0, 80.0, 10.0, 80.0);
- }
- /**
- * Constructor defining global settings. A default kernel is set.
- * @param cCong double; shock wave speed in congestion [km/h]
- * @param cFree double; shock wave speed in free flow [km/h]
- * @param deltaV double; speed range between congestion and free flow [km/h]
- * @param vc double; flip-over speed below which we have congestion [km/h]
- */
- public EGTF(final double cCong, final double cFree, final double deltaV, final double vc)
- {
- this.cCong = cCong / 3.6;
- this.cFree = cFree / 3.6;
- this.deltaV = deltaV / 3.6;
- this.vc = vc / 3.6;
- setKernel();
- }
- /**
- * Convenience constructor that also sets a specified kernel.
- * @param cCong double; shock wave speed in congestion [km/h]
- * @param cFree double; shock wave speed in free flow [km/h]
- * @param deltaV double; speed range between congestion and free flow [km/h]
- * @param vc double; flip-over speed below which we have congestion [km/h]
- * @param sigma double; spatial kernel size in [m]
- * @param tau double; temporal kernel size in [s]
- * @param xMax double; maximum spatial range in [m]
- * @param tMax double; maximum temporal range in [s]
- */
- @SuppressWarnings("parameternumber")
- public EGTF(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
- final double tau, final double xMax, final double tMax)
- {
- this(cCong, cFree, deltaV, vc);
- setKernelSI(sigma, tau, xMax, tMax);
- }
- // ********************
- // *** DATA METHODS ***
- // ********************
- /**
- * Return a data source, which is created if necessary.
- * @param name String; unique name for the data source
- * @return DataSource; data source
- * @throws IllegalStateException when data has been added without a data source
- */
- public DataSource getDataSource(final String name)
- {
- if (this.defaultDataSource != null)
- {
- throw new IllegalStateException(
- "Obtaining a (new) data source after data has been added without a data source is not allowed.");
- }
- return this.dataSources.computeIfAbsent(name, (key) -> new DataSource(key));
- }
- /**
- * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
- * required.
- * @param time double; time before which all data can be removed
- */
- public synchronized void clearDataBefore(final double time)
- {
- for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
- {
- map.subMap(Double.NEGATIVE_INFINITY, time).clear();
- }
- }
- /**
- * Adds point data.
- * @param quantity Quantity<?, ?>; quantity of the data
- * @param location double; location in [m]
- * @param time double; time in [s]
- * @param value double; data value
- * @throws IllegalStateException if data was added with a data stream previously
- */
- public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
- final double value)
- {
- this.addingByQuantity = true;
- addPointDataSI(getDefaultDataStream(quantity), location, time, value);
- this.addingByQuantity = false;
- }
- /**
- * Adds point data.
- * @param dataStream DataStream<?>; data stream of the data
- * @param location double; location in [m]
- * @param time double; time in [s]
- * @param value double; data value
- * @throws IllegalStateException if data was added with a quantity previously
- */
- public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
- final double value)
- {
- checkNoQuantityData();
- Objects.requireNonNull(dataStream, "Datastream may not be null.");
- if (!Double.isNaN(value))
- {
- getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
- }
- }
- /**
- * Adds vector data.
- * @param quantity Quantity<?, ?>; quantity of the data
- * @param location double[]; locations in [m]
- * @param time double[]; times in [s]
- * @param values double[]; data values in SI unit
- * @throws IllegalStateException if data was added with a data stream previously
- */
- public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
- final double[] values)
- {
- this.addingByQuantity = true;
- addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
- this.addingByQuantity = false;
- }
- /**
- * Adds vector data.
- * @param dataStream DataStream<?>; data stream of the data
- * @param location double[]; locations in [m]
- * @param time double[]; times in [s]
- * @param values double[]; data values in SI unit
- * @throws IllegalStateException if data was added with a quantity previously
- */
- public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
- final double[] values)
- {
- checkNoQuantityData();
- Objects.requireNonNull(dataStream, "Datastream may not be null.");
- Objects.requireNonNull(location, "Location may not be null.");
- Objects.requireNonNull(time, "Time may not be null.");
- Objects.requireNonNull(values, "Values may not be null.");
- if (location.length != time.length || time.length != values.length)
- {
- throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
- time.length, values.length));
- }
- for (int i = 0; i < values.length; i++)
- {
- if (!Double.isNaN(values[i]))
- {
- getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
- }
- }
- }
- /**
- * Adds grid data.
- * @param quantity Quantity<?, ?>; quantity of the data
- * @param location double[]; locations in [m]
- * @param time double[]; times in [s]
- * @param values double[][]; data values in SI unit
- * @throws IllegalStateException if data was added with a data stream previously
- */
- public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
- final double[][] values)
- {
- this.addingByQuantity = true;
- addGridDataSI(getDefaultDataStream(quantity), location, time, values);
- this.addingByQuantity = false;
- }
- /**
- * Adds grid data.
- * @param dataStream DataStream<?>; data stream of the data
- * @param location double[]; locations in [m]
- * @param time double[]; times in [s]
- * @param values double[][]; data values in SI unit
- * @throws IllegalStateException if data was added with a quantity previously
- */
- public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
- final double[][] values)
- {
- checkNoQuantityData();
- Objects.requireNonNull(dataStream, "Datastream may not be null.");
- Objects.requireNonNull(location, "Location may not be null.");
- Objects.requireNonNull(time, "Time may not be null.");
- Objects.requireNonNull(values, "Values may not be null.");
- if (values.length != location.length)
- {
- throw new IllegalArgumentException(
- String.format("%d locations while length of data is %d", location.length, values.length));
- }
- for (int i = 0; i < location.length; i++)
- {
- if (values[i].length != time.length)
- {
- throw new IllegalArgumentException(
- String.format("%d times while length of data is %d", time.length, values[i].length));
- }
- Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
- for (int j = 0; j < time.length; j++)
- {
- if (!Double.isNaN(values[i][j]))
- {
- getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
- }
- }
- }
- }
- /**
- * Check that no data was added using a quantity.
- * @throws IllegalStateException if data was added with a quantity previously
- */
- private void checkNoQuantityData()
- {
- if (!this.addingByQuantity && this.defaultDataSource != null)
- {
- throw new IllegalStateException(
- "Adding data with a data stream is not allowed after data has been added with a quantity.");
- }
- }
- /**
- * Returns a default data stream and checks that no data with a data stream was added.
- * @param quantity Quantity<?, ?>; quantity
- * @return DataStream<?>; default data stream
- * @throws IllegalStateException if data was added with a data stream previously
- */
- private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
- {
- Objects.requireNonNull(quantity, "Quantity may not be null.");
- if (!this.dataSources.isEmpty())
- {
- throw new IllegalStateException(
- "Adding data with a quantity is not allowed after data has been added with a data stream.");
- }
- if (this.defaultDataSource == null)
- {
- this.defaultDataSource = new DataSource("default");
- this.defaultDataStreams = new LinkedHashMap<>();
- }
- return this.defaultDataStreams.computeIfAbsent(quantity,
- (key) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
- }
- /**
- * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
- * @param location double; location in [m]
- * @return data from a specific location
- */
- private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
- {
- return this.data.computeIfAbsent(location, (key) -> new TreeMap<>());
- }
- /**
- * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
- * exists.
- * @param spatialData Map<Double, Map<DataStream<?>, Double>>; spatially selected data
- * @param time double; time in [s]
- * @return data from a specific time, from data from a specific location
- */
- private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
- final double time)
- {
- return spatialData.computeIfAbsent(time, (key) -> new LinkedHashMap<>());
- }
- // **********************
- // *** KERNEL METHODS ***
- // **********************
- /**
- * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s.
- */
- public void setKernel()
- {
- setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
- }
- /**
- * Sets an exponential kernel with infinite range.
- * @param sigma double; spatial kernel size
- * @param tau double; temporal kernel size
- */
- public void setKernelSI(final double sigma, final double tau)
- {
- setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
- }
- /**
- * Sets an exponential kernel with limited range.
- * @param sigma double; spatial kernel size in [m]
- * @param tau double; temporal kernel size in [s]
- * @param xMax double; maximum spatial range in [m]
- * @param tMax double; maximum temporal range in [s]
- */
- public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
- {
- setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
- }
- /**
- * Sets a Gaussian kernel with infinite range.
- * @param sigma double; spatial kernel size
- * @param tau double; temporal kernel size
- */
- public void setGaussKernelSI(final double sigma, final double tau)
- {
- setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
- }
- /**
- * Sets a Gaussian kernel with limited range.
- * @param sigma double; spatial kernel size in [m]
- * @param tau double; temporal kernel size in [s]
- * @param xMax double; maximum spatial range in [m]
- * @param tMax double; maximum temporal range in [s]
- */
- public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
- {
- setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
- }
- /**
- * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels.
- * @param xMax double; maximum spatial range
- * @param tMax double; maximum temporal range
- * @param shape KernelShape; shape of the kernel
- */
- public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
- {
- this.kernel = new Kernel(xMax, tMax, shape);
- }
- /**
- * Returns the wave speed in congestion.
- * @return double; wave speed in congestion
- */
- final double getWaveSpeedCongestion()
- {
- return this.cCong;
- }
- /**
- * Returns the wave speed in free flow.
- * @return double; wave speed in free flow
- */
- final double getWaveSpeedFreeFlow()
- {
- return this.cFree;
- }
- // **********************
- // *** FILTER METHODS ***
- // **********************
- /**
- * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
- * done. Finally, the filtering results can then be obtained from the listener.
- * @param location double[]; location of output grid in [m]
- * @param time double[]; time of output grid in [s]
- * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
- * @return EgtfParallelListener; listener to notify keep track of the progress
- */
- public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
- final Quantity<?, ?>... quantities)
- {
- Objects.requireNonNull(location, "Location may not be null.");
- Objects.requireNonNull(time, "Time may not be null.");
- EgtfParallelListener listener = new EgtfParallelListener();
- addListener(listener);
- new Thread(new Runnable()
- {
- /** {@inheritDoc} */
- @Override
- public void run()
- {
- listener.setFilter(filterSI(location, time, quantities));
- removeListener(listener);
- }
- }, "Egtf calculation thread").start();
- return listener;
- }
- /**
- * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
- * done. Finally, the filtering results can then be obtained from the listener.
- * @param xMin double; minimum location value of output grid [m]
- * @param xStep double; location step of output grid [m]
- * @param xMax double; maximum location value of output grid [m]
- * @param tMin double; minimum time value of output grid [s]
- * @param tStep double; time step of output grid [s]
- * @param tMax double; maximum time value of output grid [s]
- * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
- * @return EgtfParallelListener; listener to notify keep track of the progress
- */
- public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
- final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
- {
- EgtfParallelListener listener = new EgtfParallelListener();
- addListener(listener);
- new Thread(new Runnable()
- {
- /** {@inheritDoc} */
- @Override
- public void run()
- {
- listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
- removeListener(listener);
- }
- }, "Egtf calculation thread").start();
- return listener;
- }
- /**
- * Returns filtered data. This is the standard EGTF implementation.
- * @param location double[]; location of output grid in [m]
- * @param time double[]; time of output grid in [s]
- * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
- * @return Filter; filtered data, {@code null} when interrupted
- */
- @SuppressWarnings({"synthetic-access", "methodlength"})
- public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
- {
- Objects.requireNonNull(location, "Location may not be null.");
- Objects.requireNonNull(time, "Time may not be null.");
- // initialize data
- Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
- for (Quantity<?, ?> quantity : quantities)
- {
- map.put(quantity, new double[location.length][time.length]);
- }
- // loop grid locations
- for (int i = 0; i < location.length; i++)
- {
- double xGrid = location[i];
- // filter applicable data for location
- Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
- this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);
- // loop grid times
- for (int j = 0; j < time.length; j++)
- {
- double tGrid = time[j];
- // notify
- if (notifyListeners((i + (double) j / time.length) / location.length))
- {
- return null;
- }
- // initialize data per stream
- // quantity z assuming congestion and free flow
- Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();
- // filter and loop applicable data for time
- for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
- {
- double dx = xEntry.getKey() - xGrid;
- Map<Double, Map<DataStream<?>, Double>> temporalData =
- xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);
- for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
- {
- double dt = tEntry.getKey() - tGrid;
- Map<DataStream<?>, Double> pData = tEntry.getValue();
- double phiCong = this.kernel.weight(this.cCong, dx, dt);
- double phiFree = this.kernel.weight(this.cFree, dx, dt);
- // loop streams data at point
- for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
- {
- DataStream<?> stream = vEntry.getKey();
- if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
- {
- double v = vEntry.getValue();
- DualWeightedMean zCongFreeOfStream =
- zCongFree.computeIfAbsent(stream, (key) -> new DualWeightedMean());
- zCongFreeOfStream.addCong(v, phiCong);
- zCongFreeOfStream.addFree(v, phiFree);
- }
- }
- }
- }
- // figure out the congestion level estimated for each data source
- Map<DataSource, Double> w = new LinkedHashMap<>();
- for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
- {
- DataStream<?> dataStream = streamEntry.getKey();
- if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
- {
- DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
- double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
- w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
- .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV)));
- continue;
- }
- }
- // sum available data sources per quantity
- Double wMean = null;
- for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
- {
- Quantity<?, ?> quantity = qEntry.getKey();
- WeightedMean z = new WeightedMean();
- for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
- {
- DataStream<?> dataStream = zEntry.getKey();
- if (dataStream.getQuantity().equals(quantity))
- {
- // obtain congestion level
- double wCong;
- if (!w.containsKey(dataStream.getDataSource()))
- {
- // this data source has no speed data, but congestion level can be estimated from other sources
- if (wMean == null)
- {
- // let's see if speed was estimated already
- for (Quantity<?, ?> prevQuant : quantities)
- {
- if (prevQuant.equals(quantity))
- {
- // it was not, get mean of other data source
- wMean = 0.0;
- for (double ww : w.values())
- {
- wMean += ww / w.size();
- }
- break;
- }
- else if (prevQuant.isSpeed())
- {
- wMean = .5 * (1.0
- + Math.tanh((EGTF.this.vc - map.get(prevQuant)[i][j]) / EGTF.this.deltaV));
- break;
- }
- }
- }
- wCong = wMean;
- }
- else
- {
- wCong = w.get(dataStream.getDataSource());
- }
- // calculate estimated value z of this data source (no duplicate quantities per source allowed)
- double wfree = 1.0 - wCong;
- DualWeightedMean zCongFreej = zEntry.getValue();
- double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
- double weight;
- if (w.size() > 1)
- {
- // data source more important if more and nearer measurements
- double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
- // more important if more reliable (smaller standard deviation) at congestion level
- double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
- weight = alpha * beta;
- }
- else
- {
- weight = 1.0;
- }
- z.add(zStream, weight);
- }
- }
- qEntry.getValue()[i][j] = z.get();
- }
- }
- }
- notifyListeners(1.0);
- return new FilterDouble(location, time, map);
- }
- /**
- * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
- * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
- * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
- * recommended to set a Kernel with maximum bounds before using this method.
- * <p>
- * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
- * the Extended Generalized Treiber-Helbing Filter.
- * @param xMin double; minimum location value of output grid [m]
- * @param xStep double; location step of output grid [m]
- * @param xMax double; maximum location value of output grid [m]
- * @param tMin double; minimum time value of output grid [s]
- * @param tStep double; time step of output grid [s]
- * @param tMax double; maximum time value of output grid [s]
- * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
- * @return Filter; filtered data, {@code null} when interrupted
- */
- @SuppressWarnings("methodlength")
- public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
- final double tMax, final Quantity<?, ?>... quantities)
- {
- if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
- {
- throw new IllegalArgumentException(
- "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
- }
- if (notifyListeners(0.0))
- {
- return null;
- }
- // initialize data
- int n = 1 + (int) ((xMax - xMin) / xStep);
- double[] location = new double[n];
- IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
- n = 1 + (int) ((tMax - tMin) / tStep);
- double[] time = new double[n];
- IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
- Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
- Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
- for (Quantity<?, ?> quantity : quantities)
- {
- map.put(quantity, new double[location.length][time.length]);
- weights.put(quantity, new double[location.length][time.length]);
- }
- // discretize Kernel
- double xFrom = this.kernel.fromLocation(0.0);
- xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
- double xTo = this.kernel.toLocation(0.0);
- xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
- double[] dx = equidistant(xFrom, xStep, xTo);
- double tFrom = this.kernel.fromTime(0.0);
- tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
- double tTo = this.kernel.toTime(0.0);
- tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
- double[] dt = equidistant(tFrom, tStep, tTo);
- double[][] phiCong = new double[dx.length][dt.length];
- double[][] phiFree = new double[dx.length][dt.length];
- for (int i = 0; i < dx.length; i++)
- {
- for (int j = 0; j < dt.length; j++)
- {
- phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
- phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
- }
- }
- // discretize data
- Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
- Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
- // loop grid locations
- for (int i = 0; i < location.length; i++)
- {
- // filter applicable data for location
- Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
- this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
- // loop grid times
- for (int j = 0; j < time.length; j++)
- {
- // filter and loop applicable data for time
- for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
- {
- NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
- locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
- for (Map<DataStream<?>, Double> timeData : temporalData.values())
- {
- for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
- {
- if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
- {
- dataSum.computeIfAbsent(timeEntry.getKey(),
- (key) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
- dataCount.computeIfAbsent(timeEntry.getKey(),
- (key) -> new double[location.length][time.length])[i][j]++;
- }
- }
- }
- }
- }
- }
- // figure out the congestion level estimated for each data source
- double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
- double step = 0;
- // store maps to prevent us from calculating the convolution for speed again later
- Map<DataSource, double[][]> w = new LinkedHashMap<>();
- Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
- Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
- Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
- Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
- for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
- {
- DataStream<?> dataStream = zEntry.getKey();
- if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
- {
- // notify
- double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
- if (notifyListeners((step + 0.25) / steps))
- {
- return null;
- }
- double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
- if (notifyListeners((step + 0.5) / steps))
- {
- return null;
- }
- double[][] count = dataCount.get(dataStream);
- double[][] nCong = Convolution.convolution(phiCong, count);
- if (notifyListeners((step + 0.75) / steps))
- {
- return null;
- }
- double[][] nFree = Convolution.convolution(phiFree, count);
- double[][] wSource = new double[vCong.length][vCong[0].length];
- for (int i = 0; i < vCong.length; i++)
- {
- for (int j = 0; j < vCong[0].length; j++)
- {
- double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
- wSource[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV));
- }
- }
- w.put(dataStream.getDataSource(), wSource);
- zCongSpeed.put(dataStream.getDataSource(), vCong);
- zFreeSpeed.put(dataStream.getDataSource(), vFree);
- nCongSpeed.put(dataStream.getDataSource(), nCong);
- nFreeSpeed.put(dataStream.getDataSource(), nFree);
- }
- }
- step++;
- if (notifyListeners(step / steps))
- {
- return null;
- }
- // sum available data sources per quantity
- double[][] wMean = null;
- for (Quantity<?, ?> quantity : quantities)
- {
- // gather place for this quantity
- double[][] qData = map.get(quantity);
- double[][] qWeights = weights.get(quantity);
- // loop streams that provide this quantity
- Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
- for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
- {
- if (zEntry.getKey().getQuantity().equals(quantity))
- {
- zEntries.add(zEntry);
- }
- }
- double streamCounter = 0;
- for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
- {
- DataStream<?> dataStream = zEntry.getKey();
- // obtain congestion level
- double[][] wj;
- if (!w.containsKey(dataStream.getDataSource()))
- {
- // this data source has no speed data, but congestion level can be estimated from other sources
- if (wMean == null)
- {
- // let's see if speed was estimated already
- for (Quantity<?, ?> prevQuant : quantities)
- {
- if (prevQuant.equals(quantity))
- {
- // it was not, get mean of other data source
- wMean = new double[location.length][time.length];
- for (double[][] ww : w.values())
- {
- for (int i = 0; i < location.length; i++)
- {
- for (int j = 0; j < time.length; j++)
- {
- wMean[i][j] += ww[i][j] / w.size();
- }
- }
- }
- break;
- }
- else if (prevQuant.isSpeed())
- {
- wMean = new double[location.length][time.length];
- double[][] v = map.get(prevQuant);
- for (int i = 0; i < location.length; i++)
- {
- for (int j = 0; j < time.length; j++)
- {
- wMean[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - v[i][j]) / EGTF.this.deltaV));
- }
- }
- break;
- }
- }
- }
- wj = wMean;
- }
- else
- {
- wj = w.get(dataStream.getDataSource());
- }
- // convolutions of filters with discretized data and data counts
- double[][] zCong;
- double[][] zFree;
- double[][] nCong;
- double[][] nFree;
- if (dataStream.getQuantity().isSpeed())
- {
- zCong = zCongSpeed.get(dataStream.getDataSource());
- zFree = zFreeSpeed.get(dataStream.getDataSource());
- nCong = nCongSpeed.get(dataStream.getDataSource());
- nFree = nFreeSpeed.get(dataStream.getDataSource());
- }
- else
- {
- zCong = Convolution.convolution(phiCong, zEntry.getValue());
- if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
- {
- return null;
- }
- zFree = Convolution.convolution(phiFree, zEntry.getValue());
- if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
- {
- return null;
- }
- double[][] count = dataCount.get(dataStream);
- nCong = Convolution.convolution(phiCong, count);
- if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
- {
- return null;
- }
- nFree = Convolution.convolution(phiFree, count);
- }
- // loop grid to add to each weighted sum (weighted per data source)
- for (int i = 0; i < location.length; i++)
- {
- for (int j = 0; j < time.length; j++)
- {
- double wCong = wj[i][j];
- double wFree = 1.0 - wCong;
- double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
- // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
- // is exactly what the EGTF method needs to weigh data sources
- double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
- double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
- double weight = beta * alpha;
- qData[i][j] += (value * weight);
- qWeights[i][j] += weight;
- }
- }
- streamCounter++;
- if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
- {
- return null;
- }
- }
- for (int i = 0; i < location.length; i++)
- {
- for (int j = 0; j < time.length; j++)
- {
- qData[i][j] /= qWeights[i][j];
- }
- }
- step++;
- }
- return new FilterDouble(location, time, map);
- }
- /**
- * Returns an equidistant vector that includes 0.
- * @param from double; lowest value to include
- * @param step double; step
- * @param to double; highest value to include
- * @return double[]; equidistant vector that includes 0
- */
- private double[] equidistant(final double from, final double step, final double to)
- {
- int n1 = (int) (-from / step);
- int n2 = (int) (to / step);
- int n = n1 + n2 + 1;
- double[] array = new double[n];
- for (int i = 0; i < n; i++)
- {
- array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
- }
- return array;
- }
- // *********************
- // *** EVENT METHODS ***
- // *********************
- /**
- * Interrupt the calculation.
- */
- public final void interrupt()
- {
- this.interrupted = true;
- }
- /**
- * Add listener.
- * @param listener EgtfListener; listener
- */
- public final void addListener(final EgtfListener listener)
- {
- this.listeners.add(listener);
- }
- /**
- * Remove listener.
- * @param listener EgtfListener; listener
- */
- public final void removeListener(final EgtfListener listener)
- {
- this.listeners.remove(listener);
- }
- /**
- * Notify all listeners.
- * @param progress double; progress, a value in the range [0 ... 1]
- * @return boolean; whether the filter is interrupted
- */
- private boolean notifyListeners(final double progress)
- {
- if (!this.listeners.isEmpty())
- {
- EgtfEvent event = new EgtfEvent(this, progress);
- for (EgtfListener listener : this.listeners)
- {
- listener.notifyProgress(event);
- }
- }
- return this.interrupted;
- }
- // **********************
- // *** HELPER CLASSES ***
- // **********************
- /**
- * Small class to build up a weighted mean under the congestion and free flow assumption.
- */
- private class DualWeightedMean
- {
- /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
- private double numeratorCong;
- /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
- private double numeratorFree;
- /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
- private double denominatorCong;
- /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
- private double denominatorFree;
- /**
- * Adds a congestion value with weight.
- * @param value double; value
- * @param weight double; weight
- */
- public void addCong(final double value, final double weight)
- {
- this.numeratorCong += value * weight;
- this.denominatorCong += weight;
- }
- /**
- * Adds a free flow value with weight.
- * @param value double; value
- * @param weight double; weight
- */
- public void addFree(final double value, final double weight)
- {
- this.numeratorFree += value * weight;
- this.denominatorFree += weight;
- }
- /**
- * Returns the weighted congestion mean of available data.
- * @return double; weighted mean of available data
- */
- public double getCong()
- {
- return this.numeratorCong / this.denominatorCong;
- }
- /**
- * Returns the weighted free flow mean of available data.
- * @return double; weighted free flow mean of available data
- */
- public double getFree()
- {
- return this.numeratorFree / this.denominatorFree;
- }
- /**
- * Returns the sum of congestion weights.
- * @return double; the sum of congestion weights
- */
- public double getDenominatorCong()
- {
- return this.denominatorCong;
- }
- /**
- * Returns the sum of free flow weights.
- * @return double; the sum of free flow weights
- */
- public double getDenominatorFree()
- {
- return this.denominatorFree;
- }
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
- + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
- }
- }
- /**
- * Small class to build up a weighted mean.
- */
- private class WeightedMean
- {
- /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
- private double numerator;
- /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
- private double denominator;
- /**
- * Adds a value with weight.
- * @param value double; value
- * @param weight double; weight
- */
- public void add(final double value, final double weight)
- {
- this.numerator += value * weight;
- this.denominator += weight;
- }
- /**
- * Returns the weighted mean of available data.
- * @return double; weighted mean of available data
- */
- public double get()
- {
- return this.numerator / this.denominator;
- }
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
- }
- }
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
- + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
- + this.interrupted + ", listeners=" + this.listeners + "]";
- }
- }