1 package org.opentrafficsim.draw.egtf;
2
3 import java.util.LinkedHashMap;
4 import java.util.LinkedHashSet;
5 import java.util.Map;
6 import java.util.NavigableMap;
7 import java.util.Objects;
8 import java.util.Set;
9 import java.util.SortedMap;
10 import java.util.TreeMap;
11 import java.util.stream.IntStream;
12
13 /**
14 * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15 * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16 * implementation also supports multiple data sources.
17 * <p>
18 * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19 * <p>
20 * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21 * to provide several data streams.
22 * <p>
23 * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24 * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25 * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26 * thus ultimately the weight of the data in the estimation of the quantity.
27 * <p>
28 * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29 * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30 * {@code DENSITY_SI}, all under {@code Quantity}.
31 * <p>
32 * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33 * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34 * <p>
35 * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36 * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37 * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38 * reliability is undefined.
39 * <p>
40 * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41 * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42 * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
 * Gaussian kernel is also available, while any other shape could also be implemented.
44 * <p>
45 * Parameters from the EGTF are found in the following places:
46 * <ul>
47 * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48 * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49 * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50 * time. (Specifically {@code GaussKernelShape})</li>
51 * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52 * data in free flow and congestion from a particular data stream.</li>
53 * </ul>
54 * References:
55 * <ul>
56 * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57 * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58 * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59 * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60 * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61 * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62 * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63 * </ul>
64 * <p>
65 * Copyright (c) 2013-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66 * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
67 * </p>
68 * @author <a href="https://github.com/wjschakel">Wouter Schakel</a>
69 */
70 public class Egtf
71 {
72
    /** Default sigma value [m] used by the default exponential kernel. */
    private static final double DEFAULT_SIGMA = 300.0;

    /** Default tau value [s] used by the default exponential kernel. */
    private static final double DEFAULT_TAU = 30.0;

    /** Filter kernel. */
    private Kernel kernel;

    /** Shock wave speed in congestion [m/s] (constructor converts from km/h). */
    private final double cCong;

    /** Shock wave speed in free flow [m/s] (constructor converts from km/h). */
    private final double cFree;

    /** Speed range between congestion and free flow [m/s] (constructor converts from km/h). */
    private final double deltaV;

    /** Flip-over speed below which we have congestion [m/s] (constructor converts from km/h). */
    private final double vc;

    /** Data sources by label so we can return the same instances upon repeated request. */
    private final Map<String, DataSource> dataSources = new LinkedHashMap<>();

    /** Default data source for cases where a single data source is used. */
    private DataSource defaultDataSource = null;

    /** Default data streams for cases where a single data source is used. */
    private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;

    /** True if data is currently being added using a quantity, in which case a check should not occur. */
    private boolean addingByQuantity;

    /** All point data sorted by space and time, and per data stream. Outer key: location [m]; inner key: time [s]. */
    private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();

    /** Whether the calculation was interrupted. */
    private boolean interrupted = false;

    /** Listeners. */
    private Set<EgtfListener> listeners = new LinkedHashSet<>();
114
    /**
     * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
     */
    public Egtf()
    {
        // defaults in km/h; the delegate constructor converts them to m/s
        this(-18.0, 80.0, 10.0, 80.0);
    }
122
123 /**
124 * Constructor defining global settings. A default kernel is set.
125 * @param cCong shock wave speed in congestion [km/h]
126 * @param cFree shock wave speed in free flow [km/h]
127 * @param deltaV speed range between congestion and free flow [km/h]
128 * @param vc flip-over speed below which we have congestion [km/h]
129 */
130 public Egtf(final double cCong, final double cFree, final double deltaV, final double vc)
131 {
132 this.cCong = cCong / 3.6;
133 this.cFree = cFree / 3.6;
134 this.deltaV = deltaV / 3.6;
135 this.vc = vc / 3.6;
136 setKernel();
137 }
138
    /**
     * Convenience constructor that also sets a specified kernel. Equivalent to the four-argument constructor followed by
     * {@code setKernelSI(sigma, tau, xMax, tMax)}.
     * @param cCong shock wave speed in congestion [km/h]
     * @param cFree shock wave speed in free flow [km/h]
     * @param deltaV speed range between congestion and free flow [km/h]
     * @param vc flip-over speed below which we have congestion [km/h]
     * @param sigma spatial kernel size in [m]
     * @param tau temporal kernel size in [s]
     * @param xMax maximum spatial range in [m]
     * @param tMax maximum temporal range in [s]
     */
    @SuppressWarnings("parameternumber")
    public Egtf(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
            final double tau, final double xMax, final double tMax)
    {
        this(cCong, cFree, deltaV, vc);
        setKernelSI(sigma, tau, xMax, tMax);
    }
157
158 // ********************
159 // *** DATA METHODS ***
160 // ********************
161
162 /**
163 * Return a data source, which is created if necessary.
164 * @param name unique name for the data source
165 * @return data source
166 * @throws IllegalStateException when data has been added without a data source
167 */
168 public DataSource getDataSource(final String name)
169 {
170 if (this.defaultDataSource != null)
171 {
172 throw new IllegalStateException(
173 "Obtaining a (new) data source after data has been added without a data source is not allowed.");
174 }
175 return this.dataSources.computeIfAbsent(name, (key) -> new DataSource(key));
176 }
177
178 /**
179 * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
180 * required.
181 * @param time time before which all data can be removed
182 */
183 public synchronized void clearDataBefore(final double time)
184 {
185 for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
186 {
187 map.subMap(Double.NEGATIVE_INFINITY, time).clear();
188 }
189 }
190
191 /**
192 * Adds point data.
193 * @param quantity quantity of the data
194 * @param location location in [m]
195 * @param time time in [s]
196 * @param value data value
197 * @throws IllegalStateException if data was added with a data stream previously
198 */
199 public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
200 final double value)
201 {
202 this.addingByQuantity = true;
203 addPointDataSI(getDefaultDataStream(quantity), location, time, value);
204 this.addingByQuantity = false;
205 }
206
207 /**
208 * Adds point data.
209 * @param dataStream data stream of the data
210 * @param location location in [m]
211 * @param time time in [s]
212 * @param value data value
213 * @throws IllegalStateException if data was added with a quantity previously
214 */
215 public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
216 final double value)
217 {
218 checkNoQuantityData();
219 Objects.requireNonNull(dataStream, "Datastream may not be null.");
220 if (!Double.isNaN(value))
221 {
222 getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
223 }
224 }
225
226 /**
227 * Adds vector data.
228 * @param quantity quantity of the data
229 * @param location locations in [m]
230 * @param time times in [s]
231 * @param values data values in SI unit
232 * @throws IllegalStateException if data was added with a data stream previously
233 */
234 public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
235 final double[] values)
236 {
237 this.addingByQuantity = true;
238 addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
239 this.addingByQuantity = false;
240 }
241
242 /**
243 * Adds vector data.
244 * @param dataStream data stream of the data
245 * @param location locations in [m]
246 * @param time times in [s]
247 * @param values data values in SI unit
248 * @throws IllegalStateException if data was added with a quantity previously
249 */
250 public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
251 final double[] values)
252 {
253 checkNoQuantityData();
254 Objects.requireNonNull(dataStream, "Datastream may not be null.");
255 Objects.requireNonNull(location, "Location may not be null.");
256 Objects.requireNonNull(time, "Time may not be null.");
257 Objects.requireNonNull(values, "Values may not be null.");
258 if (location.length != time.length || time.length != values.length)
259 {
260 throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
261 time.length, values.length));
262 }
263 for (int i = 0; i < values.length; i++)
264 {
265 if (!Double.isNaN(values[i]))
266 {
267 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
268 }
269 }
270 }
271
272 /**
273 * Adds grid data.
274 * @param quantity quantity of the data
275 * @param location locations in [m]
276 * @param time times in [s]
277 * @param values data values in SI unit
278 * @throws IllegalStateException if data was added with a data stream previously
279 */
280 public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
281 final double[][] values)
282 {
283 this.addingByQuantity = true;
284 addGridDataSI(getDefaultDataStream(quantity), location, time, values);
285 this.addingByQuantity = false;
286 }
287
288 /**
289 * Adds grid data.
290 * @param dataStream data stream of the data
291 * @param location locations in [m]
292 * @param time times in [s]
293 * @param values data values in SI unit
294 * @throws IllegalStateException if data was added with a quantity previously
295 */
296 public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
297 final double[][] values)
298 {
299 checkNoQuantityData();
300 Objects.requireNonNull(dataStream, "Datastream may not be null.");
301 Objects.requireNonNull(location, "Location may not be null.");
302 Objects.requireNonNull(time, "Time may not be null.");
303 Objects.requireNonNull(values, "Values may not be null.");
304 if (values.length != location.length)
305 {
306 throw new IllegalArgumentException(
307 String.format("%d locations while length of data is %d", location.length, values.length));
308 }
309 for (int i = 0; i < location.length; i++)
310 {
311 if (values[i].length != time.length)
312 {
313 throw new IllegalArgumentException(
314 String.format("%d times while length of data is %d", time.length, values[i].length));
315 }
316 Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
317 for (int j = 0; j < time.length; j++)
318 {
319 if (!Double.isNaN(values[i][j]))
320 {
321 getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
322 }
323 }
324 }
325 }
326
327 /**
328 * Check that no data was added using a quantity.
329 * @throws IllegalStateException if data was added with a quantity previously
330 */
331 private void checkNoQuantityData()
332 {
333 if (!this.addingByQuantity && this.defaultDataSource != null)
334 {
335 throw new IllegalStateException(
336 "Adding data with a data stream is not allowed after data has been added with a quantity.");
337 }
338 }
339
340 /**
341 * Returns a default data stream and checks that no data with a data stream was added.
342 * @param quantity quantity
343 * @return default data stream
344 * @throws IllegalStateException if data was added with a data stream previously
345 */
346 private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
347 {
348 Objects.requireNonNull(quantity, "Quantity may not be null.");
349 if (!this.dataSources.isEmpty())
350 {
351 throw new IllegalStateException(
352 "Adding data with a quantity is not allowed after data has been added with a data stream.");
353 }
354 if (this.defaultDataSource == null)
355 {
356 this.defaultDataSource = new DataSource("default");
357 this.defaultDataStreams = new LinkedHashMap<>();
358 }
359 return this.defaultDataStreams.computeIfAbsent(quantity,
360 (key) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
361 }
362
363 /**
364 * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
365 * @param location location in [m]
366 * @return data from a specific location
367 */
368 private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
369 {
370 return this.data.computeIfAbsent(location, (key) -> new TreeMap<>());
371 }
372
373 /**
374 * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
375 * exists.
376 * @param spatialData spatially selected data
377 * @param time time in [s]
378 * @return data from a specific time, from data from a specific location
379 */
380 private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
381 final double time)
382 {
383 return spatialData.computeIfAbsent(time, (key) -> new LinkedHashMap<>());
384 }
385
386 // **********************
387 // *** KERNEL METHODS ***
388 // **********************
389
    /**
     * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s. Replaces any previously set kernel.
     */
    public void setKernel()
    {
        setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
    }
