1 package org.opentrafficsim.core.egtf;
2
3 import java.util.LinkedHashMap;
4 import java.util.LinkedHashSet;
5 import java.util.Map;
6 import java.util.NavigableMap;
7 import java.util.Objects;
8 import java.util.Set;
9 import java.util.SortedMap;
10 import java.util.TreeMap;
11 import java.util.stream.IntStream;
12
13 /**
14 * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15 * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16 * implementation also supports multiple data sources.
17 * <p>
18 * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19 * <p>
20 * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21 * to provide several data streams.
22 * <p>
23 * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24 * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25 * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26 * thus ultimately the weight of the data in the estimation of the quantity.
27 * <p>
28 * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29 * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30 * {@code DENSITY_SI}, all under {@code Quantity}.
31 * <p>
32 * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33 * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34 * <p>
35 * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36 * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37 * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38 * reliability is undefined.
39 * <p>
40 * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41 * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42 * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
 * Gaussian kernel is also available, while any other shape could also be implemented.
44 * <p>
45 * Parameters from the EGTF are found in the following places:
46 * <ul>
47 * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48 * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49 * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50 * time.</li>
51 * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52 * data in free flow and congestion from a particular data stream.</li>
53 * </ul>
54 * References:
55 * <ul>
56 * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57 * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58 * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59 * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60 * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61 * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62 * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63 * </ul>
64 * <p>
65 * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66 * BSD-style license. See <a href="http://opentrafficsim.org/node/13">OpenTrafficSim License</a>.
67 * <p>
68 * @version $Revision$, $LastChangedDate$, by $Author$, initial version 3 okt. 2018 <br>
69 * @author <a href="http://www.transport.citg.tudelft.nl">Wouter Schakel</a>
70 */
71 public class EGTF
72 {
73
    /** Default sigma value [m] (spatial kernel size of the default exponential kernel). */
    private static final double DEFAULT_SIGMA = 300.0;

    /** Default tau value [s] (temporal kernel size of the default exponential kernel). */
    private static final double DEFAULT_TAU = 30.0;

    /** Filter kernel. */
    private Kernel kernel;

    /** Shock wave speed in congestion [m/s] (constructor converts the km/h input). */
    private final double cCong;

    /** Shock wave speed in free flow [m/s] (constructor converts the km/h input). */
    private final double cFree;

    /** Speed range between congestion and free flow [m/s]. */
    private final double deltaV;

    /** Flip-over speed below which we have congestion [m/s]. */
    private final double vc;

    /** Data sources by label so we can return the same instances upon repeated request. */
    private final Map<String, DataSource> dataSources = new LinkedHashMap<>();

    /** Default data source for cases where a single data source is used; non-null means data was added by quantity. */
    private DataSource defaultDataSource = null;

    /** Default data streams for cases where a single data source is used; created lazily with the default data source. */
    private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;

    /** True if data is currently being added using a quantity, in which case a check should not occur. */
    private boolean addingByQuantity;

    /** All point data sorted by space and time, and per data stream: location [m] -> time [s] -> stream -> value. */
    private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();

    /** Whether the calculation was interrupted. */
    private boolean interrupted = false;

    /** Listeners. */
    private Set<EgtfListener> listeners = new LinkedHashSet<>();
115
    /**
     * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
     */
    public EGTF()
    {
        // values are in km/h; the delegate constructor converts them to SI [m/s] and sets the default kernel
        this(-18.0, 80.0, 10.0, 80.0);
    }
123
124 /**
125 * Constructor defining global settings. A default kernel is set.
126 * @param cCong double; shock wave speed in congestion [km/h]
127 * @param cFree double; shock wave speed in free flow [km/h]
128 * @param deltaV double; speed range between congestion and free flow [km/h]
129 * @param vc double; flip-over speed below which we have congestion [km/h]
130 */
131 public EGTF(final double cCong, final double cFree, final double deltaV, final double vc)
132 {
133 this.cCong = cCong / 3.6;
134 this.cFree = cFree / 3.6;
135 this.deltaV = deltaV / 3.6;
136 this.vc = vc / 3.6;
137 setKernel();
138 }
139
    /**
     * Convenience constructor that also sets a specified kernel.
     * @param cCong double; shock wave speed in congestion [km/h]
     * @param cFree double; shock wave speed in free flow [km/h]
     * @param deltaV double; speed range between congestion and free flow [km/h]
     * @param vc double; flip-over speed below which we have congestion [km/h]
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    @SuppressWarnings("parameternumber")
    public EGTF(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
            final double tau, final double xMax, final double tMax)
    {
        this(cCong, cFree, deltaV, vc);
        // replaces the default kernel set by the delegate constructor with a range-limited exponential kernel
        setKernelSI(sigma, tau, xMax, tMax);
    }
158
159 // ********************
160 // *** DATA METHODS ***
161 // ********************
162
163 /**
164 * Return a data source, which is created if necessary.
165 * @param name String; unique name for the data source
166 * @return DataSource; data source
167 * @throws IllegalStateException when data has been added without a data source
168 */
169 public DataSource getDataSource(final String name)
170 {
171 if (this.defaultDataSource != null)
172 {
173 throw new IllegalStateException(
174 "Obtaining a (new) data source after data has been added without a data source is not allowed.");
175 }
176 return this.dataSources.computeIfAbsent(name, (key) -> new DataSource(key));
177 }
178
179 /**
180 * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
181 * required.
182 * @param time double; time before which all data can be removed
183 */
184 public synchronized void clearDataBefore(final double time)
185 {
186 for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
187 {
188 map.subMap(Double.NEGATIVE_INFINITY, time).clear();
189 }
190 }
191
192 /**
193 * Adds point data.
194 * @param quantity Quantity<?, ?>; quantity of the data
195 * @param location double; location in [m]
196 * @param time double; time in [s]
197 * @param value double; data value
198 * @throws IllegalStateException if data was added with a data stream previously
199 */
200 public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
201 final double value)
202 {
203 this.addingByQuantity = true;
204 addPointDataSI(getDefaultDataStream(quantity), location, time, value);
205 this.addingByQuantity = false;
206 }
207
208 /**
209 * Adds point data.
210 * @param dataStream DataStream<?>; data stream of the data
211 * @param location double; location in [m]
212 * @param time double; time in [s]
213 * @param value double; data value
214 * @throws IllegalStateException if data was added with a quantity previously
215 */
216 public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
217 final double value)
218 {
219 checkNoQuantityData();
220 Objects.requireNonNull(dataStream, "Datastream may not be null.");
221 if (!Double.isNaN(value))
222 {
223 getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
224 }
225 }
226
227 /**
228 * Adds vector data.
229 * @param quantity Quantity<?, ?>; quantity of the data
230 * @param location double[]; locations in [m]
231 * @param time double[]; times in [s]
232 * @param values double[]; data values in SI unit
233 * @throws IllegalStateException if data was added with a data stream previously
234 */
235 public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
236 final double[] values)
237 {
238 this.addingByQuantity = true;
239 addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
240 this.addingByQuantity = false;
241 }
242
243 /**
244 * Adds vector data.
245 * @param dataStream DataStream<?>; data stream of the data
246 * @param location double[]; locations in [m]
247 * @param time double[]; times in [s]
248 * @param values double[]; data values in SI unit
249 * @throws IllegalStateException if data was added with a quantity previously
250 */
251 public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
252 final double[] values)
253 {
254 checkNoQuantityData();
255 Objects.requireNonNull(dataStream, "Datastream may not be null.");
256 Objects.requireNonNull(location, "Location may not be null.");
257 Objects.requireNonNull(time, "Time may not be null.");
258 Objects.requireNonNull(values, "Values may not be null.");
259 if (location.length != time.length || time.length != values.length)
260 {
261 throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
262 time.length, values.length));
263 }
264 for (int i = 0; i < values.length; i++)
265 {
266 if (!Double.isNaN(values[i]))
267 {
268 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
269 }
270 }
271 }
272
273 /**
274 * Adds grid data.
275 * @param quantity Quantity<?, ?>; quantity of the data
276 * @param location double[]; locations in [m]
277 * @param time double[]; times in [s]
278 * @param values double[][]; data values in SI unit
279 * @throws IllegalStateException if data was added with a data stream previously
280 */
281 public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
282 final double[][] values)
283 {
284 this.addingByQuantity = true;
285 addGridDataSI(getDefaultDataStream(quantity), location, time, values);
286 this.addingByQuantity = false;
287 }
288
289 /**
290 * Adds grid data.
291 * @param dataStream DataStream<?>; data stream of the data
292 * @param location double[]; locations in [m]
293 * @param time double[]; times in [s]
294 * @param values double[][]; data values in SI unit
295 * @throws IllegalStateException if data was added with a quantity previously
296 */
297 public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
298 final double[][] values)
299 {
300 checkNoQuantityData();
301 Objects.requireNonNull(dataStream, "Datastream may not be null.");
302 Objects.requireNonNull(location, "Location may not be null.");
303 Objects.requireNonNull(time, "Time may not be null.");
304 Objects.requireNonNull(values, "Values may not be null.");
305 if (values.length != location.length)
306 {
307 throw new IllegalArgumentException(
308 String.format("%d locations while length of data is %d", location.length, values.length));
309 }
310 for (int i = 0; i < location.length; i++)
311 {
312 if (values[i].length != time.length)
313 {
314 throw new IllegalArgumentException(
315 String.format("%d times while length of data is %d", time.length, values[i].length));
316 }
317 Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
318 for (int j = 0; j < time.length; j++)
319 {
320 if (!Double.isNaN(values[i][j]))
321 {
322 getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
323 }
324 }
325 }
326 }
327
328 /**
329 * Check that no data was added using a quantity.
330 * @throws IllegalStateException if data was added with a quantity previously
331 */
332 private void checkNoQuantityData()
333 {
334 if (!this.addingByQuantity && this.defaultDataSource != null)
335 {
336 throw new IllegalStateException(
337 "Adding data with a data stream is not allowed after data has been added with a quantity.");
338 }
339 }
340
341 /**
342 * Returns a default data stream and checks that no data with a data stream was added.
343 * @param quantity Quantity<?, ?>; quantity
344 * @return DataStream<?>; default data stream
345 * @throws IllegalStateException if data was added with a data stream previously
346 */
347 private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
348 {
349 Objects.requireNonNull(quantity, "Quantity may not be null.");
350 if (!this.dataSources.isEmpty())
351 {
352 throw new IllegalStateException(
353 "Adding data with a quantity is not allowed after data has been added with a data stream.");
354 }
355 if (this.defaultDataSource == null)
356 {
357 this.defaultDataSource = new DataSource("default");
358 this.defaultDataStreams = new LinkedHashMap<>();
359 }
360 return this.defaultDataStreams.computeIfAbsent(quantity,
361 (key) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
362 }
363
364 /**
365 * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
366 * @param location double; location in [m]
367 * @return data from a specific location
368 */
369 private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
370 {
371 return this.data.computeIfAbsent(location, (key) -> new TreeMap<>());
372 }
373
374 /**
375 * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
376 * exists.
377 * @param spatialData Map<Double, Map<DataStream<?>, Double>>; spatially selected data
378 * @param time double; time in [s]
379 * @return data from a specific time, from data from a specific location
380 */
381 private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
382 final double time)
383 {
384 return spatialData.computeIfAbsent(time, (key) -> new LinkedHashMap<>());
385 }
386
387 // **********************
388 // *** KERNEL METHODS ***
389 // **********************
390
    /**
     * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s.
     */
    public void setKernel()
    {
        // delegates to the (xMax, tMax, shape) overload: unbounded range, default exponential decay
        setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
    }
