View Javadoc
1   package org.opentrafficsim.draw.egtf;
2   
3   import java.util.LinkedHashMap;
4   import java.util.LinkedHashSet;
5   import java.util.Map;
6   import java.util.NavigableMap;
7   import java.util.Objects;
8   import java.util.Set;
9   import java.util.SortedMap;
10  import java.util.TreeMap;
11  import java.util.stream.IntStream;
12  
13  /**
14   * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15   * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16   * implementation also supports multiple data sources.
17   * <p>
18   * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19   * <p>
20   * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21   * to provide several data streams.
22   * <p>
23   * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24   * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25   * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26   * thus ultimately the weight of the data in the estimation of the quantity.
27   * <p>
28   * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29   * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30   * {@code DENSITY_SI}, all under {@code Quantity}.
31   * <p>
32   * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33   * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34   * <p>
35   * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36   * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37   * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38   * reliability is undefined.
39   * <p>
40   * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41   * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42   * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
 * Gaussian kernel is also available, while any other shape could also be implemented.
44   * <p>
45   * Parameters from the EGTF are found in the following places:
46   * <ul>
47   * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48   * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49   * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50   * time. (Specifically {@code GaussKernelShape})</li>
51   * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52   * data in free flow and congestion from a particular data stream.</li>
53   * </ul>
54   * References:
55   * <ul>
56   * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57   * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58   * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59   * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60   * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61   * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62   * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63   * </ul>
64   * <p>
65   * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
67   * </p>
68   * @author <a href="https://github.com/wjschakel">Wouter Schakel</a>
69   */
70  public class Egtf
71  {
72  
    /** Default sigma value (spatial kernel size) in [m]. */
    private static final double DEFAULT_SIGMA = 300.0;

    /** Default tau value (temporal kernel size) in [s]. */
    private static final double DEFAULT_TAU = 30.0;

    /** Filter kernel determining the spatiotemporal weight of measurements. */
    private Kernel kernel;

    /** Shock wave speed in congestion [m/s] (constructor input is km/h, divided by 3.6). */
    private final double cCong;

    /** Shock wave speed in free flow [m/s] (constructor input is km/h, divided by 3.6). */
    private final double cFree;

    /** Speed range between congestion and free flow [m/s]. */
    private final double deltaV;

    /** Flip-over speed below which we have congestion [m/s]. */
    private final double vc;

    /** Data sources by label so we can return the same instances upon repeated request. */
    private final Map<String, DataSource> dataSources = new LinkedHashMap<>();

    /** Default data source for cases where a single data source is used; null until data is first added by quantity. */
    private DataSource defaultDataSource = null;

    /** Default data streams (one per quantity) for cases where a single data source is used; created lazily. */
    private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;

    /** True if data is currently being added using a quantity, in which case a check should not occur. */
    private boolean addingByQuantity;

    /** All point data: outer key is location [m], inner key is time [s], then one value per data stream. */
    private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();

    /** Whether the calculation was interrupted. */
    private boolean interrupted = false;

    /** Listeners notified of filtering progress. */
    private Set<EgtfListener> listeners = new LinkedHashSet<>();
114 
    /**
     * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default exponential kernel
     * with infinite range is set.
     */
    public Egtf()
    {
        this(-18.0, 80.0, 10.0, 80.0);
    }
122 
123     /**
124      * Constructor defining global settings. A default kernel is set.
125      * @param cCong double; shock wave speed in congestion [km/h]
126      * @param cFree double; shock wave speed in free flow [km/h]
127      * @param deltaV double; speed range between congestion and free flow [km/h]
128      * @param vc double; flip-over speed below which we have congestion [km/h]
129      */
130     public Egtf(final double cCong, final double cFree, final double deltaV, final double vc)
131     {
132         this.cCong = cCong / 3.6;
133         this.cFree = cFree / 3.6;
134         this.deltaV = deltaV / 3.6;
135         this.vc = vc / 3.6;
136         setKernel();
137     }
138 
    /**
     * Convenience constructor that also sets a specified exponential kernel with limited range. The default kernel set
     * by the delegated constructor is replaced.
     * @param cCong double; shock wave speed in congestion [km/h]
     * @param cFree double; shock wave speed in free flow [km/h]
     * @param deltaV double; speed range between congestion and free flow [km/h]
     * @param vc double; flip-over speed below which we have congestion [km/h]
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    @SuppressWarnings("parameternumber")
    public Egtf(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
            final double tau, final double xMax, final double tMax)
    {
        this(cCong, cFree, deltaV, vc);
        setKernelSI(sigma, tau, xMax, tMax);
    }
157 
158     // ********************
159     // *** DATA METHODS ***
160     // ********************
161 
162     /**
163      * Return a data source, which is created if necessary.
164      * @param name String; unique name for the data source
165      * @return DataSource; data source
166      * @throws IllegalStateException when data has been added without a data source
167      */
168     public DataSource getDataSource(final String name)
169     {
170         if (this.defaultDataSource != null)
171         {
172             throw new IllegalStateException(
173                     "Obtaining a (new) data source after data has been added without a data source is not allowed.");
174         }
175         return this.dataSources.computeIfAbsent(name, (
176                 key
177         ) -> new DataSource(key));
178     }
179 
180     /**
181      * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
182      * required.
183      * @param time double; time before which all data can be removed
184      */
185     public synchronized void clearDataBefore(final double time)
186     {
187         for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
188         {
189             map.subMap(Double.NEGATIVE_INFINITY, time).clear();
190         }
191     }
192 
193     /**
194      * Adds point data.
195      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
196      * @param location double; location in [m]
197      * @param time double; time in [s]
198      * @param value double; data value
199      * @throws IllegalStateException if data was added with a data stream previously
200      */
201     public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
202             final double value)
203     {
204         this.addingByQuantity = true;
205         addPointDataSI(getDefaultDataStream(quantity), location, time, value);
206         this.addingByQuantity = false;
207     }
208 
209     /**
210      * Adds point data.
211      * @param dataStream DataStream&lt;?&gt;; data stream of the data
212      * @param location double; location in [m]
213      * @param time double; time in [s]
214      * @param value double; data value
215      * @throws IllegalStateException if data was added with a quantity previously
216      */
217     public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
218             final double value)
219     {
220         checkNoQuantityData();
221         Objects.requireNonNull(dataStream, "Datastream may not be null.");
222         if (!Double.isNaN(value))
223         {
224             getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
225         }
226     }
227 
228     /**
229      * Adds vector data.
230      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
231      * @param location double[]; locations in [m]
232      * @param time double[]; times in [s]
233      * @param values double[]; data values in SI unit
234      * @throws IllegalStateException if data was added with a data stream previously
235      */
236     public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
237             final double[] values)
238     {
239         this.addingByQuantity = true;
240         addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
241         this.addingByQuantity = false;
242     }
243 
244     /**
245      * Adds vector data.
246      * @param dataStream DataStream&lt;?&gt;; data stream of the data
247      * @param location double[]; locations in [m]
248      * @param time double[]; times in [s]
249      * @param values double[]; data values in SI unit
250      * @throws IllegalStateException if data was added with a quantity previously
251      */
252     public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
253             final double[] values)
254     {
255         checkNoQuantityData();
256         Objects.requireNonNull(dataStream, "Datastream may not be null.");
257         Objects.requireNonNull(location, "Location may not be null.");
258         Objects.requireNonNull(time, "Time may not be null.");
259         Objects.requireNonNull(values, "Values may not be null.");
260         if (location.length != time.length || time.length != values.length)
261         {
262             throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
263                     time.length, values.length));
264         }
265         for (int i = 0; i < values.length; i++)
266         {
267             if (!Double.isNaN(values[i]))
268             {
269                 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
270             }
271         }
272     }
273 
274     /**
275      * Adds grid data.
276      * @param quantity Quantity&lt;?, ?&gt;; quantity of the data
277      * @param location double[]; locations in [m]
278      * @param time double[]; times in [s]
279      * @param values double[][]; data values in SI unit
280      * @throws IllegalStateException if data was added with a data stream previously
281      */
282     public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
283             final double[][] values)
284     {
285         this.addingByQuantity = true;
286         addGridDataSI(getDefaultDataStream(quantity), location, time, values);
287         this.addingByQuantity = false;
288     }
289 
290     /**
291      * Adds grid data.
292      * @param dataStream DataStream&lt;?&gt;; data stream of the data
293      * @param location double[]; locations in [m]
294      * @param time double[]; times in [s]
295      * @param values double[][]; data values in SI unit
296      * @throws IllegalStateException if data was added with a quantity previously
297      */
298     public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
299             final double[][] values)
300     {
301         checkNoQuantityData();
302         Objects.requireNonNull(dataStream, "Datastream may not be null.");
303         Objects.requireNonNull(location, "Location may not be null.");
304         Objects.requireNonNull(time, "Time may not be null.");
305         Objects.requireNonNull(values, "Values may not be null.");
306         if (values.length != location.length)
307         {
308             throw new IllegalArgumentException(
309                     String.format("%d locations while length of data is %d", location.length, values.length));
310         }
311         for (int i = 0; i < location.length; i++)
312         {
313             if (values[i].length != time.length)
314             {
315                 throw new IllegalArgumentException(
316                         String.format("%d times while length of data is %d", time.length, values[i].length));
317             }
318             Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
319             for (int j = 0; j < time.length; j++)
320             {
321                 if (!Double.isNaN(values[i][j]))
322                 {
323                     getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
324                 }
325             }
326         }
327     }
328 
    /**
     * Check that no data was added using a quantity. The check is skipped while {@code addingByQuantity} is set, i.e.
     * while a quantity-based add method is delegating to a data-stream-based add method.
     * @throws IllegalStateException if data was added with a quantity previously
     */
    private void checkNoQuantityData()
    {
        // defaultDataSource is only ever created by quantity-based additions
        if (!this.addingByQuantity && this.defaultDataSource != null)
        {
            throw new IllegalStateException(
                    "Adding data with a data stream is not allowed after data has been added with a quantity.");
        }
    }