397
    /**
     * Sets an exponential kernel with infinite range. Replaces any previously set kernel.
     * @param sigma spatial kernel size in [m]
     * @param tau temporal kernel size in [s]
     */
    public void setKernelSI(final double sigma, final double tau)
    {
        setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
    }
407
    /**
     * Sets an exponential kernel with limited range. Replaces any previously set kernel.
     * @param sigma spatial kernel size in [m]
     * @param tau temporal kernel size in [s]
     * @param xMax maximum spatial range in [m]
     * @param tMax maximum temporal range in [s]
     */
    public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
    }
419
    /**
     * Sets a Gaussian kernel with infinite range. Replaces any previously set kernel.
     * @param sigma spatial kernel size in [m]
     * @param tau temporal kernel size in [s]
     */
    public void setGaussKernelSI(final double sigma, final double tau)
    {
        setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
    }
429
    /**
     * Sets a Gaussian kernel with limited range. Replaces any previously set kernel.
     * @param sigma spatial kernel size in [m]
     * @param tau temporal kernel size in [s]
     * @param xMax maximum spatial range in [m]
     * @param tMax maximum temporal range in [s]
     */
    public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
    }
441
    /**
     * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels. This replaces any
     * previously set kernel; all other kernel setters delegate to this method.
     * @param xMax maximum spatial range in [m]
     * @param tMax maximum temporal range in [s]
     * @param shape shape of the kernel
     */
    public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
    {
        this.kernel = new Kernel(xMax, tMax, shape);
    }