398
399 /**
400 * Sets an exponential kernel with infinite range.
401 * @param sigma double; spatial kernel size
402 * @param tau double; temporal kernel size
403 */
404 public void setKernelSI(final double sigma, final double tau)
405 {
406 setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
407 }
408
    /**
     * Sets an exponential kernel with limited range.
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        // the shape captures the decay (sigma, tau); the range (xMax, tMax) is handled by the kernel itself
        setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
    }
420
421 /**
422 * Sets a Gaussian kernel with infinite range.
423 * @param sigma double; spatial kernel size
424 * @param tau double; temporal kernel size
425 */
426 public void setGaussKernelSI(final double sigma, final double tau)
427 {
428 setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
429 }
430
    /**
     * Sets a Gaussian kernel with limited range.
     * @param sigma double; spatial kernel size in [m]
     * @param tau double; temporal kernel size in [s]
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     */
    public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
    {
        // the shape captures the decay (sigma, tau); the range (xMax, tMax) is handled by the kernel itself
        setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
    }
442
    /**
     * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels.
     * @param xMax double; maximum spatial range in [m]
     * @param tMax double; maximum temporal range in [s]
     * @param shape KernelShape; shape of the kernel
     */
    public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
    {
        // NOTE(review): synchronized like the data-adding methods, but filterSI itself is not synchronized;
        // swapping the kernel during a running filter may mix kernels — confirm intended
        this.kernel = new Kernel(xMax, tMax, shape);
    }
