View Javadoc
1   package org.opentrafficsim.draw.egtf;
2   
3   import java.util.LinkedHashMap;
4   import java.util.LinkedHashSet;
5   import java.util.Map;
6   import java.util.NavigableMap;
7   import java.util.Objects;
8   import java.util.Set;
9   import java.util.SortedMap;
10  import java.util.TreeMap;
11  import java.util.stream.IntStream;
12  
13  /**
14   * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15   * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16   * implementation also supports multiple data sources.
17   * <p>
18   * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19   * <p>
20   * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21   * to provide several data streams.
22   * <p>
23   * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24   * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25   * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26   * thus ultimately the weight of the data in the estimation of the quantity.
27   * <p>
28   * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29   * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30   * {@code DENSITY_SI}, all under {@code Quantity}.
31   * <p>
32   * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33   * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34   * <p>
35   * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36   * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37   * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38   * reliability is undefined.
39   * <p>
40   * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41   * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42   * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
43   * Gaussian kernel is also available, while any other shape could be also be implemented.
44   * <p>
45   * Parameters from the EGTF are found in the following places:
46   * <ul>
47   * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48   * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49   * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50   * time. (Specifically {@code GaussKernelShape})</li>
51   * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52   * data in free flow and congestion from a particular data stream.</li>
53   * </ul>
54   * References:
55   * <ul>
56   * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57   * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58   * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59   * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60   * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61   * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62   * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63   * </ul>
64   * <p>
65   * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
67   * </p>
68   * @author <a href="https://github.com/wjschakel">Wouter Schakel</a>
69   */
70  public class Egtf
71  {
72  
73      /** Default sigma value. */
74      private static final double DEFAULT_SIGMA = 300.0;
75  
76      /** Default tau value. */
77      private static final double DEFAULT_TAU = 30.0;
78  
79      /** Filter kernel. */
80      private Kernel kernel;
81  
82      /** Shock wave speed in congestion. */
83      private final double cCong;
84  
85      /** Shock wave speed in free flow. */
86      private final double cFree;
87  
88      /** Speed range between congestion and free flow. */
89      private final double deltaV;
90  
91      /** Flip-over speed below which we have congestion. */
92      private final double vc;
93  
94      /** Data sources by label so we can return the same instances upon repeated request. */
95      private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
96  
97      /** Default data source for cases where a single data source is used. */
98      private DataSource defaultDataSource = null;
99  
100     /** Default data streams for cases where a single data source is used. */
101     private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;
102 
103     /** True if data is currently being added using a quantity, in which case a check should not occur. */
104     private boolean addingByQuantity;
105 
106     /** All point data sorted by space and time, and per data stream. */
107     private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();
108 
109     /** Whether the calculation was interrupted. */
110     private boolean interrupted = false;
111 
112     /** Listeners. */
113     private Set<EgtfListener> listeners = new LinkedHashSet<>();
114 
115     /**
116      * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
117      */
118     public Egtf()
119     {
120         this(-18.0, 80.0, 10.0, 80.0);
121     }
122 
123     /**
124      * Constructor defining global settings. A default kernel is set.
125      * @param cCong shock wave speed in congestion [km/h]
126      * @param cFree shock wave speed in free flow [km/h]
127      * @param deltaV speed range between congestion and free flow [km/h]
128      * @param vc flip-over speed below which we have congestion [km/h]
129      */
130     public Egtf(final double cCong, final double cFree, final double deltaV, final double vc)
131     {
132         this.cCong = cCong / 3.6;
133         this.cFree = cFree / 3.6;
134         this.deltaV = deltaV / 3.6;
135         this.vc = vc / 3.6;
136         setKernel();
137     }
138 
139     /**
140      * Convenience constructor that also sets a specified kernel.
141      * @param cCong shock wave speed in congestion [km/h]
142      * @param cFree shock wave speed in free flow [km/h]
143      * @param deltaV speed range between congestion and free flow [km/h]
144      * @param vc flip-over speed below which we have congestion [km/h]
145      * @param sigma spatial kernel size in [m]
146      * @param tau temporal kernel size in [s]
147      * @param xMax maximum spatial range in [m]
148      * @param tMax maximum temporal range in [s]
149      */
150     @SuppressWarnings("parameternumber")
151     public Egtf(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
152             final double tau, final double xMax, final double tMax)
153     {
154         this(cCong, cFree, deltaV, vc);
155         setKernelSI(sigma, tau, xMax, tMax);
156     }
157 
158     // ********************
159     // *** DATA METHODS ***
160     // ********************
161 
162     /**
163      * Return a data source, which is created if necessary.
164      * @param name unique name for the data source
165      * @return data source
166      * @throws IllegalStateException when data has been added without a data source
167      */
168     public DataSource getDataSource(final String name)
169     {
170         if (this.defaultDataSource != null)
171         {
172             throw new IllegalStateException(
173                     "Obtaining a (new) data source after data has been added without a data source is not allowed.");
174         }
175         return this.dataSources.computeIfAbsent(name, (key) -> new DataSource(key));
176     }
177 
178     /**
179      * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
180      * required.
181      * @param time time before which all data can be removed
182      */
183     public synchronized void clearDataBefore(final double time)
184     {
185         for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
186         {
187             map.subMap(Double.NEGATIVE_INFINITY, time).clear();
188         }
189     }
190 
191     /**
192      * Adds point data.
193      * @param quantity quantity of the data
194      * @param location location in [m]
195      * @param time time in [s]
196      * @param value data value
197      * @throws IllegalStateException if data was added with a data stream previously
198      */
199     public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
200             final double value)
201     {
202         this.addingByQuantity = true;
203         addPointDataSI(getDefaultDataStream(quantity), location, time, value);
204         this.addingByQuantity = false;
205     }
206 
207     /**
208      * Adds point data.
209      * @param dataStream data stream of the data
210      * @param location location in [m]
211      * @param time time in [s]
212      * @param value data value
213      * @throws IllegalStateException if data was added with a quantity previously
214      */
215     public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
216             final double value)
217     {
218         checkNoQuantityData();
219         Objects.requireNonNull(dataStream, "Datastream may not be null.");
220         if (!Double.isNaN(value))
221         {
222             getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
223         }
224     }
225 
226     /**
227      * Adds vector data.
228      * @param quantity quantity of the data
229      * @param location locations in [m]
230      * @param time times in [s]
231      * @param values data values in SI unit
232      * @throws IllegalStateException if data was added with a data stream previously
233      */
234     public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
235             final double[] values)
236     {
237         this.addingByQuantity = true;
238         addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
239         this.addingByQuantity = false;
240     }
241 
242     /**
243      * Adds vector data.
244      * @param dataStream data stream of the data
245      * @param location locations in [m]
246      * @param time times in [s]
247      * @param values data values in SI unit
248      * @throws IllegalStateException if data was added with a quantity previously
249      */
250     public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
251             final double[] values)
252     {
253         checkNoQuantityData();
254         Objects.requireNonNull(dataStream, "Datastream may not be null.");
255         Objects.requireNonNull(location, "Location may not be null.");
256         Objects.requireNonNull(time, "Time may not be null.");
257         Objects.requireNonNull(values, "Values may not be null.");
258         if (location.length != time.length || time.length != values.length)
259         {
260             throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
261                     time.length, values.length));
262         }
263         for (int i = 0; i < values.length; i++)
264         {
265             if (!Double.isNaN(values[i]))
266             {
267                 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
268             }
269         }
270     }
271 
272     /**
273      * Adds grid data.
274      * @param quantity quantity of the data
275      * @param location locations in [m]
276      * @param time times in [s]
277      * @param values data values in SI unit
278      * @throws IllegalStateException if data was added with a data stream previously
279      */
280     public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
281             final double[][] values)
282     {
283         this.addingByQuantity = true;
284         addGridDataSI(getDefaultDataStream(quantity), location, time, values);
285         this.addingByQuantity = false;
286     }
287 
288     /**
289      * Adds grid data.
290      * @param dataStream data stream of the data
291      * @param location locations in [m]
292      * @param time times in [s]
293      * @param values data values in SI unit
294      * @throws IllegalStateException if data was added with a quantity previously
295      */
296     public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
297             final double[][] values)
298     {
299         checkNoQuantityData();
300         Objects.requireNonNull(dataStream, "Datastream may not be null.");
301         Objects.requireNonNull(location, "Location may not be null.");
302         Objects.requireNonNull(time, "Time may not be null.");
303         Objects.requireNonNull(values, "Values may not be null.");
304         if (values.length != location.length)
305         {
306             throw new IllegalArgumentException(
307                     String.format("%d locations while length of data is %d", location.length, values.length));
308         }
309         for (int i = 0; i < location.length; i++)
310         {
311             if (values[i].length != time.length)
312             {
313                 throw new IllegalArgumentException(
314                         String.format("%d times while length of data is %d", time.length, values[i].length));
315             }
316             Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
317             for (int j = 0; j < time.length; j++)
318             {
319                 if (!Double.isNaN(values[i][j]))
320                 {
321                     getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
322                 }
323             }
324         }
325     }
326 
327     /**
328      * Check that no data was added using a quantity.
329      * @throws IllegalStateException if data was added with a quantity previously
330      */
331     private void checkNoQuantityData()
332     {
333         if (!this.addingByQuantity && this.defaultDataSource != null)
334         {
335             throw new IllegalStateException(
336                     "Adding data with a data stream is not allowed after data has been added with a quantity.");
337         }
338     }
339 
340     /**
341      * Returns a default data stream and checks that no data with a data stream was added.
342      * @param quantity quantity
343      * @return default data stream
344      * @throws IllegalStateException if data was added with a data stream previously
345      */
346     private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
347     {
348         Objects.requireNonNull(quantity, "Quantity may not be null.");
349         if (!this.dataSources.isEmpty())
350         {
351             throw new IllegalStateException(
352                     "Adding data with a quantity is not allowed after data has been added with a data stream.");
353         }
354         if (this.defaultDataSource == null)
355         {
356             this.defaultDataSource = new DataSource("default");
357             this.defaultDataStreams = new LinkedHashMap<>();
358         }
359         return this.defaultDataStreams.computeIfAbsent(quantity,
360                 (key) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
361     }
362 
363     /**
364      * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
365      * @param location location in [m]
366      * @return data from a specific location
367      */
368     private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
369     {
370         return this.data.computeIfAbsent(location, (key) -> new TreeMap<>());
371     }
372 
373     /**
374      * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
375      * exists.
376      * @param spatialData spatially selected data
377      * @param time time in [s]
378      * @return data from a specific time, from data from a specific location
379      */
380     private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
381             final double time)
382     {
383         return spatialData.computeIfAbsent(time, (key) -> new LinkedHashMap<>());
384     }
385 
386     // **********************
387     // *** KERNEL METHODS ***
388     // **********************
389 
390     /**
391      * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s.
392      */
393     public void setKernel()
394     {
395         setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
396     }
397 
398     /**
399      * Sets an exponential kernel with infinite range.
400      * @param sigma spatial kernel size
401      * @param tau temporal kernel size
402      */
403     public void setKernelSI(final double sigma, final double tau)
404     {
405         setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
406     }
407 
408     /**
409      * Sets an exponential kernel with limited range.
410      * @param sigma spatial kernel size in [m]
411      * @param tau temporal kernel size in [s]
412      * @param xMax maximum spatial range in [m]
413      * @param tMax maximum temporal range in [s]
414      */
415     public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
416     {
417         setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
418     }
419 
420     /**
421      * Sets a Gaussian kernel with infinite range.
422      * @param sigma spatial kernel size
423      * @param tau temporal kernel size
424      */
425     public void setGaussKernelSI(final double sigma, final double tau)
426     {
427         setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
428     }
429 
430     /**
431      * Sets a Gaussian kernel with limited range.
432      * @param sigma spatial kernel size in [m]
433      * @param tau temporal kernel size in [s]
434      * @param xMax maximum spatial range in [m]
435      * @param tMax maximum temporal range in [s]
436      */
437     public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
438     {
439         setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
440     }
441 
442     /**
443      * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels.
444      * @param xMax maximum spatial range
445      * @param tMax maximum temporal range
446      * @param shape shape of the kernel
447      */
448     public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
449     {
450         this.kernel = new Kernel(xMax, tMax, shape);
451     }
452 
453     /**
454      * Returns the wave speed in congestion.
455      * @return wave speed in congestion
456      */
457     final double getWaveSpeedCongestion()
458     {
459         return this.cCong;
460     }
461 
462     /**
463      * Returns the wave speed in free flow.
464      * @return wave speed in free flow
465      */
466     final double getWaveSpeedFreeFlow()
467     {
468         return this.cFree;
469     }
470 
471     // **********************
472     // *** FILTER METHODS ***
473     // **********************
474 
475     /**
476      * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
477      * done. Finally, the filtering results can then be obtained from the listener.
478      * @param location location of output grid in [m]
479      * @param time time of output grid in [s]
480      * @param quantities quantities to calculate filtered data of
481      * @return listener to notify keep track of the progress
482      */
483     public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
484             final Quantity<?, ?>... quantities)
485     {
486         Objects.requireNonNull(location, "Location may not be null.");
487         Objects.requireNonNull(time, "Time may not be null.");
488         EgtfParallelListener listener = new EgtfParallelListener();
489         addListener(listener);
490         new Thread(new Runnable()
491         {
492             @Override
493             public void run()
494             {
495                 listener.setFilter(filterSI(location, time, quantities));
496                 removeListener(listener);
497             }
498         }, "Egtf calculation thread").start();
499         return listener;
500     }
501 
502     /**
503      * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
504      * done. Finally, the filtering results can then be obtained from the listener.
505      * @param xMin minimum location value of output grid [m]
506      * @param xStep location step of output grid [m]
507      * @param xMax maximum location value of output grid [m]
508      * @param tMin minimum time value of output grid [s]
509      * @param tStep time step of output grid [s]
510      * @param tMax maximum time value of output grid [s]
511      * @param quantities quantities to calculate filtered data of
512      * @return listener to notify keep track of the progress
513      */
514     public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
515             final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
516     {
517         EgtfParallelListener listener = new EgtfParallelListener();
518         addListener(listener);
519         new Thread(new Runnable()
520         {
521             @Override
522             public void run()
523             {
524                 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
525                 removeListener(listener);
526             }
527         }, "Egtf calculation thread").start();
528         return listener;
529     }
530 
531     /**
532      * Returns filtered data. This is the standard EGTF implementation.
533      * @param location location of output grid in [m]
534      * @param time time of output grid in [s]
535      * @param quantities quantities to calculate filtered data of
536      * @return filtered data, {@code null} when interrupted
537      */
538     @SuppressWarnings("methodlength")
539     public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
540     {
541         Objects.requireNonNull(location, "Location may not be null.");
542         Objects.requireNonNull(time, "Time may not be null.");
543 
544         // initialize data
545         Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
546         for (Quantity<?, ?> quantity : quantities)
547         {
548             map.put(quantity, new double[location.length][time.length]);
549         }
550 
551         // loop grid locations
552         for (int i = 0; i < location.length; i++)
553         {
554             double xGrid = location[i];
555 
556             // filter applicable data for location
557             Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
558                     this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);
559 
560             // loop grid times
561             for (int j = 0; j < time.length; j++)
562             {
563                 double tGrid = time[j];
564 
565                 // notify
566                 if (notifyListeners((i + (double) j / time.length) / location.length))
567                 {
568                     return null;
569                 }
570 
571                 // initialize data per stream
572                 // quantity z assuming congestion and free flow
573                 Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();
574 
575                 // filter and loop applicable data for time
576                 for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
577                 {
578                     double dx = xEntry.getKey() - xGrid;
579                     Map<Double, Map<DataStream<?>, Double>> temporalData =
580                             xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);
581 
582                     for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
583                     {
584                         double dt = tEntry.getKey() - tGrid;
585                         Map<DataStream<?>, Double> pData = tEntry.getValue();
586 
587                         double phiCong = this.kernel.weight(this.cCong, dx, dt);
588                         double phiFree = this.kernel.weight(this.cFree, dx, dt);
589 
590                         // loop streams data at point
591                         for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
592                         {
593                             DataStream<?> stream = vEntry.getKey();
594                             if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
595                             {
596                                 double v = vEntry.getValue();
597                                 DualWeightedMean zCongFreeOfStream =
598                                         zCongFree.computeIfAbsent(stream, (key) -> new DualWeightedMean());
599                                 zCongFreeOfStream.addCong(v, phiCong);
600                                 zCongFreeOfStream.addFree(v, phiFree);
601                             }
602                         }
603                     }
604                 }
605 
606                 // figure out the congestion level estimated for each data source
607                 Map<DataSource, Double> w = new LinkedHashMap<>();
608                 for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
609                 {
610                     DataStream<?> dataStream = streamEntry.getKey();
611                     if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
612                     {
613                         DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
614                         double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
615                         w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
616                                 .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV)));
617                         continue;
618                     }
619                 }
620 
621                 // sum available data sources per quantity
622                 Double wMean = null;
623                 for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
624                 {
625                     Quantity<?, ?> quantity = qEntry.getKey();
626                     WeightedMean z = new WeightedMean();
627                     for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
628                     {
629                         DataStream<?> dataStream = zEntry.getKey();
630                         if (dataStream.getQuantity().equals(quantity))
631                         {
632                             // obtain congestion level
633                             double wCong;
634                             if (!w.containsKey(dataStream.getDataSource()))
635                             {
636                                 // this data source has no speed data, but congestion level can be estimated from other sources
637                                 if (wMean == null)
638                                 {
639                                     // let's see if speed was estimated already
640                                     for (Quantity<?, ?> prevQuant : quantities)
641                                     {
642                                         if (prevQuant.equals(quantity))
643                                         {
644                                             // it was not, get mean of other data source
645                                             wMean = 0.0;
646                                             for (double ww : w.values())
647                                             {
648                                                 wMean += ww / w.size();
649                                             }
650                                             break;
651                                         }
652                                         else if (prevQuant.isSpeed())
653                                         {
654                                             wMean = .5 * (1.0
655                                                     + Math.tanh((Egtf.this.vc - map.get(prevQuant)[i][j]) / Egtf.this.deltaV));
656                                             break;
657                                         }
658                                     }
659                                 }
660                                 wCong = wMean;
661                             }
662                             else
663                             {
664                                 wCong = w.get(dataStream.getDataSource());
665                             }
666                             // calculate estimated value z of this data source (no duplicate quantities per source allowed)
667                             double wfree = 1.0 - wCong;
668                             DualWeightedMean zCongFreej = zEntry.getValue();
669                             double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
670                             double weight;
671                             if (w.size() > 1)
672                             {
673                                 // data source more important if more and nearer measurements
674                                 double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
675                                 // more important if more reliable (smaller standard deviation) at congestion level
676                                 double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
677                                 weight = alpha * beta;
678                             }
679                             else
680                             {
681                                 weight = 1.0;
682                             }
683                             z.add(zStream, weight);
684                         }
685                     }
686                     qEntry.getValue()[i][j] = z.get();
687                 }
688             }
689         }
690         notifyListeners(1.0);
691 
692         return new FilterDouble(location, time, map);
693     }
694 
695     /**
696      * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
697      * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
698      * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
699      * recommended to set a Kernel with maximum bounds before using this method.
700      * <p>
701      * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
702      * the Extended Generalized Treiber-Helbing Filter.
703      * @param xMin minimum location value of output grid [m]
704      * @param xStep location step of output grid [m]
705      * @param xMax maximum location value of output grid [m]
706      * @param tMin minimum time value of output grid [s]
707      * @param tStep time step of output grid [s]
708      * @param tMax maximum time value of output grid [s]
709      * @param quantities quantities to calculate filtered data of
710      * @return filtered data, {@code null} when interrupted
711      */
712     @SuppressWarnings("methodlength")
713     public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
714             final double tMax, final Quantity<?, ?>... quantities)
715     {
716         if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
717         {
718             throw new IllegalArgumentException(
719                     "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
720         }
721         if (notifyListeners(0.0))
722         {
723             return null;
724         }
725 
726         // initialize data
727         int n = 1 + (int) ((xMax - xMin) / xStep);
728         double[] location = new double[n];
729         IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
730         n = 1 + (int) ((tMax - tMin) / tStep);
731         double[] time = new double[n];
732         IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
733         Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
734         Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
735         for (Quantity<?, ?> quantity : quantities)
736         {
737             map.put(quantity, new double[location.length][time.length]);
738             weights.put(quantity, new double[location.length][time.length]);
739         }
740 
741         // discretize Kernel
742         double xFrom = this.kernel.fromLocation(0.0);
743         xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
744         double xTo = this.kernel.toLocation(0.0);
745         xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
746         double[] dx = equidistant(xFrom, xStep, xTo);
747         double tFrom = this.kernel.fromTime(0.0);
748         tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
749         double tTo = this.kernel.toTime(0.0);
750         tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
751         double[] dt = equidistant(tFrom, tStep, tTo);
752         double[][] phiCong = new double[dx.length][dt.length];
753         double[][] phiFree = new double[dx.length][dt.length];
754         for (int i = 0; i < dx.length; i++)
755         {
756             for (int j = 0; j < dt.length; j++)
757             {
758                 phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
759                 phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
760             }
761         }
762 
763         // discretize data
764         Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
765         Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
766         // loop grid locations
767         for (int i = 0; i < location.length; i++)
768         {
769             // filter applicable data for location
770             Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
771                     this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
772             // loop grid times
773             for (int j = 0; j < time.length; j++)
774             {
775                 // filter and loop applicable data for time
776                 for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
777                 {
778                     NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
779                             locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
780                     for (Map<DataStream<?>, Double> timeData : temporalData.values())
781                     {
782                         for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
783                         {
784                             if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
785                             {
786                                 dataSum.computeIfAbsent(timeEntry.getKey(),
787                                         (key) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
788                                 dataCount.computeIfAbsent(timeEntry.getKey(),
789                                         (key) -> new double[location.length][time.length])[i][j]++;
790                             }
791                         }
792                     }
793                 }
794             }
795         }
796 
797         // figure out the congestion level estimated for each data source
798         double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
799         double step = 0;
800         // store maps to prevent us from calculating the convolution for speed again later
801         Map<DataSource, double[][]> w = new LinkedHashMap<>();
802         Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
803         Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
804         Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
805         Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
806         for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
807         {
808             DataStream<?> dataStream = zEntry.getKey();
809             if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
810             {
811                 // notify
812                 double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
813                 if (notifyListeners((step + 0.25) / steps))
814                 {
815                     return null;
816                 }
817                 double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
818                 if (notifyListeners((step + 0.5) / steps))
819                 {
820                     return null;
821                 }
822                 double[][] count = dataCount.get(dataStream);
823                 double[][] nCong = Convolution.convolution(phiCong, count);
824                 if (notifyListeners((step + 0.75) / steps))
825                 {
826                     return null;
827                 }
828                 double[][] nFree = Convolution.convolution(phiFree, count);
829                 double[][] wSource = new double[vCong.length][vCong[0].length];
830                 for (int i = 0; i < vCong.length; i++)
831                 {
832                     for (int j = 0; j < vCong[0].length; j++)
833                     {
834                         double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
835                         wSource[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV));
836                     }
837                 }
838                 w.put(dataStream.getDataSource(), wSource);
839                 zCongSpeed.put(dataStream.getDataSource(), vCong);
840                 zFreeSpeed.put(dataStream.getDataSource(), vFree);
841                 nCongSpeed.put(dataStream.getDataSource(), nCong);
842                 nFreeSpeed.put(dataStream.getDataSource(), nFree);
843             }
844         }
845         step++;
846         if (notifyListeners(step / steps))
847         {
848             return null;
849         }
850 
851         // sum available data sources per quantity
852         double[][] wMean = null;
853         for (Quantity<?, ?> quantity : quantities)
854         {
855             // gather place for this quantity
856             double[][] qData = map.get(quantity);
857             double[][] qWeights = weights.get(quantity);
858             // loop streams that provide this quantity
859             Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
860             for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
861             {
862                 if (zEntry.getKey().getQuantity().equals(quantity))
863                 {
864                     zEntries.add(zEntry);
865                 }
866             }
867             double streamCounter = 0;
868             for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
869             {
870                 DataStream<?> dataStream = zEntry.getKey();
871 
872                 // obtain congestion level
873                 double[][] wj;
874                 if (!w.containsKey(dataStream.getDataSource()))
875                 {
876                     // this data source has no speed data, but congestion level can be estimated from other sources
877                     if (wMean == null)
878                     {
879                         // let's see if speed was estimated already
880                         for (Quantity<?, ?> prevQuant : quantities)
881                         {
882                             if (prevQuant.equals(quantity))
883                             {
884                                 // it was not, get mean of other data source
885                                 wMean = new double[location.length][time.length];
886                                 for (double[][] ww : w.values())
887                                 {
888                                     for (int i = 0; i < location.length; i++)
889                                     {
890                                         for (int j = 0; j < time.length; j++)
891                                         {
892                                             wMean[i][j] += ww[i][j] / w.size();
893                                         }
894                                     }
895                                 }
896                                 break;
897                             }
898                             else if (prevQuant.isSpeed())
899                             {
900                                 wMean = new double[location.length][time.length];
901                                 double[][] v = map.get(prevQuant);
902                                 for (int i = 0; i < location.length; i++)
903                                 {
904                                     for (int j = 0; j < time.length; j++)
905                                     {
906                                         wMean[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - v[i][j]) / Egtf.this.deltaV));
907                                     }
908                                 }
909                                 break;
910                             }
911                         }
912                     }
913                     wj = wMean;
914                 }
915                 else
916                 {
917                     wj = w.get(dataStream.getDataSource());
918                 }
919 
920                 // convolutions of filters with discretized data and data counts
921                 double[][] zCong;
922                 double[][] zFree;
923                 double[][] nCong;
924                 double[][] nFree;
925                 if (dataStream.getQuantity().isSpeed())
926                 {
927                     zCong = zCongSpeed.get(dataStream.getDataSource());
928                     zFree = zFreeSpeed.get(dataStream.getDataSource());
929                     nCong = nCongSpeed.get(dataStream.getDataSource());
930                     nFree = nFreeSpeed.get(dataStream.getDataSource());
931                 }
932                 else
933                 {
934                     zCong = Convolution.convolution(phiCong, zEntry.getValue());
935                     if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
936                     {
937                         return null;
938                     }
939                     zFree = Convolution.convolution(phiFree, zEntry.getValue());
940                     if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
941                     {
942                         return null;
943                     }
944                     double[][] count = dataCount.get(dataStream);
945                     nCong = Convolution.convolution(phiCong, count);
946                     if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
947                     {
948                         return null;
949                     }
950                     nFree = Convolution.convolution(phiFree, count);
951                 }
952 
953                 // loop grid to add to each weighted sum (weighted per data source)
954                 for (int i = 0; i < location.length; i++)
955                 {
956                     for (int j = 0; j < time.length; j++)
957                     {
958                         double wCong = wj[i][j];
959                         double wFree = 1.0 - wCong;
960                         double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
961                         // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
962                         // is exactly what the EGTF method needs to weigh data sources
963                         double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
964                         double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
965                         double weight = beta * alpha;
966                         qData[i][j] += (value * weight);
967                         qWeights[i][j] += weight;
968                     }
969                 }
970                 streamCounter++;
971                 if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
972                 {
973                     return null;
974                 }
975             }
976             for (int i = 0; i < location.length; i++)
977             {
978                 for (int j = 0; j < time.length; j++)
979                 {
980                     qData[i][j] /= qWeights[i][j];
981                 }
982             }
983             step++;
984         }
985 
986         return new FilterDouble(location, time, map);
987     }
988 
989     /**
990      * Returns an equidistant vector that includes 0.
991      * @param from lowest value to include
992      * @param step step
993      * @param to highest value to include
994      * @return equidistant vector that includes 0
995      */
996     private double[] equidistant(final double from, final double step, final double to)
997     {
998         int n1 = (int) (-from / step);
999         int n2 = (int) (to / step);
1000         int n = n1 + n2 + 1;
1001         double[] array = new double[n];
1002         for (int i = 0; i < n; i++)
1003         {
1004             array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1005         }
1006         return array;
1007     }
1008 
1009     // *********************
1010     // *** EVENT METHODS ***
1011     // *********************
1012 
1013     /**
1014      * Interrupt the calculation.
1015      */
1016     public final void interrupt()
1017     {
1018         this.interrupted = true;
1019     }
1020 
1021     /**
1022      * Add listener.
1023      * @param listener listener
1024      */
1025     public final void addListener(final EgtfListener listener)
1026     {
1027         this.listeners.add(listener);
1028     }
1029 
1030     /**
1031      * Remove listener.
1032      * @param listener listener
1033      */
1034     public final void removeListener(final EgtfListener listener)
1035     {
1036         this.listeners.remove(listener);
1037     }
1038 
1039     /**
1040      * Notify all listeners.
1041      * @param progress progress, a value in the range [0 ... 1]
1042      * @return whether the filter is interrupted
1043      */
1044     private boolean notifyListeners(final double progress)
1045     {
1046         if (!this.listeners.isEmpty())
1047         {
1048             EgtfEvent event = new EgtfEvent(this, progress);
1049             for (EgtfListener listener : this.listeners)
1050             {
1051                 listener.notifyProgress(event);
1052             }
1053         }
1054         return this.interrupted;
1055     }
1056 
1057     // **********************
1058     // *** HELPER CLASSES ***
1059     // **********************
1060 
1061     /**
1062      * Small class to build up a weighted mean under the congestion and free flow assumption.
1063      */
1064     private final class DualWeightedMean
1065     {
1066         /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1067         private double numeratorCong;
1068 
1069         /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1070         private double numeratorFree;
1071 
1072         /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1073         private double denominatorCong;
1074 
1075         /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1076         private double denominatorFree;
1077 
1078         /**
1079          * Adds a congestion value with weight.
1080          * @param value value
1081          * @param weight weight
1082          */
1083         public void addCong(final double value, final double weight)
1084         {
1085             this.numeratorCong += value * weight;
1086             this.denominatorCong += weight;
1087         }
1088 
1089         /**
1090          * Adds a free flow value with weight.
1091          * @param value value
1092          * @param weight weight
1093          */
1094         public void addFree(final double value, final double weight)
1095         {
1096             this.numeratorFree += value * weight;
1097             this.denominatorFree += weight;
1098         }
1099 
1100         /**
1101          * Returns the weighted congestion mean of available data.
1102          * @return weighted mean of available data
1103          */
1104         public double getCong()
1105         {
1106             return this.numeratorCong / this.denominatorCong;
1107         }
1108 
1109         /**
1110          * Returns the weighted free flow mean of available data.
1111          * @return weighted free flow mean of available data
1112          */
1113         public double getFree()
1114         {
1115             return this.numeratorFree / this.denominatorFree;
1116         }
1117 
1118         /**
1119          * Returns the sum of congestion weights.
1120          * @return the sum of congestion weights
1121          */
1122         public double getDenominatorCong()
1123         {
1124             return this.denominatorCong;
1125         }
1126 
1127         /**
1128          * Returns the sum of free flow weights.
1129          * @return the sum of free flow weights
1130          */
1131         public double getDenominatorFree()
1132         {
1133             return this.denominatorFree;
1134         }
1135 
1136         @Override
1137         public String toString()
1138         {
1139             return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1140                     + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1141         }
1142 
1143     }
1144 
1145     /**
1146      * Small class to build up a weighted mean.
1147      */
1148     private final class WeightedMean
1149     {
1150         /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1151         private double numerator;
1152 
1153         /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1154         private double denominator;
1155 
1156         /**
1157          * Adds a value with weight.
1158          * @param value value
1159          * @param weight weight
1160          */
1161         public void add(final double value, final double weight)
1162         {
1163             this.numerator += value * weight;
1164             this.denominator += weight;
1165         }
1166 
1167         /**
1168          * Returns the weighted mean of available data.
1169          * @return weighted mean of available data
1170          */
1171         public double get()
1172         {
1173             return this.numerator / this.denominator;
1174         }
1175 
1176         @Override
1177         public String toString()
1178         {
1179             return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1180         }
1181 
1182     }
1183 
1184     @Override
1185     public String toString()
1186     {
1187         return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1188                 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1189                 + this.interrupted + ", listeners=" + this.listeners + "]";
1190     }
1191 
1192 }