452
    /**
     * Returns the wave speed in congestion.
     * @return wave speed in congestion in [m/s] (converted from km/h in the constructor)
     */
    final double getWaveSpeedCongestion()
    {
        return this.cCong;
    }
461
    /**
     * Returns the wave speed in free flow.
     * @return wave speed in free flow in [m/s] (converted from km/h in the constructor)
     */
    final double getWaveSpeedFreeFlow()
    {
        return this.cFree;
    }
470
471 // **********************
472 // *** FILTER METHODS ***
473 // **********************
474
475 /**
476 * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
477 * done. Finally, the filtering results can then be obtained from the listener.
478 * @param location location of output grid in [m]
479 * @param time time of output grid in [s]
480 * @param quantities quantities to calculate filtered data of
481 * @return listener to notify keep track of the progress
482 */
483 public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
484 final Quantity<?, ?>... quantities)
485 {
486 Objects.requireNonNull(location, "Location may not be null.");
487 Objects.requireNonNull(time, "Time may not be null.");
488 EgtfParallelListener listener = new EgtfParallelListener();
489 addListener(listener);
490 new Thread(new Runnable()
491 {
492 @Override
493 public void run()
494 {
495 listener.setFilter(filterSI(location, time, quantities));
496 removeListener(listener);
497 }
498 }, "Egtf calculation thread").start();
499 return listener;
500 }
501
502 /**
503 * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
504 * done. Finally, the filtering results can then be obtained from the listener.
505 * @param xMin minimum location value of output grid [m]
506 * @param xStep location step of output grid [m]
507 * @param xMax maximum location value of output grid [m]
508 * @param tMin minimum time value of output grid [s]
509 * @param tStep time step of output grid [s]
510 * @param tMax maximum time value of output grid [s]
511 * @param quantities quantities to calculate filtered data of
512 * @return listener to notify keep track of the progress
513 */
514 public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
515 final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
516 {
517 EgtfParallelListener listener = new EgtfParallelListener();
518 addListener(listener);
519 new Thread(new Runnable()
520 {
521 @Override
522 public void run()
523 {
524 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
525 removeListener(listener);
526 }
527 }, "Egtf calculation thread").start();
528 return listener;
529 }
530
    /**
     * Returns filtered data. This is the standard EGTF implementation.
     * @param location location of output grid in [m]
     * @param time time of output grid in [s]
     * @param quantities quantities to calculate filtered data of
     * @return filtered data, {@code null} when interrupted
     */
    @SuppressWarnings("methodlength")
    public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
    {
        Objects.requireNonNull(location, "Location may not be null.");
        Objects.requireNonNull(time, "Time may not be null.");

        // initialize data
        Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
        for (Quantity<?, ?> quantity : quantities)
        {
            map.put(quantity, new double[location.length][time.length]);
        }

        // loop grid locations
        for (int i = 0; i < location.length; i++)
        {
            double xGrid = location[i];

            // filter applicable data for location
            Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
                    this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);

            // loop grid times
            for (int j = 0; j < time.length; j++)
            {
                double tGrid = time[j];

                // notify; returns true when a listener interrupted the calculation
                if (notifyListeners((i + (double) j / time.length) / location.length))
                {
                    return null;
                }

                // initialize data per stream
                // quantity z assuming congestion and free flow
                Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();

                // filter and loop applicable data for time
                for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
                {
                    double dx = xEntry.getKey() - xGrid;
                    Map<Double, Map<DataStream<?>, Double>> temporalData =
                            xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);

                    for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
                    {
                        double dt = tEntry.getKey() - tGrid;
                        Map<DataStream<?>, Double> pData = tEntry.getValue();

                        // kernel weights of this measurement under congested and free-flow wave speeds
                        double phiCong = this.kernel.weight(this.cCong, dx, dt);
                        double phiFree = this.kernel.weight(this.cFree, dx, dt);

                        // loop streams data at point
                        for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
                        {
                            DataStream<?> stream = vEntry.getKey();
                            // speed streams are always accumulated, as speed determines the congestion level below
                            if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
                            {
                                double v = vEntry.getValue();
                                DualWeightedMean zCongFreeOfStream =
                                        zCongFree.computeIfAbsent(stream, (key) -> new DualWeightedMean());
                                zCongFreeOfStream.addCong(v, phiCong);
                                zCongFreeOfStream.addFree(v, phiFree);
                            }
                        }
                    }
                }

                // figure out the congestion level estimated for each data source
                Map<DataSource, Double> w = new LinkedHashMap<>();
                for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
                {
                    DataStream<?> dataStream = streamEntry.getKey();
                    if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
                    {
                        DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
                        // representative speed: the lower of the congested and free-flow estimates
                        double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
                        w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
                                .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV)));
                        continue;
                    }
                }

                // sum available data sources per quantity
                // wMean: lazily computed fallback congestion level for sources without speed data at this grid point
                Double wMean = null;
                for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
                {
                    Quantity<?, ?> quantity = qEntry.getKey();
                    WeightedMean z = new WeightedMean();
                    for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
                    {
                        DataStream<?> dataStream = zEntry.getKey();
                        if (dataStream.getQuantity().equals(quantity))
                        {
                            // obtain congestion level
                            double wCong;
                            if (!w.containsKey(dataStream.getDataSource()))
                            {
                                // this data source has no speed data, but congestion level can be estimated from other sources
                                if (wMean == null)
                                {
                                    // let's see if speed was estimated already
                                    // (quantities are iterated in order: a speed quantity before the current one has
                                    // already been written to map[prevQuant][i][j] in an earlier pass of the outer loop)
                                    for (Quantity<?, ?> prevQuant : quantities)
                                    {
                                        if (prevQuant.equals(quantity))
                                        {
                                            // it was not, get mean of other data source
                                            wMean = 0.0;
                                            for (double ww : w.values())
                                            {
                                                wMean += ww / w.size();
                                            }
                                            break;
                                        }
                                        else if (prevQuant.isSpeed())
                                        {
                                            wMean = .5 * (1.0
                                                    + Math.tanh((Egtf.this.vc - map.get(prevQuant)[i][j]) / Egtf.this.deltaV));
                                            break;
                                        }
                                    }
                                }
                                wCong = wMean;
                            }
                            else
                            {
                                wCong = w.get(dataStream.getDataSource());
                            }
                            // calculate estimated value z of this data source (no duplicate quantities per source allowed)
                            double wfree = 1.0 - wCong;
                            DualWeightedMean zCongFreej = zEntry.getValue();
                            double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
                            double weight;
                            if (w.size() > 1)
                            {
                                // data source more important if more and nearer measurements
                                double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
                                // more important if more reliable (smaller standard deviation) at congestion level
                                double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
                                weight = alpha * beta;
                            }
                            else
                            {
                                weight = 1.0;
                            }
                            z.add(zStream, weight);
                        }
                    }
                    qEntry.getValue()[i][j] = z.get();
                }
            }
        }
        notifyListeners(1.0);

        return new FilterDouble(location, time, map);
    }