341 
342     /**
343      * Returns a default data stream and checks that no data with a data stream was added.
344      * @param quantity Quantity&lt;?, ?&gt;; quantity
345      * @return DataStream&lt;?&gt;; default data stream
346      * @throws IllegalStateException if data was added with a data stream previously
347      */
348     private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
349     {
350         Objects.requireNonNull(quantity, "Quantity may not be null.");
351         if (!this.dataSources.isEmpty())
352         {
353             throw new IllegalStateException(
354                     "Adding data with a quantity is not allowed after data has been added with a data stream.");
355         }
356         if (this.defaultDataSource == null)
357         {
358             this.defaultDataSource = new DataSource("default");
359             this.defaultDataStreams = new LinkedHashMap<>();
360         }
361         return this.defaultDataStreams.computeIfAbsent(quantity, (
362                 key
363         ) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
364     }
365 
366     /**
367      * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
368      * @param location double; location in [m]
369      * @return data from a specific location
370      */
371     private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
372     {
373         return this.data.computeIfAbsent(location, (
374                 key
375         ) -> new TreeMap<>());
376     }
377 
378     /**
379      * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
380      * exists.
381      * @param spatialData Map&lt;Double, Map&lt;DataStream&lt;?&gt;, Double&gt;&gt;; spatially selected data
382      * @param time double; time in [s]
383      * @return data from a specific time, from data from a specific location
384      */
385     private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
386             final double time)
387     {
388         return spatialData.computeIfAbsent(time, (
389                 key
390         ) -> new LinkedHashMap<>());
391     }
392 
393     // **********************
394     // *** KERNEL METHODS ***
395     // **********************
396 
    /**
     * Sets a default exponential kernel with infinite spatial and temporal range, sigma = 300m, and tau = 30s.
     */
    public void setKernel()
    {
        setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
    }