453
    /**
     * Returns the wave speed in congestion, in SI [m/s] (converted from the km/h constructor input).
     * @return double; wave speed in congestion
     */
    final double getWaveSpeedCongestion()
    {
        return this.cCong;
    }
462
    /**
     * Returns the wave speed in free flow, in SI [m/s] (converted from the km/h constructor input).
     * @return double; wave speed in free flow
     */
    final double getWaveSpeedFreeFlow()
    {
        return this.cFree;
    }
471
472 // **********************
473 // *** FILTER METHODS ***
474 // **********************
475
476 /**
477 * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
478 * done. Finally, the filtering results can then be obtained from the listener.
479 * @param location double[]; location of output grid in [m]
480 * @param time double[]; time of output grid in [s]
481 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
482 * @return EgtfParallelListener; listener to notify keep track of the progress
483 */
484 public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
485 final Quantity<?, ?>... quantities)
486 {
487 Objects.requireNonNull(location, "Location may not be null.");
488 Objects.requireNonNull(time, "Time may not be null.");
489 EgtfParallelListenertener.html#EgtfParallelListener">EgtfParallelListener listener = new EgtfParallelListener();
490 addListener(listener);
491 new Thread(new Runnable()
492 {
493 /** {@inheritDoc} */
494 @Override
495 public void run()
496 {
497 listener.setFilter(filterSI(location, time, quantities));
498 removeListener(listener);
499 }
500 }, "Egtf calculation thread").start();
501 return listener;
502 }
503
504 /**
505 * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
506 * done. Finally, the filtering results can then be obtained from the listener.
507 * @param xMin double; minimum location value of output grid [m]
508 * @param xStep double; location step of output grid [m]
509 * @param xMax double; maximum location value of output grid [m]
510 * @param tMin double; minimum time value of output grid [s]
511 * @param tStep double; time step of output grid [s]
512 * @param tMax double; maximum time value of output grid [s]
513 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
514 * @return EgtfParallelListener; listener to notify keep track of the progress
515 */
516 public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
517 final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
518 {
519 EgtfParallelListenertener.html#EgtfParallelListener">EgtfParallelListener listener = new EgtfParallelListener();
520 addListener(listener);
521 new Thread(new Runnable()
522 {
523 /** {@inheritDoc} */
524 @Override
525 public void run()
526 {
527 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
528 removeListener(listener);
529 }
530 }, "Egtf calculation thread").start();
531 return listener;
532 }
533
    /**
     * Returns filtered data. This is the standard EGTF implementation: for every output grid point, measurements within the
     * kernel range are combined as weighted means under both a congestion and a free-flow assumption, and blended by an
     * estimated congestion level per data source.
     * @param location double[]; location of output grid in [m]
     * @param time double[]; time of output grid in [s]
     * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
     * @return Filter; filtered data, {@code null} when interrupted
     */
    @SuppressWarnings({"synthetic-access", "methodlength"})
    public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
    {
        Objects.requireNonNull(location, "Location may not be null.");
        Objects.requireNonNull(time, "Time may not be null.");

        // initialize data; one output matrix [location x time] per requested quantity
        Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
        for (Quantity<?, ?> quantity : quantities)
        {
            map.put(quantity, new double[location.length][time.length]);
        }

        // loop grid locations
        for (int i = 0; i < location.length; i++)
        {
            double xGrid = location[i];

            // filter applicable data for location (kernel may bound the spatial range)
            Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
                    this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);

            // loop grid times
            for (int j = 0; j < time.length; j++)
            {
                double tGrid = time[j];

                // notify progress; a true return means a listener interrupted the calculation
                if (notifyListeners((i + (double) j / time.length) / location.length))
                {
                    return null;
                }

                // initialize data per stream
                // quantity z assuming congestion and free flow
                Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();

                // filter and loop applicable data for time
                for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
                {
                    double dx = xEntry.getKey() - xGrid;
                    Map<Double, Map<DataStream<?>, Double>> temporalData =
                            xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);

                    for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
                    {
                        double dt = tEntry.getKey() - tGrid;
                        Map<DataStream<?>, Double> pData = tEntry.getValue();

                        // kernel weights for this (dx, dt) offset under congestion and free-flow wave speeds
                        double phiCong = this.kernel.weight(this.cCong, dx, dt);
                        double phiFree = this.kernel.weight(this.cFree, dx, dt);

                        // loop streams data at point; speed streams are always accumulated since they
                        // determine the congestion level, even when speed itself was not requested
                        for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
                        {
                            DataStream<?> stream = vEntry.getKey();
                            if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
                            {
                                double v = vEntry.getValue();
                                DualWeightedMean zCongFreeOfStream =
                                        zCongFree.computeIfAbsent(stream, (key) -> new DualWeightedMean());
                                zCongFreeOfStream.addCong(v, phiCong);
                                zCongFreeOfStream.addFree(v, phiFree);
                            }
                        }
                    }
                }

                // figure out the congestion level estimated for each data source
                Map<DataSource, Double> w = new LinkedHashMap<>();
                for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
                {
                    DataStream<?> dataStream = streamEntry.getKey();
                    if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
                    {
                        DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
                        // pessimistic speed estimate: the lower of the congestion/free-flow weighted means
                        double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
                        // congestion level in [0, 1] via tanh transition around the flip-over speed vc
                        w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
                                .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV)));
                        continue;
                    }
                }

                // sum available data sources per quantity
                Double wMean = null;
                for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
                {
                    Quantity<?, ?> quantity = qEntry.getKey();
                    WeightedMean z = new WeightedMean();
                    for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
                    {
                        DataStream<?> dataStream = zEntry.getKey();
                        if (dataStream.getQuantity().equals(quantity))
                        {
                            // obtain congestion level
                            double wCong;
                            if (!w.containsKey(dataStream.getDataSource()))
                            {
                                // this data source has no speed data, but congestion level can be estimated from other sources
                                if (wMean == null)
                                {
                                    // let's see if speed was estimated already
                                    for (Quantity<?, ?> prevQuant : quantities)
                                    {
                                        if (prevQuant.equals(quantity))
                                        {
                                            // it was not, get mean of other data source
                                            // NOTE(review): when w is empty this leaves wMean at 0.0, i.e. pure free
                                            // flow is assumed without any speed data — confirm intended
                                            wMean = 0.0;
                                            for (double ww : w.values())
                                            {
                                                wMean += ww / w.size();
                                            }
                                            break;
                                        }
                                        else if (prevQuant.isSpeed())
                                        {
                                            // speed was estimated before this quantity; derive the congestion level
                                            // from the already filtered speed at this grid point
                                            wMean = .5 * (1.0
                                                    + Math.tanh((EGTF.this.vc - map.get(prevQuant)[i][j]) / EGTF.this.deltaV));
                                            break;
                                        }
                                    }
                                }
                                wCong = wMean;
                            }
                            else
                            {
                                wCong = w.get(dataStream.getDataSource());
                            }
                            // calculate estimated value z of this data source (no duplicate quantities per source allowed)
                            double wfree = 1.0 - wCong;
                            DualWeightedMean zCongFreej = zEntry.getValue();
                            double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
                            double weight;
                            if (w.size() > 1)
                            {
                                // data source more important if more and nearer measurements
                                double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
                                // more important if more reliable (smaller standard deviation) at congestion level
                                double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
                                weight = alpha * beta;
                            }
                            else
                            {
                                weight = 1.0;
                            }
                            z.add(zStream, weight);
                        }
                    }
                    qEntry.getValue()[i][j] = z.get();
                }
            }
        }
        notifyListeners(1.0);

        return new FilterDouble(location, time, map);
    }
