View Javadoc
1   package org.opentrafficsim.core.egtf;
2   
3   import java.util.LinkedHashMap;
4   import java.util.LinkedHashSet;
5   import java.util.Map;
6   import java.util.NavigableMap;
7   import java.util.Objects;
8   import java.util.Set;
9   import java.util.SortedMap;
10  import java.util.TreeMap;
11  import java.util.stream.IntStream;
12  
13  /**
14   * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15   * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16   * implementation also supports multiple data sources.
17   * <p>
18   * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19   * <p>
20   * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21   * to provide several data streams.
22   * <p>
23   * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24   * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25   * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26   * thus ultimately the weight of the data in the estimation of the quantity.
27   * <p>
28   * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29   * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30   * {@code DENSITY_SI}, all under {@code Quantity}.
31   * <p>
32   * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33   * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34   * <p>
35   * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36   * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37   * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38   * reliability is undefined.
39   * <p>
40   * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41   * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42   * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
43   * Gaussian kernel is also available, while any other shape could be also be implemented.
44   * <p>
45   * Parameters from the EGTF are found in the following places:
46   * <ul>
47   * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48   * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49   * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50   * time.</li>
51   * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52   * data in free flow and congestion from a particular data stream.</li>
53   * </ul>
54   * References:
55   * <ul>
56   * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57   * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58   * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59   * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60   * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61   * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62   * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63   * </ul>
64   * <p>
65   * Copyright (c) 2013-2019 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66   * BSD-style license. See <a href="http://opentrafficsim.org/node/13">OpenTrafficSim License</a>.
67   * <p>
68   * @version $Revision$, $LastChangedDate$, by $Author$, initial version 3 okt. 2018 <br>
69   * @author <a href="http://www.transport.citg.tudelft.nl">Wouter Schakel</a>
70   */
71  public class EGTF
72  {
73  
74      /** Default sigma value. */
75      private static final double DEFAULT_SIGMA = 300.0;
76  
77      /** Default tau value. */
78      private static final double DEFAULT_TAU = 30.0;
79  
80      /** Filter kernel. */
81      private Kernel kernel;
82  
83      /** Shock wave speed in congestion. */
84      private final double cCong;
85  
86      /** Shock wave speed in free flow. */
87      private final double cFree;
88  
89      /** Speed range between congestion and free flow. */
90      private final double deltaV;
91  
92      /** Flip-over speed below which we have congestion. */
93      private final double vc;
94  
95      /** Data sources by label so we can return the same instances upon repeated request. */
96      private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
97  
98      /** Default data source for cases where a single data source is used. */
99      private DataSource defaultDataSource = null;
100 
101     /** Default data streams for cases where a single data source is used. */
102     private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;
103 
104     /** True if data is currently being added using a quantity, in which case a check should not occur. */
105     private boolean addingByQuantity;
106 
107     /** All point data sorted by space and time, and per data stream. */
108     private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();
109 
110     /** Whether the calculation was interrupted. */
111     private boolean interrupted = false;
112 
113     /** Listeners. */
114     private Set<EgtfListener> listeners = new LinkedHashSet<>();
115 
116     /**
117      * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
118      */
119     public EGTF()
120     {
121         this(-18.0, 80.0, 10.0, 80.0);
122     }
123 
124     /**
125      * Constructor defining global settings. A default kernel is set.
126      * @param cCong double; shock wave speed in congestion [km/h]
127      * @param cFree double; shock wave speed in free flow [km/h]
128      * @param deltaV double; speed range between congestion and free flow [km/h]
129      * @param vc double; flip-over speed below which we have congestion [km/h]
130      */
131     public EGTF(final double cCong, final double cFree, final double deltaV, final double vc)
132     {
133         this.cCong = cCong / 3.6;
134         this.cFree = cFree / 3.6;
135         this.deltaV = deltaV / 3.6;
136         this.vc = vc / 3.6;
137         setKernel();
138     }
139 
140     /**
141      * Convenience constructor that also sets a specified kernel.
142      * @param cCong double; shock wave speed in congestion [km/h]
143      * @param cFree double; shock wave speed in free flow [km/h]
144      * @param deltaV double; speed range between congestion and free flow [km/h]
145      * @param vc double; flip-over speed below which we have congestion [km/h]
146      * @param sigma double; spatial kernel size in [m]
147      * @param tau double; temporal kernel size in [s]
148      * @param xMax double; maximum spatial range in [m]
149      * @param tMax double; maximum temporal range in [s]
150      */
151     @SuppressWarnings("parameternumber")
152     public EGTF(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
153             final double tau, final double xMax, final double tMax)
154     {
155         this(cCong, cFree, deltaV, vc);
156         setKernelSI(sigma, tau, xMax, tMax);
157     }
158 
159     // ********************
160     // *** DATA METHODS ***
161     // ********************
162 
163     /**
164      * Return a data source, which is created if necessary.
165      * @param name String; unique name for the data source
166      * @return DataSource; data source
167      * @throws IllegalStateException when data has been added without a data source
168      */
169     public DataSource getDataSource(final String name)
170     {
171         if (this.defaultDataSource != null)
172         {
173             throw new IllegalStateException(
174                     "Obtaining a (new) data source after data has been added without a data source is not allowed.");
175         }
176         return this.dataSources.computeIfAbsent(name, (key) -> new DataSource(key));
177     }
178 
179     /**
180      * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
181      * required.
182      * @param time double; time before which all data can be removed
183      */
184     public synchronized void clearDataBefore(final double time)
185     {
186         for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
187         {
188             map.subMap(Double.NEGATIVE_INFINITY, time).clear();
189         }
190     }
191 
192     /**
193      * Adds point data.
194      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
195      * @param location double; location in [m]
196      * @param time double; time in [s]
197      * @param value double; data value
198      * @throws IllegalStateException if data was added with a data stream previously
199      */
200     public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
201             final double value)
202     {
203         this.addingByQuantity = true;
204         addPointDataSI(getDefaultDataStream(quantity), location, time, value);
205         this.addingByQuantity = false;
206     }
207 
208     /**
209      * Adds point data.
210      * @param dataStream DataStream&lt;?&gt;; data stream of the data
211      * @param location double; location in [m]
212      * @param time double; time in [s]
213      * @param value double; data value
214      * @throws IllegalStateException if data was added with a quantity previously
215      */
216     public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
217             final double value)
218     {
219         checkNoQuantityData();
220         Objects.requireNonNull(dataStream, "Datastream may not be null.");
221         if (!Double.isNaN(value))
222         {
223             getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
224         }
225     }
226 
227     /**
228      * Adds vector data.
229      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
230      * @param location double[]; locations in [m]
231      * @param time double[]; times in [s]
232      * @param values double[]; data values in SI unit
233      * @throws IllegalStateException if data was added with a data stream previously
234      */
235     public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
236             final double[] values)
237     {
238         this.addingByQuantity = true;
239         addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
240         this.addingByQuantity = false;
241     }
242 
243     /**
244      * Adds vector data.
245      * @param dataStream DataStream&lt;?&gt;; data stream of the data
246      * @param location double[]; locations in [m]
247      * @param time double[]; times in [s]
248      * @param values double[]; data values in SI unit
249      * @throws IllegalStateException if data was added with a quantity previously
250      */
251     public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
252             final double[] values)
253     {
254         checkNoQuantityData();
255         Objects.requireNonNull(dataStream, "Datastream may not be null.");
256         Objects.requireNonNull(location, "Location may not be null.");
257         Objects.requireNonNull(time, "Time may not be null.");
258         Objects.requireNonNull(values, "Values may not be null.");
259         if (location.length != time.length || time.length != values.length)
260         {
261             throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
262                     time.length, values.length));
263         }
264         for (int i = 0; i < values.length; i++)
265         {
266             if (!Double.isNaN(values[i]))
267             {
268                 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
269             }
270         }
271     }
272 
273     /**
274      * Adds grid data.
275      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
276      * @param location double[]; locations in [m]
277      * @param time double[]; times in [s]
278      * @param values double[][]; data values in SI unit
279      * @throws IllegalStateException if data was added with a data stream previously
280      */
281     public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
282             final double[][] values)
283     {
284         this.addingByQuantity = true;
285         addGridDataSI(getDefaultDataStream(quantity), location, time, values);
286         this.addingByQuantity = false;
287     }
288 
289     /**
290      * Adds grid data.
291      * @param dataStream DataStream&lt;?&gt;; data stream of the data
292      * @param location double[]; locations in [m]
293      * @param time double[]; times in [s]
294      * @param values double[][]; data values in SI unit
295      * @throws IllegalStateException if data was added with a quantity previously
296      */
297     public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
298             final double[][] values)
299     {
300         checkNoQuantityData();
301         Objects.requireNonNull(dataStream, "Datastream may not be null.");
302         Objects.requireNonNull(location, "Location may not be null.");
303         Objects.requireNonNull(time, "Time may not be null.");
304         Objects.requireNonNull(values, "Values may not be null.");
305         if (values.length != location.length)
306         {
307             throw new IllegalArgumentException(
308                     String.format("%d locations while length of data is %d", location.length, values.length));
309         }
310         for (int i = 0; i < location.length; i++)
311         {
312             if (values[i].length != time.length)
313             {
314                 throw new IllegalArgumentException(
315                         String.format("%d times while length of data is %d", time.length, values[i].length));
316             }
317             Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
318             for (int j = 0; j < time.length; j++)
319             {
320                 if (!Double.isNaN(values[i][j]))
321                 {
322                     getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
323                 }
324             }
325         }
326     }
327 
328     /**
329      * Check that no data was added using a quantity.
330      * @throws IllegalStateException if data was added with a quantity previously
331      */
332     private void checkNoQuantityData()
333     {
334         if (!this.addingByQuantity && this.defaultDataSource != null)
335         {
336             throw new IllegalStateException(
337                     "Adding data with a data stream is not allowed after data has been added with a quantity.");
338         }
339     }
340 
341     /**
342      * Returns a default data stream and checks that no data with a data stream was added.
343      * @param quantity Quantity&lt;?, ?&gt;; quantity
344      * @return DataStream&lt;?&gt;; default data stream
345      * @throws IllegalStateException if data was added with a data stream previously
346      */
347     private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
348     {
349         Objects.requireNonNull(quantity, "Quantity may not be null.");
350         if (!this.dataSources.isEmpty())
351         {
352             throw new IllegalStateException(
353                     "Adding data with a quantity is not allowed after data has been added with a data stream.");
354         }
355         if (this.defaultDataSource == null)
356         {
357             this.defaultDataSource = new DataSource("default");
358             this.defaultDataStreams = new LinkedHashMap<>();
359         }
360         return this.defaultDataStreams.computeIfAbsent(quantity,
361                 (key) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
362     }
363 
364     /**
365      * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
366      * @param location double; location in [m]
367      * @return data from a specific location
368      */
369     private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
370     {
371         return this.data.computeIfAbsent(location, (key) -> new TreeMap<>());
372     }
373 
374     /**
375      * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
376      * exists.
377      * @param spatialData Map&lt;Double, Map&lt;DataStream&lt;?&gt;, Double&gt;&gt;; spatially selected data
378      * @param time double; time in [s]
379      * @return data from a specific time, from data from a specific location
380      */
381     private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
382             final double time)
383     {
384         return spatialData.computeIfAbsent(time, (key) -> new LinkedHashMap<>());
385     }
386 
387     // **********************
388     // *** KERNEL METHODS ***
389     // **********************
390 
391     /**
392      * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s.
393      */
394     public void setKernel()
395     {
396         setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
397     }
398 
399     /**
400      * Sets an exponential kernel with infinite range.
401      * @param sigma double; spatial kernel size
402      * @param tau double; temporal kernel size
403      */
404     public void setKernelSI(final double sigma, final double tau)
405     {
406         setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
407     }
408 
409     /**
410      * Sets an exponential kernel with limited range.
411      * @param sigma double; spatial kernel size in [m]
412      * @param tau double; temporal kernel size in [s]
413      * @param xMax double; maximum spatial range in [m]
414      * @param tMax double; maximum temporal range in [s]
415      */
416     public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
417     {
418         setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
419     }
420 
421     /**
422      * Sets a Gaussian kernel with infinite range.
423      * @param sigma double; spatial kernel size
424      * @param tau double; temporal kernel size
425      */
426     public void setGaussKernelSI(final double sigma, final double tau)
427     {
428         setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
429     }
430 
431     /**
432      * Sets a Gaussian kernel with limited range.
433      * @param sigma double; spatial kernel size in [m]
434      * @param tau double; temporal kernel size in [s]
435      * @param xMax double; maximum spatial range in [m]
436      * @param tMax double; maximum temporal range in [s]
437      */
438     public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
439     {
440         setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
441     }
442 
443     /**
444      * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels.
445      * @param xMax double; maximum spatial range
446      * @param tMax double; maximum temporal range
447      * @param shape KernelShape; shape of the kernel
448      */
449     public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
450     {
451         this.kernel = new Kernel(xMax, tMax, shape);
452     }
453 
454     /**
455      * Returns the wave speed in congestion.
456      * @return double; wave speed in congestion
457      */
458     final double getWaveSpeedCongestion()
459     {
460         return this.cCong;
461     }
462 
463     /**
464      * Returns the wave speed in free flow.
465      * @return double; wave speed in free flow
466      */
467     final double getWaveSpeedFreeFlow()
468     {
469         return this.cFree;
470     }
471 
472     // **********************
473     // *** FILTER METHODS ***
474     // **********************
475 
476     /**
477      * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
478      * done. Finally, the filtering results can then be obtained from the listener.
479      * @param location double[]; location of output grid in [m]
480      * @param time double[]; time of output grid in [s]
481      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
482      * @return EgtfParallelListener; listener to notify keep track of the progress
483      */
484     public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
485             final Quantity<?, ?>... quantities)
486     {
487         Objects.requireNonNull(location, "Location may not be null.");
488         Objects.requireNonNull(time, "Time may not be null.");
489         EgtfParallelListener listener = new EgtfParallelListener();
490         addListener(listener);
491         new Thread(new Runnable()
492         {
493             /** {@inheritDoc} */
494             @Override
495             public void run()
496             {
497                 listener.setFilter(filterSI(location, time, quantities));
498                 removeListener(listener);
499             }
500         }, "Egtf calculation thread").start();
501         return listener;
502     }
503 
504     /**
505      * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
506      * done. Finally, the filtering results can then be obtained from the listener.
507      * @param xMin double; minimum location value of output grid [m]
508      * @param xStep double; location step of output grid [m]
509      * @param xMax double; maximum location value of output grid [m]
510      * @param tMin double; minimum time value of output grid [s]
511      * @param tStep double; time step of output grid [s]
512      * @param tMax double; maximum time value of output grid [s]
513      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
514      * @return EgtfParallelListener; listener to notify keep track of the progress
515      */
516     public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
517             final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
518     {
519         EgtfParallelListener listener = new EgtfParallelListener();
520         addListener(listener);
521         new Thread(new Runnable()
522         {
523             /** {@inheritDoc} */
524             @Override
525             public void run()
526             {
527                 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
528                 removeListener(listener);
529             }
530         }, "Egtf calculation thread").start();
531         return listener;
532     }
533 
534     /**
535      * Returns filtered data. This is the standard EGTF implementation.
536      * @param location double[]; location of output grid in [m]
537      * @param time double[]; time of output grid in [s]
538      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
539      * @return Filter; filtered data, {@code null} when interrupted
540      */
541     @SuppressWarnings({ "synthetic-access", "methodlength" })
542     public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
543     {
544         Objects.requireNonNull(location, "Location may not be null.");
545         Objects.requireNonNull(time, "Time may not be null.");
546 
547         // initialize data
548         Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
549         for (Quantity<?, ?> quantity : quantities)
550         {
551             map.put(quantity, new double[location.length][time.length]);
552         }
553 
554         // loop grid locations
555         for (int i = 0; i < location.length; i++)
556         {
557             double xGrid = location[i];
558 
559             // filter applicable data for location
560             Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
561                     this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);
562 
563             // loop grid times
564             for (int j = 0; j < time.length; j++)
565             {
566                 double tGrid = time[j];
567 
568                 // notify
569                 if (notifyListeners((i + (double) j / time.length) / location.length))
570                 {
571                     return null;
572                 }
573 
574                 // initialize data per stream
575                 // quantity z assuming congestion and free flow
576                 Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();
577 
578                 // filter and loop applicable data for time
579                 for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
580                 {
581                     double dx = xEntry.getKey() - xGrid;
582                     Map<Double, Map<DataStream<?>, Double>> temporalData =
583                             xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);
584 
585                     for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
586                     {
587                         double dt = tEntry.getKey() - tGrid;
588                         Map<DataStream<?>, Double> pData = tEntry.getValue();
589 
590                         double phiCong = this.kernel.weight(this.cCong, dx, dt);
591                         double phiFree = this.kernel.weight(this.cFree, dx, dt);
592 
593                         // loop streams data at point
594                         for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
595                         {
596                             DataStream<?> stream = vEntry.getKey();
597                             if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
598                             {
599                                 double v = vEntry.getValue();
600                                 DualWeightedMean zCongFreeOfStream =
601                                         zCongFree.computeIfAbsent(stream, (key) -> new DualWeightedMean());
602                                 zCongFreeOfStream.addCong(v, phiCong);
603                                 zCongFreeOfStream.addFree(v, phiFree);
604                             }
605                         }
606                     }
607                 }
608 
609                 // figure out the congestion level estimated for each data source
610                 Map<DataSource, Double> w = new LinkedHashMap<>();
611                 for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
612                 {
613                     DataStream<?> dataStream = streamEntry.getKey();
614                     if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
615                     {
616                         DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
617                         double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
618                         w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
619                                 .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV)));
620                         continue;
621                     }
622                 }
623 
624                 // sum available data sources per quantity
625                 Double wMean = null;
626                 for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
627                 {
628                     Quantity<?, ?> quantity = qEntry.getKey();
629                     WeightedMean z = new WeightedMean();
630                     for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
631                     {
632                         DataStream<?> dataStream = zEntry.getKey();
633                         if (dataStream.getQuantity().equals(quantity))
634                         {
635                             // obtain congestion level
636                             double wCong;
637                             if (!w.containsKey(dataStream.getDataSource()))
638                             {
639                                 // this data source has no speed data, but congestion level can be estimated from other sources
640                                 if (wMean == null)
641                                 {
642                                     // let's see if speed was estimated already
643                                     for (Quantity<?, ?> prevQuant : quantities)
644                                     {
645                                         if (prevQuant.equals(quantity))
646                                         {
647                                             // it was not, get mean of other data source
648                                             wMean = 0.0;
649                                             for (double ww : w.values())
650                                             {
651                                                 wMean += ww / w.size();
652                                             }
653                                             break;
654                                         }
655                                         else if (prevQuant.isSpeed())
656                                         {
657                                             wMean = .5 * (1.0
658                                                     + Math.tanh((EGTF.this.vc - map.get(prevQuant)[i][j]) / EGTF.this.deltaV));
659                                             break;
660                                         }
661                                     }
662                                 }
663                                 wCong = wMean;
664                             }
665                             else
666                             {
667                                 wCong = w.get(dataStream.getDataSource());
668                             }
669                             // calculate estimated value z of this data source (no duplicate quantities per source allowed)
670                             double wfree = 1.0 - wCong;
671                             DualWeightedMean zCongFreej = zEntry.getValue();
672                             double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
673                             double weight;
674                             if (w.size() > 1)
675                             {
676                                 // data source more important if more and nearer measurements
677                                 double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
678                                 // more important if more reliable (smaller standard deviation) at congestion level
679                                 double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
680                                 weight = alpha * beta;
681                             }
682                             else
683                             {
684                                 weight = 1.0;
685                             }
686                             z.add(zStream, weight);
687                         }
688                     }
689                     qEntry.getValue()[i][j] = z.get();
690                 }
691             }
692         }
693         notifyListeners(1.0);
694 
695         return new FilterDouble(location, time, map);
696     }
697 
698     /**
699      * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
700      * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
701      * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
702      * recommended to set a Kernel with maximum bounds before using this method.
703      * <p>
704      * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
705      * the Extended Generalized Treiber-Helbing Filter.
706      * @param xMin double; minimum location value of output grid [m]
707      * @param xStep double; location step of output grid [m]
708      * @param xMax double; maximum location value of output grid [m]
709      * @param tMin double; minimum time value of output grid [s]
710      * @param tStep double; time step of output grid [s]
711      * @param tMax double; maximum time value of output grid [s]
712      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
713      * @return Filter; filtered data, {@code null} when interrupted
714      */
715     @SuppressWarnings("methodlength")
716     public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
717             final double tMax, final Quantity<?, ?>... quantities)
718     {
719         if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
720         {
721             throw new IllegalArgumentException(
722                     "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
723         }
724         if (notifyListeners(0.0))
725         {
726             return null;
727         }
728 
729         // initialize data
730         int n = 1 + (int) ((xMax - xMin) / xStep);
731         double[] location = new double[n];
732         IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
733         n = 1 + (int) ((tMax - tMin) / tStep);
734         double[] time = new double[n];
735         IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
736         Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
737         Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
738         for (Quantity<?, ?> quantity : quantities)
739         {
740             map.put(quantity, new double[location.length][time.length]);
741             weights.put(quantity, new double[location.length][time.length]);
742         }
743 
744         // discretize Kernel
745         double xFrom = this.kernel.fromLocation(0.0);
746         xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
747         double xTo = this.kernel.toLocation(0.0);
748         xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
749         double[] dx = equidistant(xFrom, xStep, xTo);
750         double tFrom = this.kernel.fromTime(0.0);
751         tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
752         double tTo = this.kernel.toTime(0.0);
753         tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
754         double[] dt = equidistant(tFrom, tStep, tTo);
755         double[][] phiCong = new double[dx.length][dt.length];
756         double[][] phiFree = new double[dx.length][dt.length];
757         for (int i = 0; i < dx.length; i++)
758         {
759             for (int j = 0; j < dt.length; j++)
760             {
761                 phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
762                 phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
763             }
764         }
765 
766         // discretize data
767         Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
768         Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
769         // loop grid locations
770         for (int i = 0; i < location.length; i++)
771         {
772             // filter applicable data for location
773             Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
774                     this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
775             // loop grid times
776             for (int j = 0; j < time.length; j++)
777             {
778                 // filter and loop applicable data for time
779                 for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
780                 {
781                     NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
782                             locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
783                     for (Map<DataStream<?>, Double> timeData : temporalData.values())
784                     {
785                         for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
786                         {
787                             if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
788                             {
789                                 dataSum.computeIfAbsent(timeEntry.getKey(),
790                                         (key) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
791                                 dataCount.computeIfAbsent(timeEntry.getKey(),
792                                         (key) -> new double[location.length][time.length])[i][j]++;
793                             }
794                         }
795                     }
796                 }
797             }
798         }
799 
800         // figure out the congestion level estimated for each data source
801         double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
802         double step = 0;
803         // store maps to prevent us from calculating the convolution for speed again later
804         Map<DataSource, double[][]> w = new LinkedHashMap<>();
805         Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
806         Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
807         Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
808         Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
809         for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
810         {
811             DataStream<?> dataStream = zEntry.getKey();
812             if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
813             {
814                 // notify
815                 double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
816                 if (notifyListeners((step + 0.25) / steps))
817                 {
818                     return null;
819                 }
820                 double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
821                 if (notifyListeners((step + 0.5) / steps))
822                 {
823                     return null;
824                 }
825                 double[][] count = dataCount.get(dataStream);
826                 double[][] nCong = Convolution.convolution(phiCong, count);
827                 if (notifyListeners((step + 0.75) / steps))
828                 {
829                     return null;
830                 }
831                 double[][] nFree = Convolution.convolution(phiFree, count);
832                 double[][] wSource = new double[vCong.length][vCong[0].length];
833                 for (int i = 0; i < vCong.length; i++)
834                 {
835                     for (int j = 0; j < vCong[0].length; j++)
836                     {
837                         double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
838                         wSource[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV));
839                     }
840                 }
841                 w.put(dataStream.getDataSource(), wSource);
842                 zCongSpeed.put(dataStream.getDataSource(), vCong);
843                 zFreeSpeed.put(dataStream.getDataSource(), vFree);
844                 nCongSpeed.put(dataStream.getDataSource(), nCong);
845                 nFreeSpeed.put(dataStream.getDataSource(), nFree);
846             }
847         }
848         step++;
849         if (notifyListeners(step / steps))
850         {
851             return null;
852         }
853 
854         // sum available data sources per quantity
855         double[][] wMean = null;
856         for (Quantity<?, ?> quantity : quantities)
857         {
858             // gather place for this quantity
859             double[][] qData = map.get(quantity);
860             double[][] qWeights = weights.get(quantity);
861             // loop streams that provide this quantity
862             Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
863             for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
864             {
865                 if (zEntry.getKey().getQuantity().equals(quantity))
866                 {
867                     zEntries.add(zEntry);
868                 }
869             }
870             double streamCounter = 0;
871             for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
872             {
873                 DataStream<?> dataStream = zEntry.getKey();
874 
875                 // obtain congestion level
876                 double[][] wj;
877                 if (!w.containsKey(dataStream.getDataSource()))
878                 {
879                     // this data source has no speed data, but congestion level can be estimated from other sources
880                     if (wMean == null)
881                     {
882                         // let's see if speed was estimated already
883                         for (Quantity<?, ?> prevQuant : quantities)
884                         {
885                             if (prevQuant.equals(quantity))
886                             {
887                                 // it was not, get mean of other data source
888                                 wMean = new double[location.length][time.length];
889                                 for (double[][] ww : w.values())
890                                 {
891                                     for (int i = 0; i < location.length; i++)
892                                     {
893                                         for (int j = 0; j < time.length; j++)
894                                         {
895                                             wMean[i][j] += ww[i][j] / w.size();
896                                         }
897                                     }
898                                 }
899                                 break;
900                             }
901                             else if (prevQuant.isSpeed())
902                             {
903                                 wMean = new double[location.length][time.length];
904                                 double[][] v = map.get(prevQuant);
905                                 for (int i = 0; i < location.length; i++)
906                                 {
907                                     for (int j = 0; j < time.length; j++)
908                                     {
909                                         wMean[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - v[i][j]) / EGTF.this.deltaV));
910                                     }
911                                 }
912                                 break;
913                             }
914                         }
915                     }
916                     wj = wMean;
917                 }
918                 else
919                 {
920                     wj = w.get(dataStream.getDataSource());
921                 }
922 
923                 // convolutions of filters with discretized data and data counts
924                 double[][] zCong;
925                 double[][] zFree;
926                 double[][] nCong;
927                 double[][] nFree;
928                 if (dataStream.getQuantity().isSpeed())
929                 {
930                     zCong = zCongSpeed.get(dataStream.getDataSource());
931                     zFree = zFreeSpeed.get(dataStream.getDataSource());
932                     nCong = nCongSpeed.get(dataStream.getDataSource());
933                     nFree = nFreeSpeed.get(dataStream.getDataSource());
934                 }
935                 else
936                 {
937                     zCong = Convolution.convolution(phiCong, zEntry.getValue());
938                     if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
939                     {
940                         return null;
941                     }
942                     zFree = Convolution.convolution(phiFree, zEntry.getValue());
943                     if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
944                     {
945                         return null;
946                     }
947                     double[][] count = dataCount.get(dataStream);
948                     nCong = Convolution.convolution(phiCong, count);
949                     if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
950                     {
951                         return null;
952                     }
953                     nFree = Convolution.convolution(phiFree, count);
954                 }
955 
956                 // loop grid to add to each weighted sum (weighted per data source)
957                 for (int i = 0; i < location.length; i++)
958                 {
959                     for (int j = 0; j < time.length; j++)
960                     {
961                         double wCong = wj[i][j];
962                         double wFree = 1.0 - wCong;
963                         double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
964                         // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
965                         // is exactly what the EGTF method needs to weigh data sources
966                         double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
967                         double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
968                         double weight = beta * alpha;
969                         qData[i][j] += (value * weight);
970                         qWeights[i][j] += weight;
971                     }
972                 }
973                 streamCounter++;
974                 if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
975                 {
976                     return null;
977                 }
978             }
979             for (int i = 0; i < location.length; i++)
980             {
981                 for (int j = 0; j < time.length; j++)
982                 {
983                     qData[i][j] /= qWeights[i][j];
984                 }
985             }
986             step++;
987         }
988 
989         return new FilterDouble(location, time, map);
990     }
991 
992     /**
993      * Returns an equidistant vector that includes 0.
994      * @param from double; lowest value to include
995      * @param step double; step
996      * @param to double; highest value to include
997      * @return double[]; equidistant vector that includes 0
998      */
999     private double[] equidistant(final double from, final double step, final double to)
1000     {
1001         int n1 = (int) (-from / step);
1002         int n2 = (int) (to / step);
1003         int n = n1 + n2 + 1;
1004         double[] array = new double[n];
1005         for (int i = 0; i < n; i++)
1006         {
1007             array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1008         }
1009         return array;
1010     }
1011 
1012     // *********************
1013     // *** EVENT METHODS ***
1014     // *********************
1015 
1016     /**
1017      * Interrupt the calculation.
1018      */
1019     public final void interrupt()
1020     {
1021         this.interrupted = true;
1022     }
1023 
1024     /**
1025      * Add listener.
1026      * @param listener EgtfListener; listener
1027      */
1028     public final void addListener(final EgtfListener listener)
1029     {
1030         this.listeners.add(listener);
1031     }
1032 
1033     /**
1034      * Remove listener.
1035      * @param listener EgtfListener; listener
1036      */
1037     public final void removeListener(final EgtfListener listener)
1038     {
1039         this.listeners.remove(listener);
1040     }
1041 
1042     /**
1043      * Notify all listeners.
1044      * @param progress double; progress, a value in the range [0 ... 1]
1045      * @return boolean; whether the filter is interrupted
1046      */
1047     private boolean notifyListeners(final double progress)
1048     {
1049         if (!this.listeners.isEmpty())
1050         {
1051             EgtfEvent event = new EgtfEvent(this, progress);
1052             for (EgtfListener listener : this.listeners)
1053             {
1054                 listener.notifyProgress(event);
1055             }
1056         }
1057         return this.interrupted;
1058     }
1059 
1060     // **********************
1061     // *** HELPER CLASSES ***
1062     // **********************
1063 
1064     /**
1065      * Small class to build up a weighted mean under the congestion and free flow assumption.
1066      */
1067     private class DualWeightedMean
1068     {
1069         /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1070         private double numeratorCong;
1071 
1072         /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1073         private double numeratorFree;
1074 
1075         /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1076         private double denominatorCong;
1077 
1078         /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1079         private double denominatorFree;
1080 
1081         /**
1082          * Adds a congestion value with weight.
1083          * @param value double; value
1084          * @param weight double; weight
1085          */
1086         public void addCong(final double value, final double weight)
1087         {
1088             this.numeratorCong += value * weight;
1089             this.denominatorCong += weight;
1090         }
1091 
1092         /**
1093          * Adds a free flow value with weight.
1094          * @param value double; value
1095          * @param weight double; weight
1096          */
1097         public void addFree(final double value, final double weight)
1098         {
1099             this.numeratorFree += value * weight;
1100             this.denominatorFree += weight;
1101         }
1102 
1103         /**
1104          * Returns the weighted congestion mean of available data.
1105          * @return double; weighted mean of available data
1106          */
1107         public double getCong()
1108         {
1109             return this.numeratorCong / this.denominatorCong;
1110         }
1111 
1112         /**
1113          * Returns the weighted free flow mean of available data.
1114          * @return double; weighted free flow mean of available data
1115          */
1116         public double getFree()
1117         {
1118             return this.numeratorFree / this.denominatorFree;
1119         }
1120 
1121         /**
1122          * Returns the sum of congestion weights.
1123          * @return double; the sum of congestion weights
1124          */
1125         public double getDenominatorCong()
1126         {
1127             return this.denominatorCong;
1128         }
1129 
1130         /**
1131          * Returns the sum of free flow weights.
1132          * @return double; the sum of free flow weights
1133          */
1134         public double getDenominatorFree()
1135         {
1136             return this.denominatorFree;
1137         }
1138 
1139         /** {@inheritDoc} */
1140         @Override
1141         public String toString()
1142         {
1143             return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1144                     + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1145         }
1146 
1147     }
1148 
1149     /**
1150      * Small class to build up a weighted mean.
1151      */
1152     private class WeightedMean
1153     {
1154         /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1155         private double numerator;
1156 
1157         /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1158         private double denominator;
1159 
1160         /**
1161          * Adds a value with weight.
1162          * @param value double; value
1163          * @param weight double; weight
1164          */
1165         public void add(final double value, final double weight)
1166         {
1167             this.numerator += value * weight;
1168             this.denominator += weight;
1169         }
1170 
1171         /**
1172          * Returns the weighted mean of available data.
1173          * @return double; weighted mean of available data
1174          */
1175         public double get()
1176         {
1177             return this.numerator / this.denominator;
1178         }
1179 
1180         /** {@inheritDoc} */
1181         @Override
1182         public String toString()
1183         {
1184             return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1185         }
1186 
1187     }
1188 
1189     /** {@inheritDoc} */
1190     @Override
1191     public String toString()
1192     {
1193         return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1194                 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1195                 + this.interrupted + ", listeners=" + this.listeners + "]";
1196     }
1197 
1198 }