404 
    /**
     * Sets an exponential kernel with infinite spatial and temporal range.
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     */
    public void setKernelSI(final double sigma, final double tau)
    {
        setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
    }
414 
    /**
     * Sets an exponential kernel with limited range.
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
    }
426 
    /**
     * Sets a Gaussian kernel with infinite spatial and temporal range.
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     */
    public void setGaussKernelSI(final double sigma, final double tau)
    {
        setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
    }
436 
    /**
     * Sets a Gaussian kernel with limited range. Note: this delegates with the range arguments first, i.e.
     * {@code setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau)} in the two-argument
     * variant maps (xMax, tMax, sigma, tau) onto this method's (sigma, tau, xMax, tMax).
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
    }
448 
    /**
     * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels. This is the
     * method all other kernel setters ultimately delegate to; it replaces any previously set kernel.
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     * @param shape KernelShape; shape of the kernel
     */
    public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
    {
        this.kernel = new Kernel(xMax, tMax, shape);
    }
459 
    /**
     * Returns the wave speed in congestion (package-private accessor).
     * @return double; wave speed in congestion [m/s]
     */
    final double getWaveSpeedCongestion()
    {
        return this.cCong;
    }
468 
    /**
     * Returns the wave speed in free flow (package-private accessor).
     * @return double; wave speed in free flow [m/s]
     */
    final double getWaveSpeedFreeFlow()
    {
        return this.cFree;
    }