697
698 /**
699 * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
700 * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
701 * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
702 * recommended to set a Kernel with maximum bounds before using this method.
703 * <p>
704 * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
705 * the Extended Generalized Treiber-Helbing Filter.
706 * @param xMin double; minimum location value of output grid [m]
707 * @param xStep double; location step of output grid [m]
708 * @param xMax double; maximum location value of output grid [m]
709 * @param tMin double; minimum time value of output grid [s]
710 * @param tStep double; time step of output grid [s]
711 * @param tMax double; maximum time value of output grid [s]
712 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
713 * @return Filter; filtered data, {@code null} when interrupted
714 */
715 @SuppressWarnings("methodlength")
716 public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
717 final double tMax, final Quantity<?, ?>... quantities)
718 {
719 if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
720 {
721 throw new IllegalArgumentException(
722 "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
723 }
724 if (notifyListeners(0.0))
725 {
726 return null;
727 }
728
729 // initialize data
730 int n = 1 + (int) ((xMax - xMin) / xStep);
731 double[] location = new double[n];
732 IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
733 n = 1 + (int) ((tMax - tMin) / tStep);
734 double[] time = new double[n];
735 IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
736 Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
737 Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
738 for (Quantity<?, ?> quantity : quantities)
739 {
740 map.put(quantity, new double[location.length][time.length]);
741 weights.put(quantity, new double[location.length][time.length]);
742 }
743
744 // discretize Kernel
745 double xFrom = this.kernel.fromLocation(0.0);
746 xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
747 double xTo = this.kernel.toLocation(0.0);
748 xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
749 double[] dx = equidistant(xFrom, xStep, xTo);
750 double tFrom = this.kernel.fromTime(0.0);
751 tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
752 double tTo = this.kernel.toTime(0.0);
753 tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
754 double[] dt = equidistant(tFrom, tStep, tTo);
755 double[][] phiCong = new double[dx.length][dt.length];
756 double[][] phiFree = new double[dx.length][dt.length];
757 for (int i = 0; i < dx.length; i++)
758 {
759 for (int j = 0; j < dt.length; j++)
760 {
761 phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
762 phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
763 }
764 }
765
766 // discretize data
767 Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
768 Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
769 // loop grid locations
770 for (int i = 0; i < location.length; i++)
771 {
772 // filter applicable data for location
773 Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
774 this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
775 // loop grid times
776 for (int j = 0; j < time.length; j++)
777 {
778 // filter and loop applicable data for time
779 for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
780 {
781 NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
782 locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
783 for (Map<DataStream<?>, Double> timeData : temporalData.values())
784 {
785 for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
786 {
787 if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
788 {
789 dataSum.computeIfAbsent(timeEntry.getKey(),
790 (key) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
791 dataCount.computeIfAbsent(timeEntry.getKey(),
792 (key) -> new double[location.length][time.length])[i][j]++;
793 }
794 }
795 }
796 }
797 }
798 }
799
800 // figure out the congestion level estimated for each data source
801 double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
802 double step = 0;
803 // store maps to prevent us from calculating the convolution for speed again later
804 Map<DataSource, double[][]> w = new LinkedHashMap<>();
805 Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
806 Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
807 Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
808 Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
809 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
810 {
811 DataStream<?> dataStream = zEntry.getKey();
812 if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
813 {
814 // notify
815 double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
816 if (notifyListeners((step + 0.25) / steps))
817 {
818 return null;
819 }
820 double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
821 if (notifyListeners((step + 0.5) / steps))
822 {
823 return null;
824 }
825 double[][] count = dataCount.get(dataStream);
826 double[][] nCong = Convolution.convolution(phiCong, count);
827 if (notifyListeners((step + 0.75) / steps))
828 {
829 return null;
830 }
831 double[][] nFree = Convolution.convolution(phiFree, count);
832 double[][] wSource = new double[vCong.length][vCong[0].length];
833 for (int i = 0; i < vCong.length; i++)
834 {
835 for (int j = 0; j < vCong[0].length; j++)
836 {
837 double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
838 wSource[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - u) / EGTF.this.deltaV));
839 }
840 }
841 w.put(dataStream.getDataSource(), wSource);
842 zCongSpeed.put(dataStream.getDataSource(), vCong);
843 zFreeSpeed.put(dataStream.getDataSource(), vFree);
844 nCongSpeed.put(dataStream.getDataSource(), nCong);
845 nFreeSpeed.put(dataStream.getDataSource(), nFree);
846 }
847 }
848 step++;
849 if (notifyListeners(step / steps))
850 {
851 return null;
852 }
853
854 // sum available data sources per quantity
855 double[][] wMean = null;
856 for (Quantity<?, ?> quantity : quantities)
857 {
858 // gather place for this quantity
859 double[][] qData = map.get(quantity);
860 double[][] qWeights = weights.get(quantity);
861 // loop streams that provide this quantity
862 Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
863 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
864 {
865 if (zEntry.getKey().getQuantity().equals(quantity))
866 {
867 zEntries.add(zEntry);
868 }
869 }
870 double streamCounter = 0;
871 for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
872 {
873 DataStream<?> dataStream = zEntry.getKey();
874
875 // obtain congestion level
876 double[][] wj;
877 if (!w.containsKey(dataStream.getDataSource()))
878 {
879 // this data source has no speed data, but congestion level can be estimated from other sources
880 if (wMean == null)
881 {
882 // let's see if speed was estimated already
883 for (Quantity<?, ?> prevQuant : quantities)
884 {
885 if (prevQuant.equals(quantity))
886 {
887 // it was not, get mean of other data source
888 wMean = new double[location.length][time.length];
889 for (double[][] ww : w.values())
890 {
891 for (int i = 0; i < location.length; i++)
892 {
893 for (int j = 0; j < time.length; j++)
894 {
895 wMean[i][j] += ww[i][j] / w.size();
896 }
897 }
898 }
899 break;
900 }
901 else if (prevQuant.isSpeed())
902 {
903 wMean = new double[location.length][time.length];
904 double[][] v = map.get(prevQuant);
905 for (int i = 0; i < location.length; i++)
906 {
907 for (int j = 0; j < time.length; j++)
908 {
909 wMean[i][j] = .5 * (1.0 + Math.tanh((EGTF.this.vc - v[i][j]) / EGTF.this.deltaV));
910 }
911 }
912 break;
913 }
914 }
915 }
916 wj = wMean;
917 }
918 else
919 {
920 wj = w.get(dataStream.getDataSource());
921 }
922
923 // convolutions of filters with discretized data and data counts
924 double[][] zCong;
925 double[][] zFree;
926 double[][] nCong;
927 double[][] nFree;
928 if (dataStream.getQuantity().isSpeed())
929 {
930 zCong = zCongSpeed.get(dataStream.getDataSource());
931 zFree = zFreeSpeed.get(dataStream.getDataSource());
932 nCong = nCongSpeed.get(dataStream.getDataSource());
933 nFree = nFreeSpeed.get(dataStream.getDataSource());
934 }
935 else
936 {
937 zCong = Convolution.convolution(phiCong, zEntry.getValue());
938 if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
939 {
940 return null;
941 }
942 zFree = Convolution.convolution(phiFree, zEntry.getValue());
943 if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
944 {
945 return null;
946 }
947 double[][] count = dataCount.get(dataStream);
948 nCong = Convolution.convolution(phiCong, count);
949 if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
950 {
951 return null;
952 }
953 nFree = Convolution.convolution(phiFree, count);
954 }
955
956 // loop grid to add to each weighted sum (weighted per data source)
957 for (int i = 0; i < location.length; i++)
958 {
959 for (int j = 0; j < time.length; j++)
960 {
961 double wCong = wj[i][j];
962 double wFree = 1.0 - wCong;
963 double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
964 // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
965 // is exactly what the EGTF method needs to weigh data sources
966 double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
967 double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
968 double weight = beta * alpha;
969 qData[i][j] += (value * weight);
970 qWeights[i][j] += weight;
971 }
972 }
973 streamCounter++;
974 if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
975 {
976 return null;
977 }
978 }
979 for (int i = 0; i < location.length; i++)
980 {
981 for (int j = 0; j < time.length; j++)
982 {
983 qData[i][j] /= qWeights[i][j];
984 }
985 }
986 step++;
987 }
988
989 return new FilterDouble(location, time, map);
990 }
991
992 /**
993 * Returns an equidistant vector that includes 0.
994 * @param from double; lowest value to include
995 * @param step double; step
996 * @param to double; highest value to include
997 * @return double[]; equidistant vector that includes 0
998 */
999 private double[] equidistant(final double from, final double step, final double to)
1000 {
1001 int n1 = (int) (-from / step);
1002 int n2 = (int) (to / step);
1003 int n = n1 + n2 + 1;
1004 double[] array = new double[n];
1005 for (int i = 0; i < n; i++)
1006 {
1007 array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1008 }
1009 return array;
1010 }
1011
1012 // *********************
1013 // *** EVENT METHODS ***
1014 // *********************
1015
    /**
     * Interrupt the calculation. This only raises a flag; the running filter polls it at each progress notification
     * (see {@code notifyListeners}) and, when set, aborts by returning {@code null}.
     * NOTE(review): the flag is not reset here; presumably it is cleared elsewhere before a new filter run — verify.
     */
    public final void interrupt()
    {
        this.interrupted = true;
    }