694
695 /**
696 * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
697 * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
698 * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
699 * recommended to set a Kernel with maximum bounds before using this method.
700 * <p>
701 * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
702 * the Extended Generalized Treiber-Helbing Filter.
703 * @param xMin minimum location value of output grid [m]
704 * @param xStep location step of output grid [m]
705 * @param xMax maximum location value of output grid [m]
706 * @param tMin minimum time value of output grid [s]
707 * @param tStep time step of output grid [s]
708 * @param tMax maximum time value of output grid [s]
709 * @param quantities quantities to calculate filtered data of
710 * @return filtered data, {@code null} when interrupted
711 */
712 @SuppressWarnings("methodlength")
713 public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
714 final double tMax, final Quantity<?, ?>... quantities)
715 {
716 if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
717 {
718 throw new IllegalArgumentException(
719 "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
720 }
721 if (notifyListeners(0.0))
722 {
723 return null;
724 }
725
726 // initialize data
727 int n = 1 + (int) ((xMax - xMin) / xStep);
728 double[] location = new double[n];
729 IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
730 n = 1 + (int) ((tMax - tMin) / tStep);
731 double[] time = new double[n];
732 IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
733 Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
734 Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
735 for (Quantity<?, ?> quantity : quantities)
736 {
737 map.put(quantity, new double[location.length][time.length]);
738 weights.put(quantity, new double[location.length][time.length]);
739 }
740
741 // discretize Kernel
742 double xFrom = this.kernel.fromLocation(0.0);
743 xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
744 double xTo = this.kernel.toLocation(0.0);
745 xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
746 double[] dx = equidistant(xFrom, xStep, xTo);
747 double tFrom = this.kernel.fromTime(0.0);
748 tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
749 double tTo = this.kernel.toTime(0.0);
750 tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
751 double[] dt = equidistant(tFrom, tStep, tTo);
752 double[][] phiCong = new double[dx.length][dt.length];
753 double[][] phiFree = new double[dx.length][dt.length];
754 for (int i = 0; i < dx.length; i++)
755 {
756 for (int j = 0; j < dt.length; j++)
757 {
758 phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
759 phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
760 }
761 }
762
763 // discretize data
764 Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
765 Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
766 // loop grid locations
767 for (int i = 0; i < location.length; i++)
768 {
769 // filter applicable data for location
770 Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
771 this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
772 // loop grid times
773 for (int j = 0; j < time.length; j++)
774 {
775 // filter and loop applicable data for time
776 for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
777 {
778 NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
779 locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
780 for (Map<DataStream<?>, Double> timeData : temporalData.values())
781 {
782 for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
783 {
784 if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
785 {
786 dataSum.computeIfAbsent(timeEntry.getKey(),
787 (key) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
788 dataCount.computeIfAbsent(timeEntry.getKey(),
789 (key) -> new double[location.length][time.length])[i][j]++;
790 }
791 }
792 }
793 }
794 }
795 }
796
797 // figure out the congestion level estimated for each data source
798 double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
799 double step = 0;
800 // store maps to prevent us from calculating the convolution for speed again later
801 Map<DataSource, double[][]> w = new LinkedHashMap<>();
802 Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
803 Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
804 Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
805 Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
806 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
807 {
808 DataStream<?> dataStream = zEntry.getKey();
809 if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
810 {
811 // notify
812 double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
813 if (notifyListeners((step + 0.25) / steps))
814 {
815 return null;
816 }
817 double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
818 if (notifyListeners((step + 0.5) / steps))
819 {
820 return null;
821 }
822 double[][] count = dataCount.get(dataStream);
823 double[][] nCong = Convolution.convolution(phiCong, count);
824 if (notifyListeners((step + 0.75) / steps))
825 {
826 return null;
827 }
828 double[][] nFree = Convolution.convolution(phiFree, count);
829 double[][] wSource = new double[vCong.length][vCong[0].length];
830 for (int i = 0; i < vCong.length; i++)
831 {
832 for (int j = 0; j < vCong[0].length; j++)
833 {
834 double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
835 wSource[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV));
836 }
837 }
838 w.put(dataStream.getDataSource(), wSource);
839 zCongSpeed.put(dataStream.getDataSource(), vCong);
840 zFreeSpeed.put(dataStream.getDataSource(), vFree);
841 nCongSpeed.put(dataStream.getDataSource(), nCong);
842 nFreeSpeed.put(dataStream.getDataSource(), nFree);
843 }
844 }
845 step++;
846 if (notifyListeners(step / steps))
847 {
848 return null;
849 }
850
851 // sum available data sources per quantity
852 double[][] wMean = null;
853 for (Quantity<?, ?> quantity : quantities)
854 {
855 // gather place for this quantity
856 double[][] qData = map.get(quantity);
857 double[][] qWeights = weights.get(quantity);
858 // loop streams that provide this quantity
859 Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
860 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
861 {
862 if (zEntry.getKey().getQuantity().equals(quantity))
863 {
864 zEntries.add(zEntry);
865 }
866 }
867 double streamCounter = 0;
868 for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
869 {
870 DataStream<?> dataStream = zEntry.getKey();
871
872 // obtain congestion level
873 double[][] wj;
874 if (!w.containsKey(dataStream.getDataSource()))
875 {
876 // this data source has no speed data, but congestion level can be estimated from other sources
877 if (wMean == null)
878 {
879 // let's see if speed was estimated already
880 for (Quantity<?, ?> prevQuant : quantities)
881 {
882 if (prevQuant.equals(quantity))
883 {
884 // it was not, get mean of other data source
885 wMean = new double[location.length][time.length];
886 for (double[][] ww : w.values())
887 {
888 for (int i = 0; i < location.length; i++)
889 {
890 for (int j = 0; j < time.length; j++)
891 {
892 wMean[i][j] += ww[i][j] / w.size();
893 }
894 }
895 }
896 break;
897 }
898 else if (prevQuant.isSpeed())
899 {
900 wMean = new double[location.length][time.length];
901 double[][] v = map.get(prevQuant);
902 for (int i = 0; i < location.length; i++)
903 {
904 for (int j = 0; j < time.length; j++)
905 {
906 wMean[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - v[i][j]) / Egtf.this.deltaV));
907 }
908 }
909 break;
910 }
911 }
912 }
913 wj = wMean;
914 }
915 else
916 {
917 wj = w.get(dataStream.getDataSource());
918 }
919
920 // convolutions of filters with discretized data and data counts
921 double[][] zCong;
922 double[][] zFree;
923 double[][] nCong;
924 double[][] nFree;
925 if (dataStream.getQuantity().isSpeed())
926 {
927 zCong = zCongSpeed.get(dataStream.getDataSource());
928 zFree = zFreeSpeed.get(dataStream.getDataSource());
929 nCong = nCongSpeed.get(dataStream.getDataSource());
930 nFree = nFreeSpeed.get(dataStream.getDataSource());
931 }
932 else
933 {
934 zCong = Convolution.convolution(phiCong, zEntry.getValue());
935 if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
936 {
937 return null;
938 }
939 zFree = Convolution.convolution(phiFree, zEntry.getValue());
940 if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
941 {
942 return null;
943 }
944 double[][] count = dataCount.get(dataStream);
945 nCong = Convolution.convolution(phiCong, count);
946 if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
947 {
948 return null;
949 }
950 nFree = Convolution.convolution(phiFree, count);
951 }
952
953 // loop grid to add to each weighted sum (weighted per data source)
954 for (int i = 0; i < location.length; i++)
955 {
956 for (int j = 0; j < time.length; j++)
957 {
958 double wCong = wj[i][j];
959 double wFree = 1.0 - wCong;
960 double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
961 // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
962 // is exactly what the EGTF method needs to weigh data sources
963 double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
964 double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
965 double weight = beta * alpha;
966 qData[i][j] += (value * weight);
967 qWeights[i][j] += weight;
968 }
969 }
970 streamCounter++;
971 if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
972 {
973 return null;
974 }
975 }
976 for (int i = 0; i < location.length; i++)
977 {
978 for (int j = 0; j < time.length; j++)
979 {
980 qData[i][j] /= qWeights[i][j];
981 }
982 }
983 step++;
984 }
985
986 return new FilterDouble(location, time, map);
987 }
988
989 /**
990 * Returns an equidistant vector that includes 0.
991 * @param from lowest value to include
992 * @param step step
993 * @param to highest value to include
994 * @return equidistant vector that includes 0
995 */
996 private double[] equidistant(final double from, final double step, final double to)
997 {
998 int n1 = (int) (-from / step);
999 int n2 = (int) (to / step);
1000 int n = n1 + n2 + 1;
1001 double[] array = new double[n];
1002 for (int i = 0; i < n; i++)
1003 {
1004 array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1005 }
1006 return array;
1007 }
1008
1009 // *********************
1010 // *** EVENT METHODS ***
1011 // *********************
1012
1013 /**
1014 * Interrupt the calculation.
1015 */
1016 public final void interrupt()
1017 {
1018 this.interrupted = true;
1019 }
1020
1021 /**
1022 * Add listener.
1023 * @param listener listener
1024 */
1025 public final void addListener(final EgtfListener listener)
1026 {
1027 this.listeners.add(listener);
1028 }
1029
1030 /**
1031 * Remove listener.
1032 * @param listener listener
1033 */
1034 public final void removeListener(final EgtfListener listener)
1035 {
1036 this.listeners.remove(listener);
1037 }
1038
1039 /**
1040 * Notify all listeners.
1041 * @param progress progress, a value in the range [0 ... 1]
1042 * @return whether the filter is interrupted
1043 */
1044 private boolean notifyListeners(final double progress)
1045 {
1046 if (!this.listeners.isEmpty())
1047 {
1048 EgtfEvent event = new EgtfEvent(this, progress);
1049 for (EgtfListener listener : this.listeners)
1050 {
1051 listener.notifyProgress(event);
1052 }
1053 }
1054 return this.interrupted;
1055 }
1056
1057 // **********************
1058 // *** HELPER CLASSES ***
1059 // **********************
1060
1061 /**
1062 * Small class to build up a weighted mean under the congestion and free flow assumption.
1063 */
1064 private final class DualWeightedMean
1065 {
1066 /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1067 private double numeratorCong;
1068
1069 /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1070 private double numeratorFree;
1071
1072 /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1073 private double denominatorCong;
1074
1075 /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1076 private double denominatorFree;
1077
1078 /**
1079 * Adds a congestion value with weight.
1080 * @param value value
1081 * @param weight weight
1082 */
1083 public void addCong(final double value, final double weight)
1084 {
1085 this.numeratorCong += value * weight;
1086 this.denominatorCong += weight;
1087 }
1088
1089 /**
1090 * Adds a free flow value with weight.
1091 * @param value value
1092 * @param weight weight
1093 */
1094 public void addFree(final double value, final double weight)
1095 {
1096 this.numeratorFree += value * weight;
1097 this.denominatorFree += weight;
1098 }
1099
1100 /**
1101 * Returns the weighted congestion mean of available data.
1102 * @return weighted mean of available data
1103 */
1104 public double getCong()
1105 {
1106 return this.numeratorCong / this.denominatorCong;
1107 }
1108
1109 /**
1110 * Returns the weighted free flow mean of available data.
1111 * @return weighted free flow mean of available data
1112 */
1113 public double getFree()
1114 {
1115 return this.numeratorFree / this.denominatorFree;
1116 }
1117
1118 /**
1119 * Returns the sum of congestion weights.
1120 * @return the sum of congestion weights
1121 */
1122 public double getDenominatorCong()
1123 {
1124 return this.denominatorCong;
1125 }
1126
1127 /**
1128 * Returns the sum of free flow weights.
1129 * @return the sum of free flow weights
1130 */
1131 public double getDenominatorFree()
1132 {
1133 return this.denominatorFree;
1134 }
1135
1136 @Override
1137 public String toString()
1138 {
1139 return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1140 + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1141 }
1142
1143 }
1144
1145 /**
1146 * Small class to build up a weighted mean.
1147 */
1148 private final class WeightedMean
1149 {
1150 /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1151 private double numerator;
1152
1153 /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1154 private double denominator;
1155
1156 /**
1157 * Adds a value with weight.
1158 * @param value value
1159 * @param weight weight
1160 */
1161 public void add(final double value, final double weight)
1162 {
1163 this.numerator += value * weight;
1164 this.denominator += weight;
1165 }
1166
1167 /**
1168 * Returns the weighted mean of available data.
1169 * @return weighted mean of available data
1170 */
1171 public double get()
1172 {
1173 return this.numerator / this.denominator;
1174 }
1175
1176 @Override
1177 public String toString()
1178 {
1179 return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1180 }
1181
1182 }
1183
1184 @Override
1185 public String toString()
1186 {
1187 return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1188 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1189 + this.interrupted + ", listeners=" + this.listeners + "]";
1190 }
1191
1192 }