477 
478     // **********************
479     // *** FILTER METHODS ***
480     // **********************
481 
482     /**
483      * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
484      * done. Finally, the filtering results can then be obtained from the listener.
485      * @param location double[]; location of output grid in [m]
486      * @param time double[]; time of output grid in [s]
487      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
488      * @return EgtfParallelListener; listener to notify keep track of the progress
489      */
490     public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
491             final Quantity<?, ?>... quantities)
492     {
493         Objects.requireNonNull(location, "Location may not be null.");
494         Objects.requireNonNull(time, "Time may not be null.");
495         EgtfParallelListener listener = new EgtfParallelListener();
496         addListener(listener);
497         new Thread(new Runnable()
498         {
499             /** {@inheritDoc} */
500             @Override
501             public void run()
502             {
503                 listener.setFilter(filterSI(location, time, quantities));
504                 removeListener(listener);
505             }
506         }, "Egtf calculation thread").start();
507         return listener;
508     }
509 
510     /**
511      * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
512      * done. Finally, the filtering results can then be obtained from the listener.
513      * @param xMin double; minimum location value of output grid [m]
514      * @param xStep double; location step of output grid [m]
515      * @param xMax double; maximum location value of output grid [m]
516      * @param tMin double; minimum time value of output grid [s]
517      * @param tStep double; time step of output grid [s]
518      * @param tMax double; maximum time value of output grid [s]
519      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
520      * @return EgtfParallelListener; listener to notify keep track of the progress
521      */
522     public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
523             final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
524     {
525         EgtfParallelListener listener = new EgtfParallelListener();
526         addListener(listener);
527         new Thread(new Runnable()
528         {
529             /** {@inheritDoc} */
530             @Override
531             public void run()
532             {
533                 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
534                 removeListener(listener);
535             }
536         }, "Egtf calculation thread").start();
537         return listener;
538     }
539 
540     /**
541      * Returns filtered data. This is the standard EGTF implementation.
542      * @param location double[]; location of output grid in [m]
543      * @param time double[]; time of output grid in [s]
544      * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
545      * @return Filter; filtered data, {@code null} when interrupted
546      */
547     @SuppressWarnings({"synthetic-access", "methodlength"})
548     public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
549     {
550         Objects.requireNonNull(location, "Location may not be null.");
551         Objects.requireNonNull(time, "Time may not be null.");
552 
553         // initialize data
554         Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
555         for (Quantity<?, ?> quantity : quantities)
556         {
557             map.put(quantity, new double[location.length][time.length]);
558         }
559 
560         // loop grid locations
561         for (int i = 0; i < location.length; i++)
562         {
563             double xGrid = location[i];
564 
565             // filter applicable data for location
566             Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
567                     this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);
568 
569             // loop grid times
570             for (int j = 0; j < time.length; j++)
571             {
572                 double tGrid = time[j];
573 
574                 // notify
575                 if (notifyListeners((i + (double) j / time.length) / location.length))
576                 {
577                     return null;
578                 }
579 
580                 // initialize data per stream
581                 // quantity z assuming congestion and free flow
582                 Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();
583 
584                 // filter and loop applicable data for time
585                 for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
586                 {
587                     double dx = xEntry.getKey() - xGrid;
588                     Map<Double, Map<DataStream<?>, Double>> temporalData =
589                             xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);
590 
591                     for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
592                     {
593                         double dt = tEntry.getKey() - tGrid;
594                         Map<DataStream<?>, Double> pData = tEntry.getValue();
595 
596                         double phiCong = this.kernel.weight(this.cCong, dx, dt);
597                         double phiFree = this.kernel.weight(this.cFree, dx, dt);
598 
599                         // loop streams data at point
600                         for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
601                         {
602                             DataStream<?> stream = vEntry.getKey();
603                             if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
604                             {
605                                 double v = vEntry.getValue();
606                                 DualWeightedMean zCongFreeOfStream = zCongFree.computeIfAbsent(stream, (
607                                         key
608                                 ) -> new DualWeightedMean());
609                                 zCongFreeOfStream.addCong(v, phiCong);
610                                 zCongFreeOfStream.addFree(v, phiFree);
611                             }
612                         }
613                     }
614                 }
615 
616                 // figure out the congestion level estimated for each data source
617                 Map<DataSource, Double> w = new LinkedHashMap<>();
618                 for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
619                 {
620                     DataStream<?> dataStream = streamEntry.getKey();
621                     if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
622                     {
623                         DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
624                         double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
625                         w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
626                                 .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV)));
627                         continue;
628                     }
629                 }
630 
631                 // sum available data sources per quantity
632                 Double wMean = null;
633                 for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
634                 {
635                     Quantity<?, ?> quantity = qEntry.getKey();
636                     WeightedMean z = new WeightedMean();
637                     for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
638                     {
639                         DataStream<?> dataStream = zEntry.getKey();
640                         if (dataStream.getQuantity().equals(quantity))
641                         {
642                             // obtain congestion level
643                             double wCong;
644                             if (!w.containsKey(dataStream.getDataSource()))
645                             {
646                                 // this data source has no speed data, but congestion level can be estimated from other sources
647                                 if (wMean == null)
648                                 {
649                                     // let's see if speed was estimated already
650                                     for (Quantity<?, ?> prevQuant : quantities)
651                                     {
652                                         if (prevQuant.equals(quantity))
653                                         {
654                                             // it was not, get mean of other data source
655                                             wMean = 0.0;
656                                             for (double ww : w.values())
657                                             {
658                                                 wMean += ww / w.size();
659                                             }
660                                             break;
661                                         }
662                                         else if (prevQuant.isSpeed())
663                                         {
664                                             wMean = .5 * (1.0
665                                                     + Math.tanh((Egtf.this.vc - map.get(prevQuant)[i][j]) / Egtf.this.deltaV));
666                                             break;
667                                         }
668                                     }
669                                 }
670                                 wCong = wMean;
671                             }
672                             else
673                             {
674                                 wCong = w.get(dataStream.getDataSource());
675                             }
676                             // calculate estimated value z of this data source (no duplicate quantities per source allowed)
677                             double wfree = 1.0 - wCong;
678                             DualWeightedMean zCongFreej = zEntry.getValue();
679                             double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
680                             double weight;
681                             if (w.size() > 1)
682                             {
683                                 // data source more important if more and nearer measurements
684                                 double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
685                                 // more important if more reliable (smaller standard deviation) at congestion level
686                                 double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
687                                 weight = alpha * beta;
688                             }
689                             else
690                             {
691                                 weight = 1.0;
692                             }
693                             z.add(zStream, weight);
694                         }
695                     }
696                     qEntry.getValue()[i][j] = z.get();
697                 }
698             }
699         }
700         notifyListeners(1.0);
701 
702         return new FilterDouble(location, time, map);
703     }
704 
    /**
     * Returns filtered data that is processed using fast Fourier transformation. This is much faster than the standard filter,
     * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
     * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
     * recommended to set a Kernel with maximum bounds before using this method.
     * <p>
     * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data sources
     * like the Extended Generalized Treiber-Helbing Filter.
     * @param xMin double; minimum location value of output grid [m]
     * @param xStep double; location step of output grid [m]
     * @param xMax double; maximum location value of output grid [m]
     * @param tMin double; minimum time value of output grid [s]
     * @param tStep double; time step of output grid [s]
     * @param tMax double; maximum time value of output grid [s]
     * @param quantities Quantity&lt;?, ?&gt;...; quantities to calculate filtered data of
     * @return Filter; filtered data, {@code null} when interrupted
     */
    @SuppressWarnings("methodlength")
    public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
            final double tMax, final Quantity<?, ?>... quantities)
    {
        if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
        {
            throw new IllegalArgumentException(
                    "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
        }
        if (notifyListeners(0.0))
        {
            return null;
        }

        // initialize data: equidistant output grid, plus per-quantity result and weight-sum arrays
        int n = 1 + (int) ((xMax - xMin) / xStep);
        double[] location = new double[n];
        IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
        n = 1 + (int) ((tMax - tMin) / tStep);
        double[] time = new double[n];
        IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
        Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
        // 'weights' accumulates the sum of weights per quantity; used to normalize the result at the end
        Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
        for (Quantity<?, ?> quantity : quantities)
        {
            map.put(quantity, new double[location.length][time.length]);
            weights.put(quantity, new double[location.length][time.length]);
        }

        // discretize Kernel; infinite kernel bounds are clipped to twice the grid span so the kernel covers the whole grid
        double xFrom = this.kernel.fromLocation(0.0);
        xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
        double xTo = this.kernel.toLocation(0.0);
        xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
        double[] dx = equidistant(xFrom, xStep, xTo);
        double tFrom = this.kernel.fromTime(0.0);
        tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
        double tTo = this.kernel.toTime(0.0);
        tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
        double[] dt = equidistant(tFrom, tStep, tTo);
        // filter weights for the congested and free flow wave speeds, sampled on the kernel grid
        double[][] phiCong = new double[dx.length][dt.length];
        double[][] phiFree = new double[dx.length][dt.length];
        for (int i = 0; i < dx.length; i++)
        {
            for (int j = 0; j < dt.length; j++)
            {
                phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
                phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
            }
        }

        // discretize data: bin each measurement into the nearest grid cell, keeping sums and counts per data stream
        Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
        Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
        // loop grid locations
        for (int i = 0; i < location.length; i++)
        {
            // filter applicable data for location
            Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
                    this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
            // loop grid times
            for (int j = 0; j < time.length; j++)
            {
                // filter and loop applicable data for time
                for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
                {
                    NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
                            locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
                    for (Map<DataStream<?>, Double> timeData : temporalData.values())
                    {
                        for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
                        {
                            // speed is kept even when not requested, as it drives the congestion level estimate below
                            if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
                            {
                                dataSum.computeIfAbsent(timeEntry.getKey(), (
                                        key
                                ) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
                                dataCount.computeIfAbsent(timeEntry.getKey(), (
                                        key
                                ) -> new double[location.length][time.length])[i][j]++;
                            }
                        }
                    }
                }
            }
        }

        // figure out the congestion level estimated for each data source
        double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
        double step = 0;
        // store maps to prevent us from calculating the convolution for speed again later
        Map<DataSource, double[][]> w = new LinkedHashMap<>();
        Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
        Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
        Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
        Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
        for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
        {
            DataStream<?> dataStream = zEntry.getKey();
            if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
            {
                // notify
                double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
                if (notifyListeners((step + 0.25) / steps))
                {
                    return null;
                }
                double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
                if (notifyListeners((step + 0.5) / steps))
                {
                    return null;
                }
                double[][] count = dataCount.get(dataStream);
                double[][] nCong = Convolution.convolution(phiCong, count);
                if (notifyListeners((step + 0.75) / steps))
                {
                    return null;
                }
                double[][] nFree = Convolution.convolution(phiFree, count);
                double[][] wSource = new double[vCong.length][vCong[0].length];
                for (int i = 0; i < vCong.length; i++)
                {
                    for (int j = 0; j < vCong[0].length; j++)
                    {
                        // speed estimate: the lower of the congested- and free-flow-kernel means.
                        // NOTE(review): where a kernel saw no data, nCong/nFree is 0 and u becomes 0/0 = NaN,
                        // propagating into wSource — confirm upstream guarantees sufficient data coverage
                        double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
                        // congestion level in [0, 1]: tanh transition around critical speed vc with bandwidth deltaV
                        wSource[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV));
                    }
                }
                w.put(dataStream.getDataSource(), wSource);
                zCongSpeed.put(dataStream.getDataSource(), vCong);
                zFreeSpeed.put(dataStream.getDataSource(), vFree);
                nCongSpeed.put(dataStream.getDataSource(), nCong);
                nFreeSpeed.put(dataStream.getDataSource(), nFree);
            }
        }
        step++;
        if (notifyListeners(step / steps))
        {
            return null;
        }

        // sum available data sources per quantity
        double[][] wMean = null; // lazily computed fallback congestion level for sources without speed data
        for (Quantity<?, ?> quantity : quantities)
        {
            // gather place for this quantity
            double[][] qData = map.get(quantity);
            double[][] qWeights = weights.get(quantity);
            // loop streams that provide this quantity
            Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
            for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
            {
                if (zEntry.getKey().getQuantity().equals(quantity))
                {
                    zEntries.add(zEntry);
                }
            }
            double streamCounter = 0;
            for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
            {
                DataStream<?> dataStream = zEntry.getKey();

                // obtain congestion level
                double[][] wj;
                if (!w.containsKey(dataStream.getDataSource()))
                {
                    // this data source has no speed data, but congestion level can be estimated from other sources
                    if (wMean == null)
                    {
                        // let's see if speed was estimated already; 'quantities' is scanned in request order, so
                        // reaching 'quantity' first means no speed quantity precedes the current one
                        for (Quantity<?, ?> prevQuant : quantities)
                        {
                            if (prevQuant.equals(quantity))
                            {
                                // it was not, get mean of other data source
                                wMean = new double[location.length][time.length];
                                for (double[][] ww : w.values())
                                {
                                    for (int i = 0; i < location.length; i++)
                                    {
                                        for (int j = 0; j < time.length; j++)
                                        {
                                            wMean[i][j] += ww[i][j] / w.size();
                                        }
                                    }
                                }
                                break;
                            }
                            else if (prevQuant.isSpeed())
                            {
                                // a speed quantity was already filtered; derive congestion level from its result
                                wMean = new double[location.length][time.length];
                                double[][] v = map.get(prevQuant);
                                for (int i = 0; i < location.length; i++)
                                {
                                    for (int j = 0; j < time.length; j++)
                                    {
                                        wMean[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - v[i][j]) / Egtf.this.deltaV));
                                    }
                                }
                                break;
                            }
                        }
                    }
                    wj = wMean;
                }
                else
                {
                    wj = w.get(dataStream.getDataSource());
                }

                // convolutions of filters with discretized data and data counts
                double[][] zCong;
                double[][] zFree;
                double[][] nCong;
                double[][] nFree;
                if (dataStream.getQuantity().isSpeed())
                {
                    // reuse the convolutions computed during congestion level estimation
                    zCong = zCongSpeed.get(dataStream.getDataSource());
                    zFree = zFreeSpeed.get(dataStream.getDataSource());
                    nCong = nCongSpeed.get(dataStream.getDataSource());
                    nFree = nFreeSpeed.get(dataStream.getDataSource());
                }
                else
                {
                    zCong = Convolution.convolution(phiCong, zEntry.getValue());
                    if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
                    {
                        return null;
                    }
                    zFree = Convolution.convolution(phiFree, zEntry.getValue());
                    if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
                    {
                        return null;
                    }
                    double[][] count = dataCount.get(dataStream);
                    nCong = Convolution.convolution(phiCong, count);
                    if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
                    {
                        return null;
                    }
                    nFree = Convolution.convolution(phiFree, count);
                }

                // loop grid to add to each weighted sum (weighted per data source)
                for (int i = 0; i < location.length; i++)
                {
                    for (int j = 0; j < time.length; j++)
                    {
                        double wCong = wj[i][j];
                        double wFree = 1.0 - wCong;
                        double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
                        // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
                        // is exactly what the EGTF method needs to weigh data sources
                        double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
                        double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
                        double weight = beta * alpha;
                        qData[i][j] += (value * weight);
                        qWeights[i][j] += weight;
                    }
                }
                streamCounter++;
                if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
                {
                    return null;
                }
            }
            // normalize the weighted sum into a weighted mean per grid cell
            for (int i = 0; i < location.length; i++)
            {
                for (int j = 0; j < time.length; j++)
                {
                    qData[i][j] /= qWeights[i][j];
                }
            }
            step++;
        }

        return new FilterDouble(location, time, map);
    }