1023
    /**
     * Add listener. Registered listeners receive progress events while the filter runs.
     * @param listener EgtfListener; listener to register
     */
    public final void addListener(final EgtfListener listener)
    {
        this.listeners.add(listener);
    }
1032
    /**
     * Remove listener. Has no effect if the listener was not previously added.
     * @param listener EgtfListener; listener to unregister
     */
    public final void removeListener(final EgtfListener listener)
    {
        this.listeners.remove(listener);
    }
1041
1042 /**
1043 * Notify all listeners.
1044 * @param progress double; progress, a value in the range [0 ... 1]
1045 * @return boolean; whether the filter is interrupted
1046 */
1047 private boolean notifyListeners(final double progress)
1048 {
1049 if (!this.listeners.isEmpty())
1050 {
1051 EgtfEventgtfEvent.html#EgtfEvent">EgtfEvent event = new EgtfEvent(this, progress);
1052 for (EgtfListener listener : this.listeners)
1053 {
1054 listener.notifyProgress(event);
1055 }
1056 }
1057 return this.interrupted;
1058 }
1059
1060 // **********************
1061 // *** HELPER CLASSES ***
1062 // **********************
1063
1064 /**
1065 * Small class to build up a weighted mean under the congestion and free flow assumption.
1066 */
1067 private class DualWeightedMean
1068 {
1069 /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1070 private double numeratorCong;
1071
1072 /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1073 private double numeratorFree;
1074
1075 /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1076 private double denominatorCong;
1077
1078 /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1079 private double denominatorFree;
1080
1081 /**
1082 * Adds a congestion value with weight.
1083 * @param value double; value
1084 * @param weight double; weight
1085 */
1086 public void addCong(final double value, final double weight)
1087 {
1088 this.numeratorCong += value * weight;
1089 this.denominatorCong += weight;
1090 }
1091
1092 /**
1093 * Adds a free flow value with weight.
1094 * @param value double; value
1095 * @param weight double; weight
1096 */
1097 public void addFree(final double value, final double weight)
1098 {
1099 this.numeratorFree += value * weight;
1100 this.denominatorFree += weight;
1101 }
1102
1103 /**
1104 * Returns the weighted congestion mean of available data.
1105 * @return double; weighted mean of available data
1106 */
1107 public double getCong()
1108 {
1109 return this.numeratorCong / this.denominatorCong;
1110 }
1111
1112 /**
1113 * Returns the weighted free flow mean of available data.
1114 * @return double; weighted free flow mean of available data
1115 */
1116 public double getFree()
1117 {
1118 return this.numeratorFree / this.denominatorFree;
1119 }
1120
1121 /**
1122 * Returns the sum of congestion weights.
1123 * @return double; the sum of congestion weights
1124 */
1125 public double getDenominatorCong()
1126 {
1127 return this.denominatorCong;
1128 }
1129
1130 /**
1131 * Returns the sum of free flow weights.
1132 * @return double; the sum of free flow weights
1133 */
1134 public double getDenominatorFree()
1135 {
1136 return this.denominatorFree;
1137 }
1138
1139 /** {@inheritDoc} */
1140 @Override
1141 public String toString()
1142 {
1143 return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1144 + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1145 }
1146
1147 }
1148
1149 /**
1150 * Small class to build up a weighted mean.
1151 */
1152 private class WeightedMean
1153 {
1154 /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1155 private double numerator;
1156
1157 /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1158 private double denominator;
1159
1160 /**
1161 * Adds a value with weight.
1162 * @param value double; value
1163 * @param weight double; weight
1164 */
1165 public void add(final double value, final double weight)
1166 {
1167 this.numerator += value * weight;
1168 this.denominator += weight;
1169 }
1170
1171 /**
1172 * Returns the weighted mean of available data.
1173 * @return double; weighted mean of available data
1174 */
1175 public double get()
1176 {
1177 return this.numerator / this.denominator;
1178 }
1179
1180 /** {@inheritDoc} */
1181 @Override
1182 public String toString()
1183 {
1184 return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1185 }
1186
1187 }
1188
1189 /** {@inheritDoc} */
1190 @Override
1191 public String toString()
1192 {
1193 return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1194 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1195 + this.interrupted + ", listeners=" + this.listeners + "]";
1196 }
1197
1198 }