1 package org.opentrafficsim.core.egtf;
2
3 import java.util.LinkedHashMap;
4 import java.util.LinkedHashSet;
5 import java.util.Map;
6 import java.util.NavigableMap;
7 import java.util.Objects;
8 import java.util.Set;
9 import java.util.SortedMap;
10 import java.util.TreeMap;
11 import java.util.stream.IntStream;
12
13 /**
14 * Extended Generalized Treiber-Helbing Filter (van Lint and Hoogendoorn, 2009). This is an extension of the Adaptive Smoothing
15 * Method (Treiber and Helbing, 2002). A fast filter for equidistant grids (Schreiter et al., 2010) is available. This fast
16 * implementation also supports multiple data sources.
17 * <p>
18 * To allow flexible usage the EGTF works with {@code DataSource}, {@code DataStream} and {@code Quantity}.
19 * <p>
20 * A {@code DataSource}, such as "loop detectors", "floating-car data" or "camera" is mostly an identifier, but can be requested
21 * to provide several data streams.
22 * <p>
23 * A {@code DataStream} is one {@code DataSource} supplying one {@code Quantity}. For instance "loop detectors" supplying
24 * "flow". In a {@code DataStream}, supplied by the {@code DataSource}, standard deviation of measurements in congestion and
25 * free flow are defined. These determine the reliability of the {@code Quantity} data from the given {@code DataSource}, and
26 * thus ultimately the weight of the data in the estimation of the quantity.
27 * <p>
28 * A {@code Quantity}, such as "flow" or "density" defines what is measured and what is requested as output. The output can be
29 * in typed format using a {@code Converter}. Default quantities are available under {@code SPEED_SI}, {@code FLOW_SI} and
30 * {@code DENSITY_SI}, all under {@code Quantity}.
31 * <p>
32 * Data can be added using several methods for point data, vector data (multiple independent location-time combinations) and
33 * grid data (data in a grid pattern). Data is added for a particular {@code DataStream}.
34 * <p>
35 * For simple use-cases where a single data source is used, data can be added directly with a {@code Quantity}, in which case a
36 * default {@code DataSource}, and default {@code DataStream} for each {@code Quantity} is internally used. All data should
37 * either be added using {@code Quantity}'s, or it should all be added using {@code DataSource}'s. Otherwise relative data
38 * reliability is undefined.
39 * <p>
40 * Output can be requested from the EGTF using a {@code Kernel}, a spatiotemporal pattern determining measurement weights. The
41 * {@code Kernel} defines an optional maximum spatial and temporal range for measurements to consider, and uses a {@code Shape}
42 * to determine the weight for a given distance and time from the estimated point. By default this is an exponential function. A
43 * Gaussian kernel is also available, while any other shape could be also be implemented.
44 * <p>
45 * Parameters from the EGTF are found in the following places:
46 * <ul>
47 * <li>{@code EGTF}: <i>cCong</i>, <i>cFree</i>, <i>deltaV</i> and <i>vc</i>, defining the overall traffic flow properties.</li>
48 * <li>{@code Kernel}: <i>tMax</i> and <i>xMax</i>, defining the maximum range to consider.</li>
49 * <li>{@code KernelShape}: <i>sigma</i> and <i>tau</i>, determining the decay of weights for further measurements in space and
50 * time.</li>
51 * <li>{@code DataStream}: <i>thetaCong</i> and <i>thetaFree</i>, defining the reliability by the standard deviation of measured
52 * data in free flow and congestion from a particular data stream.</li>
53 * </ul>
54 * References:
55 * <ul>
56 * <li>van Lint, J. W. C. and Hoogendoorn, S. P. (2009). A robust and efficient method for fusing heterogeneous data from
57 * traffic sensors on freeways. Computer Aided Civil and Infrastructure Engineering, accepted for publication.</li>
58 * <li>Schreiter, T., van Lint, J. W. C., Treiber, M. and Hoogendoorn, S. P. (2010). Two fast implementations of the Adaptive
59 * Smoothing Method used in highway traffic state estimation. 13th International IEEE Conference on Intelligent Transportation
60 * Systems, 19-22 Sept. 2010, Funchal, Portugal.</li>
61 * <li>Treiber, M. and Helbing, D. (2002). Reconstructing the spatio-temporal traffic dynamics from stationary detector data.
62 * Cooper@tive Tr@nsport@tion Dyn@mics, 1:3.1–3.24.</li>
63 * </ul>
64 * <p>
65 * Copyright (c) 2013-2023 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
66 * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
67 * </p>
68 * @author <a href="https://dittlab.tudelft.nl">Wouter Schakel</a>
69 */
70 public class Egtf
71 {
72
73 /** Default sigma value. */
74 private static final double DEFAULT_SIGMA = 300.0;
75
76 /** Default tau value. */
77 private static final double DEFAULT_TAU = 30.0;
78
79 /** Filter kernel. */
80 private Kernel kernel;
81
82 /** Shock wave speed in congestion. */
83 private final double cCong;
84
85 /** Shock wave speed in free flow. */
86 private final double cFree;
87
88 /** Speed range between congestion and free flow. */
89 private final double deltaV;
90
91 /** Flip-over speed below which we have congestion. */
92 private final double vc;
93
94 /** Data sources by label so we can return the same instances upon repeated request. */
95 private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
96
97 /** Default data source for cases where a single data source is used. */
98 private DataSource defaultDataSource = null;
99
100 /** Default data streams for cases where a single data source is used. */
101 private Map<Quantity<?, ?>, DataStream<?>> defaultDataStreams = null;
102
103 /** True if data is currently being added using a quantity, in which case a check should not occur. */
104 private boolean addingByQuantity;
105
106 /** All point data sorted by space and time, and per data stream. */
107 private NavigableMap<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> data = new TreeMap<>();
108
109 /** Whether the calculation was interrupted. */
110 private boolean interrupted = false;
111
112 /** Listeners. */
113 private Set<EgtfListener> listeners = new LinkedHashSet<>();
114
115 /**
116 * Constructor using cCong = -18km/h, cFree = 80km/h, deltaV = 10km/h and vc = 80km/h. A default kernel is set.
117 */
118 public Egtf()
119 {
120 this(-18.0, 80.0, 10.0, 80.0);
121 }
122
123 /**
124 * Constructor defining global settings. A default kernel is set.
125 * @param cCong double; shock wave speed in congestion [km/h]
126 * @param cFree double; shock wave speed in free flow [km/h]
127 * @param deltaV double; speed range between congestion and free flow [km/h]
128 * @param vc double; flip-over speed below which we have congestion [km/h]
129 */
130 public Egtf(final double cCong, final double cFree, final double deltaV, final double vc)
131 {
132 this.cCong = cCong / 3.6;
133 this.cFree = cFree / 3.6;
134 this.deltaV = deltaV / 3.6;
135 this.vc = vc / 3.6;
136 setKernel();
137 }
138
139 /**
140 * Convenience constructor that also sets a specified kernel.
141 * @param cCong double; shock wave speed in congestion [km/h]
142 * @param cFree double; shock wave speed in free flow [km/h]
143 * @param deltaV double; speed range between congestion and free flow [km/h]
144 * @param vc double; flip-over speed below which we have congestion [km/h]
145 * @param sigma double; spatial kernel size in [m]
146 * @param tau double; temporal kernel size in [s]
147 * @param xMax double; maximum spatial range in [m]
148 * @param tMax double; maximum temporal range in [s]
149 */
150 @SuppressWarnings("parameternumber")
151 public Egtf(final double cCong, final double cFree, final double deltaV, final double vc, final double sigma,
152 final double tau, final double xMax, final double tMax)
153 {
154 this(cCong, cFree, deltaV, vc);
155 setKernelSI(sigma, tau, xMax, tMax);
156 }
157
158 // ********************
159 // *** DATA METHODS ***
160 // ********************
161
162 /**
163 * Return a data source, which is created if necessary.
164 * @param name String; unique name for the data source
165 * @return DataSource; data source
166 * @throws IllegalStateException when data has been added without a data source
167 */
168 public DataSource getDataSource(final String name)
169 {
170 if (this.defaultDataSource != null)
171 {
172 throw new IllegalStateException(
173 "Obtaining a (new) data source after data has been added without a data source is not allowed.");
174 }
175 return this.dataSources.computeIfAbsent(name, (
176 key
177 ) -> new DataSource(key));
178 }
179
180 /**
181 * Removes all data from before the given time. This is useful in live usages of this class, where older data is no longer
182 * required.
183 * @param time double; time before which all data can be removed
184 */
185 public synchronized void clearDataBefore(final double time)
186 {
187 for (SortedMap<Double, Map<DataStream<?>, Double>> map : this.data.values())
188 {
189 map.subMap(Double.NEGATIVE_INFINITY, time).clear();
190 }
191 }
192
193 /**
194 * Adds point data.
195 * @param quantity Quantity<?, ?>; quantity of the data
196 * @param location double; location in [m]
197 * @param time double; time in [s]
198 * @param value double; data value
199 * @throws IllegalStateException if data was added with a data stream previously
200 */
201 public synchronized void addPointDataSI(final Quantity<?, ?> quantity, final double location, final double time,
202 final double value)
203 {
204 this.addingByQuantity = true;
205 addPointDataSI(getDefaultDataStream(quantity), location, time, value);
206 this.addingByQuantity = false;
207 }
208
209 /**
210 * Adds point data.
211 * @param dataStream DataStream<?>; data stream of the data
212 * @param location double; location in [m]
213 * @param time double; time in [s]
214 * @param value double; data value
215 * @throws IllegalStateException if data was added with a quantity previously
216 */
217 public synchronized void addPointDataSI(final DataStream<?> dataStream, final double location, final double time,
218 final double value)
219 {
220 checkNoQuantityData();
221 Objects.requireNonNull(dataStream, "Datastream may not be null.");
222 if (!Double.isNaN(value))
223 {
224 getSpacioTemporalData(getSpatialData(location), time).put(dataStream, value);
225 }
226 }
227
228 /**
229 * Adds vector data.
230 * @param quantity Quantity<?, ?>; quantity of the data
231 * @param location double[]; locations in [m]
232 * @param time double[]; times in [s]
233 * @param values double[]; data values in SI unit
234 * @throws IllegalStateException if data was added with a data stream previously
235 */
236 public synchronized void addVectorDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
237 final double[] values)
238 {
239 this.addingByQuantity = true;
240 addVectorDataSI(getDefaultDataStream(quantity), location, time, values);
241 this.addingByQuantity = false;
242 }
243
244 /**
245 * Adds vector data.
246 * @param dataStream DataStream<?>; data stream of the data
247 * @param location double[]; locations in [m]
248 * @param time double[]; times in [s]
249 * @param values double[]; data values in SI unit
250 * @throws IllegalStateException if data was added with a quantity previously
251 */
252 public synchronized void addVectorDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
253 final double[] values)
254 {
255 checkNoQuantityData();
256 Objects.requireNonNull(dataStream, "Datastream may not be null.");
257 Objects.requireNonNull(location, "Location may not be null.");
258 Objects.requireNonNull(time, "Time may not be null.");
259 Objects.requireNonNull(values, "Values may not be null.");
260 if (location.length != time.length || time.length != values.length)
261 {
262 throw new IllegalArgumentException(String.format("Unequal lengths: location %d, time %d, data %d.", location.length,
263 time.length, values.length));
264 }
265 for (int i = 0; i < values.length; i++)
266 {
267 if (!Double.isNaN(values[i]))
268 {
269 getSpacioTemporalData(getSpatialData(location[i]), time[i]).put(dataStream, values[i]);
270 }
271 }
272 }
273
274 /**
275 * Adds grid data.
276 * @param quantity Quantity<?, ?>; quantity of the data
277 * @param location double[]; locations in [m]
278 * @param time double[]; times in [s]
279 * @param values double[][]; data values in SI unit
280 * @throws IllegalStateException if data was added with a data stream previously
281 */
282 public synchronized void addGridDataSI(final Quantity<?, ?> quantity, final double[] location, final double[] time,
283 final double[][] values)
284 {
285 this.addingByQuantity = true;
286 addGridDataSI(getDefaultDataStream(quantity), location, time, values);
287 this.addingByQuantity = false;
288 }
289
290 /**
291 * Adds grid data.
292 * @param dataStream DataStream<?>; data stream of the data
293 * @param location double[]; locations in [m]
294 * @param time double[]; times in [s]
295 * @param values double[][]; data values in SI unit
296 * @throws IllegalStateException if data was added with a quantity previously
297 */
298 public synchronized void addGridDataSI(final DataStream<?> dataStream, final double[] location, final double[] time,
299 final double[][] values)
300 {
301 checkNoQuantityData();
302 Objects.requireNonNull(dataStream, "Datastream may not be null.");
303 Objects.requireNonNull(location, "Location may not be null.");
304 Objects.requireNonNull(time, "Time may not be null.");
305 Objects.requireNonNull(values, "Values may not be null.");
306 if (values.length != location.length)
307 {
308 throw new IllegalArgumentException(
309 String.format("%d locations while length of data is %d", location.length, values.length));
310 }
311 for (int i = 0; i < location.length; i++)
312 {
313 if (values[i].length != time.length)
314 {
315 throw new IllegalArgumentException(
316 String.format("%d times while length of data is %d", time.length, values[i].length));
317 }
318 Map<Double, Map<DataStream<?>, Double>> spatialData = getSpatialData(location[i]);
319 for (int j = 0; j < time.length; j++)
320 {
321 if (!Double.isNaN(values[i][j]))
322 {
323 getSpacioTemporalData(spatialData, time[j]).put(dataStream, values[i][j]);
324 }
325 }
326 }
327 }
328
329 /**
330 * Check that no data was added using a quantity.
331 * @throws IllegalStateException if data was added with a quantity previously
332 */
333 private void checkNoQuantityData()
334 {
335 if (!this.addingByQuantity && this.defaultDataSource != null)
336 {
337 throw new IllegalStateException(
338 "Adding data with a data stream is not allowed after data has been added with a quantity.");
339 }
340 }
341
342 /**
343 * Returns a default data stream and checks that no data with a data stream was added.
344 * @param quantity Quantity<?, ?>; quantity
345 * @return DataStream<?>; default data stream
346 * @throws IllegalStateException if data was added with a data stream previously
347 */
348 private DataStream<?> getDefaultDataStream(final Quantity<?, ?> quantity)
349 {
350 Objects.requireNonNull(quantity, "Quantity may not be null.");
351 if (!this.dataSources.isEmpty())
352 {
353 throw new IllegalStateException(
354 "Adding data with a quantity is not allowed after data has been added with a data stream.");
355 }
356 if (this.defaultDataSource == null)
357 {
358 this.defaultDataSource = new DataSource("default");
359 this.defaultDataStreams = new LinkedHashMap<>();
360 }
361 return this.defaultDataStreams.computeIfAbsent(quantity, (
362 key
363 ) -> this.defaultDataSource.addStreamSI(quantity, 1.0, 1.0));
364 }
365
366 /**
367 * Returns data from a specific location as a subset from all data. An empty map is returned if no such data exists.
368 * @param location double; location in [m]
369 * @return data from a specific location
370 */
371 private SortedMap<Double, Map<DataStream<?>, Double>> getSpatialData(final double location)
372 {
373 return this.data.computeIfAbsent(location, (
374 key
375 ) -> new TreeMap<>());
376 }
377
378 /**
379 * Returns data from a specific time as a subset of data from a specific location. An empty map is returned if no such data
380 * exists.
381 * @param spatialData Map<Double, Map<DataStream<?>, Double>>; spatially selected data
382 * @param time double; time in [s]
383 * @return data from a specific time, from data from a specific location
384 */
385 private Map<DataStream<?>, Double> getSpacioTemporalData(final Map<Double, Map<DataStream<?>, Double>> spatialData,
386 final double time)
387 {
388 return spatialData.computeIfAbsent(time, (
389 key
390 ) -> new LinkedHashMap<>());
391 }
392
393 // **********************
394 // *** KERNEL METHODS ***
395 // **********************
396
397 /**
398 * Sets a default exponential kernel with infinite range, sigma = 300m, and tau = 30s.
399 */
400 public void setKernel()
401 {
402 setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, new ExpKernelShape(DEFAULT_SIGMA, DEFAULT_TAU));
403 }
404
405 /**
406 * Sets an exponential kernel with infinite range.
407 * @param sigma double; spatial kernel size
408 * @param tau double; temporal kernel size
409 */
410 public void setKernelSI(final double sigma, final double tau)
411 {
412 setKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
413 }
414
415 /**
416 * Sets an exponential kernel with limited range.
417 * @param sigma double; spatial kernel size in [m]
418 * @param tau double; temporal kernel size in [s]
419 * @param xMax double; maximum spatial range in [m]
420 * @param tMax double; maximum temporal range in [s]
421 */
422 public void setKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
423 {
424 setKernelSI(xMax, tMax, new ExpKernelShape(sigma, tau));
425 }
426
427 /**
428 * Sets a Gaussian kernel with infinite range.
429 * @param sigma double; spatial kernel size
430 * @param tau double; temporal kernel size
431 */
432 public void setGaussKernelSI(final double sigma, final double tau)
433 {
434 setGaussKernelSI(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, sigma, tau);
435 }
436
437 /**
438 * Sets a Gaussian kernel with limited range.
439 * @param sigma double; spatial kernel size in [m]
440 * @param tau double; temporal kernel size in [s]
441 * @param xMax double; maximum spatial range in [m]
442 * @param tMax double; maximum temporal range in [s]
443 */
444 public void setGaussKernelSI(final double sigma, final double tau, final double xMax, final double tMax)
445 {
446 setKernelSI(xMax, tMax, new GaussKernelShape(sigma, tau));
447 }
448
449 /**
450 * Sets a kernel with limited range and provided shape. The shape allows using non-exponential kernels.
451 * @param xMax double; maximum spatial range
452 * @param tMax double; maximum temporal range
453 * @param shape KernelShape; shape of the kernel
454 */
455 public synchronized void setKernelSI(final double xMax, final double tMax, final KernelShape shape)
456 {
457 this.kernel = new Kernel(xMax, tMax, shape);
458 }
459
460 /**
461 * Returns the wave speed in congestion.
462 * @return double; wave speed in congestion
463 */
464 final double getWaveSpeedCongestion()
465 {
466 return this.cCong;
467 }
468
469 /**
470 * Returns the wave speed in free flow.
471 * @return double; wave speed in free flow
472 */
473 final double getWaveSpeedFreeFlow()
474 {
475 return this.cFree;
476 }
477
478 // **********************
479 // *** FILTER METHODS ***
480 // **********************
481
482 /**
483 * Executes filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
484 * done. Finally, the filtering results can then be obtained from the listener.
485 * @param location double[]; location of output grid in [m]
486 * @param time double[]; time of output grid in [s]
487 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
488 * @return EgtfParallelListener; listener to notify keep track of the progress
489 */
490 public EgtfParallelListener filterParallelSI(final double[] location, final double[] time,
491 final Quantity<?, ?>... quantities)
492 {
493 Objects.requireNonNull(location, "Location may not be null.");
494 Objects.requireNonNull(time, "Time may not be null.");
495 EgtfParallelListener listener = new EgtfParallelListener();
496 addListener(listener);
497 new Thread(new Runnable()
498 {
499 /** {@inheritDoc} */
500 @Override
501 public void run()
502 {
503 listener.setFilter(filterSI(location, time, quantities));
504 removeListener(listener);
505 }
506 }, "Egtf calculation thread").start();
507 return listener;
508 }
509
510 /**
511 * Executes fast filtering in parallel. The returned listener can be used to report progress and wait until the filtering is
512 * done. Finally, the filtering results can then be obtained from the listener.
513 * @param xMin double; minimum location value of output grid [m]
514 * @param xStep double; location step of output grid [m]
515 * @param xMax double; maximum location value of output grid [m]
516 * @param tMin double; minimum time value of output grid [s]
517 * @param tStep double; time step of output grid [s]
518 * @param tMax double; maximum time value of output grid [s]
519 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
520 * @return EgtfParallelListener; listener to notify keep track of the progress
521 */
522 public EgtfParallelListener filterParallelFastSI(final double xMin, final double xStep, final double xMax,
523 final double tMin, final double tStep, final double tMax, final Quantity<?, ?>... quantities)
524 {
525 EgtfParallelListener listener = new EgtfParallelListener();
526 addListener(listener);
527 new Thread(new Runnable()
528 {
529 /** {@inheritDoc} */
530 @Override
531 public void run()
532 {
533 listener.setFilter(filterFastSI(xMin, xStep, xMax, tMin, tStep, tMax, quantities));
534 removeListener(listener);
535 }
536 }, "Egtf calculation thread").start();
537 return listener;
538 }
539
540 /**
541 * Returns filtered data. This is the standard EGTF implementation.
542 * @param location double[]; location of output grid in [m]
543 * @param time double[]; time of output grid in [s]
544 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
545 * @return Filter; filtered data, {@code null} when interrupted
546 */
547 @SuppressWarnings({"synthetic-access", "methodlength"})
548 public Filter filterSI(final double[] location, final double[] time, final Quantity<?, ?>... quantities)
549 {
550 Objects.requireNonNull(location, "Location may not be null.");
551 Objects.requireNonNull(time, "Time may not be null.");
552
553 // initialize data
554 Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
555 for (Quantity<?, ?> quantity : quantities)
556 {
557 map.put(quantity, new double[location.length][time.length]);
558 }
559
560 // loop grid locations
561 for (int i = 0; i < location.length; i++)
562 {
563 double xGrid = location[i];
564
565 // filter applicable data for location
566 Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
567 this.data.subMap(this.kernel.fromLocation(xGrid), true, this.kernel.toLocation(xGrid), true);
568
569 // loop grid times
570 for (int j = 0; j < time.length; j++)
571 {
572 double tGrid = time[j];
573
574 // notify
575 if (notifyListeners((i + (double) j / time.length) / location.length))
576 {
577 return null;
578 }
579
580 // initialize data per stream
581 // quantity z assuming congestion and free flow
582 Map<DataStream<?>, DualWeightedMean> zCongFree = new LinkedHashMap<>();
583
584 // filter and loop applicable data for time
585 for (Map.Entry<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> xEntry : spatialData.entrySet())
586 {
587 double dx = xEntry.getKey() - xGrid;
588 Map<Double, Map<DataStream<?>, Double>> temporalData =
589 xEntry.getValue().subMap(this.kernel.fromTime(tGrid), true, this.kernel.toTime(tGrid), true);
590
591 for (Map.Entry<Double, Map<DataStream<?>, Double>> tEntry : temporalData.entrySet())
592 {
593 double dt = tEntry.getKey() - tGrid;
594 Map<DataStream<?>, Double> pData = tEntry.getValue();
595
596 double phiCong = this.kernel.weight(this.cCong, dx, dt);
597 double phiFree = this.kernel.weight(this.cFree, dx, dt);
598
599 // loop streams data at point
600 for (Map.Entry<DataStream<?>, Double> vEntry : pData.entrySet())
601 {
602 DataStream<?> stream = vEntry.getKey();
603 if (map.containsKey(stream.getQuantity()) || stream.getQuantity().isSpeed())
604 {
605 double v = vEntry.getValue();
606 DualWeightedMean zCongFreeOfStream = zCongFree.computeIfAbsent(stream, (
607 key
608 ) -> new DualWeightedMean());
609 zCongFreeOfStream.addCong(v, phiCong);
610 zCongFreeOfStream.addFree(v, phiFree);
611 }
612 }
613 }
614 }
615
616 // figure out the congestion level estimated for each data source
617 Map<DataSource, Double> w = new LinkedHashMap<>();
618 for (Map.Entry<DataStream<?>, DualWeightedMean> streamEntry : zCongFree.entrySet())
619 {
620 DataStream<?> dataStream = streamEntry.getKey();
621 if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
622 {
623 DualWeightedMean zCongFreeOfStream = streamEntry.getValue();
624 double u = Math.min(zCongFreeOfStream.getCong(), zCongFreeOfStream.getFree());
625 w.put(dataStream.getDataSource(), // 1 speed quantity per source allowed
626 .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV)));
627 continue;
628 }
629 }
630
631 // sum available data sources per quantity
632 Double wMean = null;
633 for (Map.Entry<Quantity<?, ?>, double[][]> qEntry : map.entrySet())
634 {
635 Quantity<?, ?> quantity = qEntry.getKey();
636 WeightedMean z = new WeightedMean();
637 for (Map.Entry<DataStream<?>, DualWeightedMean> zEntry : zCongFree.entrySet())
638 {
639 DataStream<?> dataStream = zEntry.getKey();
640 if (dataStream.getQuantity().equals(quantity))
641 {
642 // obtain congestion level
643 double wCong;
644 if (!w.containsKey(dataStream.getDataSource()))
645 {
646 // this data source has no speed data, but congestion level can be estimated from other sources
647 if (wMean == null)
648 {
649 // let's see if speed was estimated already
650 for (Quantity<?, ?> prevQuant : quantities)
651 {
652 if (prevQuant.equals(quantity))
653 {
654 // it was not, get mean of other data source
655 wMean = 0.0;
656 for (double ww : w.values())
657 {
658 wMean += ww / w.size();
659 }
660 break;
661 }
662 else if (prevQuant.isSpeed())
663 {
664 wMean = .5 * (1.0
665 + Math.tanh((Egtf.this.vc - map.get(prevQuant)[i][j]) / Egtf.this.deltaV));
666 break;
667 }
668 }
669 }
670 wCong = wMean;
671 }
672 else
673 {
674 wCong = w.get(dataStream.getDataSource());
675 }
676 // calculate estimated value z of this data source (no duplicate quantities per source allowed)
677 double wfree = 1.0 - wCong;
678 DualWeightedMean zCongFreej = zEntry.getValue();
679 double zStream = wCong * zCongFreej.getCong() + wfree * zCongFreej.getFree();
680 double weight;
681 if (w.size() > 1)
682 {
683 // data source more important if more and nearer measurements
684 double beta = wCong * zCongFreej.getDenominatorCong() + wfree * zCongFreej.getDenominatorFree();
685 // more important if more reliable (smaller standard deviation) at congestion level
686 double alpha = wCong / dataStream.getThetaCong() + wfree / dataStream.getThetaFree();
687 weight = alpha * beta;
688 }
689 else
690 {
691 weight = 1.0;
692 }
693 z.add(zStream, weight);
694 }
695 }
696 qEntry.getValue()[i][j] = z.get();
697 }
698 }
699 }
700 notifyListeners(1.0);
701
702 return new FilterDouble(location, time, map);
703 }
704
705 /**
706 * Returns filtered data that is processed using fast fourier transformation. This is much faster than the standard filter,
707 * at the cost that all input data is discretized to the output grid. The gain in computation speed is however such that
708 * finer output grids can be used to alleviate this. For discretization the output grid needs to be equidistant. It is
709 * recommended to set a Kernel with maximum bounds before using this method.
710 * <p>
711 * More than being a fast implementation of the Adaptive Smoothing Method, this implementation includes all data source like
712 * the Extended Generalized Treiber-Helbing Filter.
713 * @param xMin double; minimum location value of output grid [m]
714 * @param xStep double; location step of output grid [m]
715 * @param xMax double; maximum location value of output grid [m]
716 * @param tMin double; minimum time value of output grid [s]
717 * @param tStep double; time step of output grid [s]
718 * @param tMax double; maximum time value of output grid [s]
719 * @param quantities Quantity<?, ?>...; quantities to calculate filtered data of
720 * @return Filter; filtered data, {@code null} when interrupted
721 */
722 @SuppressWarnings("methodlength")
723 public Filter filterFastSI(final double xMin, final double xStep, final double xMax, final double tMin, final double tStep,
724 final double tMax, final Quantity<?, ?>... quantities)
725 {
726 if (xMin > xMax || xStep <= 0.0 || tMin > tMax || tStep <= 0.0)
727 {
728 throw new IllegalArgumentException(
729 "Ill-defined grid. Make sure that xMax >= xMin, dx > 0, tMax >= tMin and dt > 0");
730 }
731 if (notifyListeners(0.0))
732 {
733 return null;
734 }
735
736 // initialize data
737 int n = 1 + (int) ((xMax - xMin) / xStep);
738 double[] location = new double[n];
739 IntStream.range(0, n).forEach(i -> location[i] = xMin + i * xStep);
740 n = 1 + (int) ((tMax - tMin) / tStep);
741 double[] time = new double[n];
742 IntStream.range(0, n).forEach(j -> time[j] = tMin + j * tStep);
743 Map<Quantity<?, ?>, double[][]> map = new LinkedHashMap<>();
744 Map<Quantity<?, ?>, double[][]> weights = new LinkedHashMap<>();
745 for (Quantity<?, ?> quantity : quantities)
746 {
747 map.put(quantity, new double[location.length][time.length]);
748 weights.put(quantity, new double[location.length][time.length]);
749 }
750
751 // discretize Kernel
752 double xFrom = this.kernel.fromLocation(0.0);
753 xFrom = Double.isInfinite(xFrom) ? 2.0 * (xMin - xMax) : xFrom;
754 double xTo = this.kernel.toLocation(0.0);
755 xTo = Double.isInfinite(xTo) ? 2.0 * (xMax - xMin) : xTo;
756 double[] dx = equidistant(xFrom, xStep, xTo);
757 double tFrom = this.kernel.fromTime(0.0);
758 tFrom = Double.isInfinite(tFrom) ? 2.0 * (tMin - tMax) : tFrom;
759 double tTo = this.kernel.toTime(0.0);
760 tTo = Double.isInfinite(tTo) ? 2.0 * (tMax - tMin) : tTo;
761 double[] dt = equidistant(tFrom, tStep, tTo);
762 double[][] phiCong = new double[dx.length][dt.length];
763 double[][] phiFree = new double[dx.length][dt.length];
764 for (int i = 0; i < dx.length; i++)
765 {
766 for (int j = 0; j < dt.length; j++)
767 {
768 phiCong[i][j] = this.kernel.weight(this.cCong, dx[i], dt[j]);
769 phiFree[i][j] = this.kernel.weight(this.cFree, dx[i], dt[j]);
770 }
771 }
772
773 // discretize data
774 Map<DataStream<?>, double[][]> dataSum = new LinkedHashMap<>();
775 Map<DataStream<?>, double[][]> dataCount = new LinkedHashMap<>(); // integer counts, must be double[][] for convolution
776 // loop grid locations
777 for (int i = 0; i < location.length; i++)
778 {
779 // filter applicable data for location
780 Map<Double, NavigableMap<Double, Map<DataStream<?>, Double>>> spatialData =
781 this.data.subMap(location[i] - 0.5 * xStep, true, location[i] + 0.5 * xStep, true);
782 // loop grid times
783 for (int j = 0; j < time.length; j++)
784 {
785 // filter and loop applicable data for time
786 for (NavigableMap<Double, Map<DataStream<?>, Double>> locationData : spatialData.values())
787 {
788 NavigableMap<Double, Map<DataStream<?>, Double>> temporalData =
789 locationData.subMap(time[j] - 0.5 * tStep, true, time[j] + 0.5 * tStep, true);
790 for (Map<DataStream<?>, Double> timeData : temporalData.values())
791 {
792 for (Map.Entry<DataStream<?>, Double> timeEntry : timeData.entrySet())
793 {
794 if (map.containsKey(timeEntry.getKey().getQuantity()) || timeEntry.getKey().getQuantity().isSpeed())
795 {
796 dataSum.computeIfAbsent(timeEntry.getKey(), (
797 key
798 ) -> new double[location.length][time.length])[i][j] += timeEntry.getValue();
799 dataCount.computeIfAbsent(timeEntry.getKey(), (
800 key
801 ) -> new double[location.length][time.length])[i][j]++;
802 }
803 }
804 }
805 }
806 }
807 }
808
809 // figure out the congestion level estimated for each data source
810 double steps = quantities.length + 1; // speed (for congestion level) and then all in quantities
811 double step = 0;
812 // store maps to prevent us from calculating the convolution for speed again later
813 Map<DataSource, double[][]> w = new LinkedHashMap<>();
814 Map<DataSource, double[][]> zCongSpeed = new LinkedHashMap<>();
815 Map<DataSource, double[][]> zFreeSpeed = new LinkedHashMap<>();
816 Map<DataSource, double[][]> nCongSpeed = new LinkedHashMap<>();
817 Map<DataSource, double[][]> nFreeSpeed = new LinkedHashMap<>();
818 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
819 {
820 DataStream<?> dataStream = zEntry.getKey();
821 if (dataStream.getQuantity().isSpeed()) // only one speed quantity allowed per data source
822 {
823 // notify
824 double[][] vCong = Convolution.convolution(phiCong, zEntry.getValue());
825 if (notifyListeners((step + 0.25) / steps))
826 {
827 return null;
828 }
829 double[][] vFree = Convolution.convolution(phiFree, zEntry.getValue());
830 if (notifyListeners((step + 0.5) / steps))
831 {
832 return null;
833 }
834 double[][] count = dataCount.get(dataStream);
835 double[][] nCong = Convolution.convolution(phiCong, count);
836 if (notifyListeners((step + 0.75) / steps))
837 {
838 return null;
839 }
840 double[][] nFree = Convolution.convolution(phiFree, count);
841 double[][] wSource = new double[vCong.length][vCong[0].length];
842 for (int i = 0; i < vCong.length; i++)
843 {
844 for (int j = 0; j < vCong[0].length; j++)
845 {
846 double u = Math.min(vCong[i][j] / nCong[i][j], vFree[i][j] / nFree[i][j]);
847 wSource[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - u) / Egtf.this.deltaV));
848 }
849 }
850 w.put(dataStream.getDataSource(), wSource);
851 zCongSpeed.put(dataStream.getDataSource(), vCong);
852 zFreeSpeed.put(dataStream.getDataSource(), vFree);
853 nCongSpeed.put(dataStream.getDataSource(), nCong);
854 nFreeSpeed.put(dataStream.getDataSource(), nFree);
855 }
856 }
857 step++;
858 if (notifyListeners(step / steps))
859 {
860 return null;
861 }
862
863 // sum available data sources per quantity
864 double[][] wMean = null;
865 for (Quantity<?, ?> quantity : quantities)
866 {
867 // gather place for this quantity
868 double[][] qData = map.get(quantity);
869 double[][] qWeights = weights.get(quantity);
870 // loop streams that provide this quantity
871 Set<Map.Entry<DataStream<?>, double[][]>> zEntries = new LinkedHashSet<>();
872 for (Map.Entry<DataStream<?>, double[][]> zEntry : dataSum.entrySet())
873 {
874 if (zEntry.getKey().getQuantity().equals(quantity))
875 {
876 zEntries.add(zEntry);
877 }
878 }
879 double streamCounter = 0;
880 for (Map.Entry<DataStream<?>, double[][]> zEntry : zEntries)
881 {
882 DataStream<?> dataStream = zEntry.getKey();
883
884 // obtain congestion level
885 double[][] wj;
886 if (!w.containsKey(dataStream.getDataSource()))
887 {
888 // this data source has no speed data, but congestion level can be estimated from other sources
889 if (wMean == null)
890 {
891 // let's see if speed was estimated already
892 for (Quantity<?, ?> prevQuant : quantities)
893 {
894 if (prevQuant.equals(quantity))
895 {
896 // it was not, get mean of other data source
897 wMean = new double[location.length][time.length];
898 for (double[][] ww : w.values())
899 {
900 for (int i = 0; i < location.length; i++)
901 {
902 for (int j = 0; j < time.length; j++)
903 {
904 wMean[i][j] += ww[i][j] / w.size();
905 }
906 }
907 }
908 break;
909 }
910 else if (prevQuant.isSpeed())
911 {
912 wMean = new double[location.length][time.length];
913 double[][] v = map.get(prevQuant);
914 for (int i = 0; i < location.length; i++)
915 {
916 for (int j = 0; j < time.length; j++)
917 {
918 wMean[i][j] = .5 * (1.0 + Math.tanh((Egtf.this.vc - v[i][j]) / Egtf.this.deltaV));
919 }
920 }
921 break;
922 }
923 }
924 }
925 wj = wMean;
926 }
927 else
928 {
929 wj = w.get(dataStream.getDataSource());
930 }
931
932 // convolutions of filters with discretized data and data counts
933 double[][] zCong;
934 double[][] zFree;
935 double[][] nCong;
936 double[][] nFree;
937 if (dataStream.getQuantity().isSpeed())
938 {
939 zCong = zCongSpeed.get(dataStream.getDataSource());
940 zFree = zFreeSpeed.get(dataStream.getDataSource());
941 nCong = nCongSpeed.get(dataStream.getDataSource());
942 nFree = nFreeSpeed.get(dataStream.getDataSource());
943 }
944 else
945 {
946 zCong = Convolution.convolution(phiCong, zEntry.getValue());
947 if (notifyListeners((step + (streamCounter + 0.25) / zEntries.size()) / steps))
948 {
949 return null;
950 }
951 zFree = Convolution.convolution(phiFree, zEntry.getValue());
952 if (notifyListeners((step + (streamCounter + 0.5) / zEntries.size()) / steps))
953 {
954 return null;
955 }
956 double[][] count = dataCount.get(dataStream);
957 nCong = Convolution.convolution(phiCong, count);
958 if (notifyListeners((step + (streamCounter + 0.75) / zEntries.size()) / steps))
959 {
960 return null;
961 }
962 nFree = Convolution.convolution(phiFree, count);
963 }
964
965 // loop grid to add to each weighted sum (weighted per data source)
966 for (int i = 0; i < location.length; i++)
967 {
968 for (int j = 0; j < time.length; j++)
969 {
970 double wCong = wj[i][j];
971 double wFree = 1.0 - wCong;
972 double value = wCong * zCong[i][j] / nCong[i][j] + wFree * zFree[i][j] / nFree[i][j];
973 // the fast filter supplies convoluted data counts, i.e. amount of data and filter proximity; this
974 // is exactly what the EGTF method needs to weigh data sources
975 double beta = wCong * nCong[i][j] + wFree * nFree[i][j];
976 double alpha = wCong / dataStream.getThetaCong() + wFree / dataStream.getThetaFree();
977 double weight = beta * alpha;
978 qData[i][j] += (value * weight);
979 qWeights[i][j] += weight;
980 }
981 }
982 streamCounter++;
983 if (notifyListeners((step + streamCounter / zEntries.size()) / steps))
984 {
985 return null;
986 }
987 }
988 for (int i = 0; i < location.length; i++)
989 {
990 for (int j = 0; j < time.length; j++)
991 {
992 qData[i][j] /= qWeights[i][j];
993 }
994 }
995 step++;
996 }
997
998 return new FilterDouble(location, time, map);
999 }
1000
1001 /**
1002 * Returns an equidistant vector that includes 0.
1003 * @param from double; lowest value to include
1004 * @param step double; step
1005 * @param to double; highest value to include
1006 * @return double[]; equidistant vector that includes 0
1007 */
1008 private double[] equidistant(final double from, final double step, final double to)
1009 {
1010 int n1 = (int) (-from / step);
1011 int n2 = (int) (to / step);
1012 int n = n1 + n2 + 1;
1013 double[] array = new double[n];
1014 for (int i = 0; i < n; i++)
1015 {
1016 array[i] = i < n1 ? step * (-n1 + i) : step * (i - n1);
1017 }
1018 return array;
1019 }
1020
1021 // *********************
1022 // *** EVENT METHODS ***
1023 // *********************
1024
1025 /**
1026 * Interrupt the calculation.
1027 */
1028 public final void interrupt()
1029 {
1030 this.interrupted = true;
1031 }
1032
1033 /**
1034 * Add listener.
1035 * @param listener EgtfListener; listener
1036 */
1037 public final void addListener(final EgtfListener listener)
1038 {
1039 this.listeners.add(listener);
1040 }
1041
1042 /**
1043 * Remove listener.
1044 * @param listener EgtfListener; listener
1045 */
1046 public final void removeListener(final EgtfListener listener)
1047 {
1048 this.listeners.remove(listener);
1049 }
1050
1051 /**
1052 * Notify all listeners.
1053 * @param progress double; progress, a value in the range [0 ... 1]
1054 * @return boolean; whether the filter is interrupted
1055 */
1056 private boolean notifyListeners(final double progress)
1057 {
1058 if (!this.listeners.isEmpty())
1059 {
1060 EgtfEvent event = new EgtfEvent(this, progress);
1061 for (EgtfListener listener : this.listeners)
1062 {
1063 listener.notifyProgress(event);
1064 }
1065 }
1066 return this.interrupted;
1067 }
1068
1069 // **********************
1070 // *** HELPER CLASSES ***
1071 // **********************
1072
1073 /**
1074 * Small class to build up a weighted mean under the congestion and free flow assumption.
1075 */
1076 private class DualWeightedMean
1077 {
1078 /** Cumulative congestion numerator of weighted mean fraction, i.e. weighted sum. */
1079 private double numeratorCong;
1080
1081 /** Cumulative free flow numerator of weighted mean fraction, i.e. weighted sum. */
1082 private double numeratorFree;
1083
1084 /** Cumulative congestion denominator of weighted mean fraction, i.e. sum of weights. */
1085 private double denominatorCong;
1086
1087 /** Cumulative free flow denominator of weighted mean fraction, i.e. sum of weights. */
1088 private double denominatorFree;
1089
1090 /**
1091 * Adds a congestion value with weight.
1092 * @param value double; value
1093 * @param weight double; weight
1094 */
1095 public void addCong(final double value, final double weight)
1096 {
1097 this.numeratorCong += value * weight;
1098 this.denominatorCong += weight;
1099 }
1100
1101 /**
1102 * Adds a free flow value with weight.
1103 * @param value double; value
1104 * @param weight double; weight
1105 */
1106 public void addFree(final double value, final double weight)
1107 {
1108 this.numeratorFree += value * weight;
1109 this.denominatorFree += weight;
1110 }
1111
1112 /**
1113 * Returns the weighted congestion mean of available data.
1114 * @return double; weighted mean of available data
1115 */
1116 public double getCong()
1117 {
1118 return this.numeratorCong / this.denominatorCong;
1119 }
1120
1121 /**
1122 * Returns the weighted free flow mean of available data.
1123 * @return double; weighted free flow mean of available data
1124 */
1125 public double getFree()
1126 {
1127 return this.numeratorFree / this.denominatorFree;
1128 }
1129
1130 /**
1131 * Returns the sum of congestion weights.
1132 * @return double; the sum of congestion weights
1133 */
1134 public double getDenominatorCong()
1135 {
1136 return this.denominatorCong;
1137 }
1138
1139 /**
1140 * Returns the sum of free flow weights.
1141 * @return double; the sum of free flow weights
1142 */
1143 public double getDenominatorFree()
1144 {
1145 return this.denominatorFree;
1146 }
1147
1148 /** {@inheritDoc} */
1149 @Override
1150 public String toString()
1151 {
1152 return "DualWeightedMean [numeratorCong=" + this.numeratorCong + ", numeratorFree=" + this.numeratorFree
1153 + ", denominatorCong=" + this.denominatorCong + ", denominatorFree=" + this.denominatorFree + "]";
1154 }
1155
1156 }
1157
1158 /**
1159 * Small class to build up a weighted mean.
1160 */
1161 private class WeightedMean
1162 {
1163 /** Cumulative numerator of weighted mean fraction, i.e. weighted sum. */
1164 private double numerator;
1165
1166 /** Cumulative denominator of weighted mean fraction, i.e. sum of weights. */
1167 private double denominator;
1168
1169 /**
1170 * Adds a value with weight.
1171 * @param value double; value
1172 * @param weight double; weight
1173 */
1174 public void add(final double value, final double weight)
1175 {
1176 this.numerator += value * weight;
1177 this.denominator += weight;
1178 }
1179
1180 /**
1181 * Returns the weighted mean of available data.
1182 * @return double; weighted mean of available data
1183 */
1184 public double get()
1185 {
1186 return this.numerator / this.denominator;
1187 }
1188
1189 /** {@inheritDoc} */
1190 @Override
1191 public String toString()
1192 {
1193 return "WeightedMean [numerator=" + this.numerator + ", denominator=" + this.denominator + "]";
1194 }
1195
1196 }
1197
1198 /** {@inheritDoc} */
1199 @Override
1200 public String toString()
1201 {
1202 return "EGTF [kernel=" + this.kernel + ", cCong=" + this.cCong + ", cFree=" + this.cFree + ", deltaV=" + this.deltaV
1203 + ", vc=" + this.vc + ", dataSources=" + this.dataSources + ", data=" + this.data + ", interrupted="
1204 + this.interrupted + ", listeners=" + this.listeners + "]";
1205 }
1206
1207 }