1000 
1001     /**
1002      * Returns an equidistant vector that includes 0.
1003      * @param from double; lowest value to include
1004      * @param step double; step
1005      * @param to double; highest value to include
1006      * @return double[]; equidistant vector that includes 0
1007      */
1008     private double[] equidistant(final double from, final double step, final double to)
1009     {
1010         int n1 = (int) (-from / step);
1011         int n2 = (int) (to / step);
1012         int n = n1 + n2 + 1;
1013         double[] array = new double[n];
1014         for (int i = 0; i < n; i++)
1015         {
1016             array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1017         }
1018         return array;
1019     }
1020 
1021     // *********************
1022     // *** EVENT METHODS ***
1023     // *********************
1024 
    /**
     * Interrupt the calculation. Sets a flag that is polled by the filter methods at each progress notification; a running
     * filter method will then stop and return {@code null}.
     * <p>
     * NOTE(review): the flag is not visibly reset anywhere in this section of the file — confirm whether the filter can be
     * reused after an interruption.
     */
    public final void interrupt()
    {
        this.interrupted = true;
    }
1032 
    /**
     * Add listener to be notified of filter progress during the filter methods.
     * @param listener EgtfListener; listener to add
     */
    public final void addListener(final EgtfListener listener)
    {
        this.listeners.add(listener);
    }
1041 
    /**
     * Remove listener; it will no longer receive progress notifications. Removing a listener that was never added has no
     * effect.
     * @param listener EgtfListener; listener to remove
     */
    public final void removeListener(final EgtfListener listener)
    {
        this.listeners.remove(listener);
    }
1050 
1051     /**
1052      * Notify all listeners.
1053      * @param progress double; progress, a value in the range [0 ... 1]
1054      * @return boolean; whether the filter is interrupted
1055      */
1056     private boolean notifyListeners(final double progress)
1057     {
1058         if (!this.listeners.isEmpty())
1059         {
1060             EgtfEvent event = new EgtfEvent(this, progress);
1061             for (EgtfListener listener : this.listeners)
1062             {
1063                 listener.notifyProgress(event);
1064             }
1065         }
1066         return this.interrupted;
1067     }
1068 
1069     // **********************
1070     // *** HELPER CLASSES ***
1071     // **********************
1072 
1073     /**
1074      * Small class to build up a weighted mean under the congestion and free flow assumption.
1075      */
1076     private class DualWeightedMean
1077     {
1078         /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1079         private double numeratorCong;
1080 
1081         /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1082         private double numeratorFree;
1083 
1084         /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1085         private double denominatorCong;
1086 
1087         /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1088         private double denominatorFree;
1089 
1090         /**
1091          * Adds a congestion value with weight.
1092          * @param value double; value
1093          * @param weight double; weight
1094          */
1095         public void addCong(final double value, final double weight)
1096         {
1097             this.numeratorCong += value * weight;
1098             this.denominatorCong += weight;
1099         }
1100 
1101         /**
1102          * Adds a free flow value with weight.
1103          * @param value double; value
1104          * @param weight double; weight
1105          */
1106         public void addFree(final double value, final double weight)
1107         {
1108             this.numeratorFree += value * weight;
1109             this.denominatorFree += weight;
1110         }
1111 
1112         /**
1113          * Returns the weighted congestion mean of available data.
1114          * @return double; weighted mean of available data
1115          */
1116         public double getCong()
1117         {
1118             return this.numeratorCong / this.denominatorCong;
1119         }
1120 
1121         /**
1122          * Returns the weighted free flow mean of available data.
1123          * @return double; weighted free flow mean of available data
1124          */
1125         public double getFree()
1126         {
1127             return this.numeratorFree / this.denominatorFree;
1128         }
1129 
1130         /**
1131          * Returns the sum of congestion weights.
1132          * @return double; the sum of congestion weights
1133          */
1134         public double getDenominatorCong()
1135         {
1136             return this.denominatorCong;
1137         }
1138 
1139         /**
1140          * Returns the sum of free flow weights.
1141          * @return double; the sum of free flow weights
1142          */
1143         public double getDenominatorFree()
1144         {
1145             return this.denominatorFree;
1146         }
1147 
1148         /** {@inheritDoc} */
1149         @Override
1150         public String toString()
1151         {
1152             return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1153                     + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1154         }
1155 
1156     }
1157 
1158     /**
1159      * Small class to build up a weighted mean.
1160      */
1161     private class WeightedMean
1162     {
1163         /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1164         private double numerator;
1165 
1166         /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1167         private double denominator;
1168 
1169         /**
1170          * Adds a value with weight.
1171          * @param value double; value
1172          * @param weight double; weight
1173          */
1174         public void add(final double value, final double weight)
1175         {
1176             this.numerator += value * weight;
1177             this.denominator += weight;
1178         }
1179 
1180         /**
1181          * Returns the weighted mean of available data.
1182          * @return double; weighted mean of available data
1183          */
1184         public double get()
1185         {
1186             return this.numerator / this.denominator;
1187         }
1188 
1189         /** {@inheritDoc} */
1190         @Override
1191         public String toString()
1192         {
1193             return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1194         }
1195 
1196     }
1197 
1198     /** {@inheritDoc} */
1199     @Override
1200     public String toString()
1201     {
1202         return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1203                 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1204                 + this.interrupted + ", listeners=" + this.listeners + "]";
1205     }
1206 
1207 }