View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.ArrayList;
5   import java.util.EnumSet;
6   import java.util.LinkedHashMap;
7   import java.util.List;
8   import java.util.Map;
9   
10  import org.djutils.exceptions.Throw;
11  import org.djutils.event.Event;
12  import org.djutils.event.EventListener;
13  import org.djutils.event.EventProducer;
14  import org.djutils.event.EventType;
15  import org.djutils.event.TimedEvent;
16  import org.djutils.metadata.MetaData;
17  import org.djutils.metadata.ObjectDescriptor;
18  import org.djutils.serialization.SerializationException;
19  import org.sim0mq.Sim0MQException;
20  
21  /**
22   * Data collection that can be listed and has subscription to change events.
23   * <p>
24   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
25   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
26   * </p>
27   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
28   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
29   */
30  public class SubscriptionHandler
31  {
32      /** Id of this SubscriptionHandler. */
33      private final String id;
34  
35      /** Transceiver to retrieve the data right now; e.g. GtuIdTransceiver. */
36      private final TransceiverInterface listTransceiver;
37  
38      /** Event producer for add, remove, or change events; e.g. the Network. */
39      private final LookupEventProducer eventProducerForAddRemoveOrChange;
40  
41      /** EventType to subscribe to in order to receive creation of added object events; e.g. Network.GTU_ADD_EVENT. */
42      private final EventType addedEventType;
43  
44      /** EventType to subscribe to in order to receive removed object events; e.g. Network.GTU_REMOVE_EVENT. */
45      private final EventType removedEventType;
46  
47      /** EventType to subscript to in order to receive change of the collection, or object events. */
48      private final EventType changeEventType;
49  
50      /** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
51      private final SubscriptionHandler elementSubscriptionHandler;
52  
53      /** The currently active subscriptions. */
54      private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();
55  
56      /**
57       * Create a new SubscriptionHandler.
58       * @param id String; id of the new SubscriptionHandler
59       * @param listTransceiver TransceiverInterface; transceiver to retrieve the data of <i>the addressed object</i> right now
60       * @param eventProducerForAddRemoveOrChange LookupEventProducer; the event producer that can emit the
61       *            <code>addedEventType</code>, <code>removedEventType</code>, or <code>changeEventType</code> events
62       * @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
63       *            added event type for the data
64       * @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
65       *            no removed event type for the data
66       * @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
67       *            change event type for the data
68       * @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
69       */
70      public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
71              final LookupEventProducer eventProducerForAddRemoveOrChange, final EventType addedEventType,
72              final EventType removedEventType, final EventType changeEventType,
73              final SubscriptionHandler elementSubscriptionHandler)
74      {
75          Throw.whenNull(id, "Id may not be null");
76          Throw.when(
77                  null == eventProducerForAddRemoveOrChange
78                          && (addedEventType != null || removedEventType != null || changeEventType != null),
79                  NullPointerException.class,
80                  "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
81          this.id = id;
82          this.listTransceiver = listTransceiver;
83          this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
84          this.addedEventType = addedEventType;
85          this.removedEventType = removedEventType;
86          this.changeEventType = changeEventType;
87          this.elementSubscriptionHandler = elementSubscriptionHandler;
88      }
89  
90      /**
91       * Report what payload is required to retrieve a list of all elements, or data and what format a result would have.
92       * @return MetaData; description of the payload required to retrieve a list of all elements, or data and what format a
93       *         result would have
94       */
95      public MetaData listRequestMetaData()
96      {
97          return this.listTransceiver.getAddressFields();
98      }
99  
100     /**
101      * Report what the payload format of the result of the list transceiver.
102      * @return MetaData; the payload format of the result of the list transceiver
103      */
104     public MetaData listResultMetaData()
105     {
106         return this.listTransceiver.getResultFields();
107     }
108 
109     /**
110      * Retrieve a data collection.
111      * @param address Object[]; address of the requested data collection
112      * @param returnWrapper ReturnWrapper; to send back the result
113      * @throws RemoteException when communication fails
114      * @throws SerializationException on context error
115      * @throws Sim0MQException on DSOL error
116      */
117     public void get(final Object[] address, final ReturnWrapper returnWrapper)
118             throws RemoteException, Sim0MQException, SerializationException
119     {
120         sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
121     }
122 
123     /**
124      * Retrieve the list transceiver (only for testing).
125      * @return TransceiverInterface; the list transceiver
126      */
127     public TransceiverInterface getListTransceiver()
128     {
129         return this.listTransceiver;
130     }
131 
132     /**
133      * Return the set of supported commands.
134      * @return EnumSet&lt;Command&gt;; the set of supported commands.
135      */
136     public final EnumSet<Command> subscriptionOptions()
137     {
138         EnumSet<Command> result = EnumSet.noneOf(Command.class);
139         if (null != this.addedEventType)
140         {
141             result.add(Command.SUBSCRIBE_TO_ADD);
142             result.add(Command.UNSUBSCRIBE_FROM_ADD);
143         }
144         if (null != this.removedEventType)
145         {
146             result.add(Command.SUBSCRIBE_TO_REMOVE);
147             result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
148         }
149         if (null != this.changeEventType)
150         {
151             result.add(Command.SUBSCRIBE_TO_CHANGE);
152             result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
153         }
154         if (null != this.listTransceiver)
155         {
156             result.add(Command.GET_CURRENT);
157             result.add(Command.GET_ADDRESS_META_DATA);
158             result.add(Command.GET_RESULT_META_DATA);
159         }
160         return result;
161     }
162 
163     /**
164      * Create a new subscription to ADD, REMOVE, or CHANGE events.
165      * @param address Object[]; the data that is required to find the correct EventProducer
166      * @param eventType EventType; one of the event types that the addressed EventProducer can fire
167      * @param returnWrapper ReturnWrapper; generates envelopes for the returned events
168      * @throws RemoteException when communication fails
169      * @throws SerializationException should never happen
170      * @throws Sim0MQException should never happen
171      */
172     private void subscribeTo(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
173             throws RemoteException, Sim0MQException, SerializationException
174     {
175         if (null == eventType)
176         {
177             returnWrapper.nack("Does not support subscribe to");
178             return;
179         }
180         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
181         if (bad != null)
182         {
183             returnWrapper.nack("Bad address: " + bad);
184             return;
185         }
186         EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
187         if (null == epi)
188         {
189             // Not necessarily bad; some EventProducers (e.g. GTUs) may disappear at any time
190             return; // NACK has been sent by this.eventProducerForAddRemoveOrChange.lookup
191         }
192         Subscription subscription = this.subscriptions.get(returnWrapper);
193         if (null == subscription)
194         {
195             subscription = new Subscription(returnWrapper);
196             this.subscriptions.put(returnWrapper, subscription);
197         }
198         if (epi.addListener(subscription, eventType))
199         {
200             returnWrapper.ack("Subscription created");
201         }
202         else
203         {
204             // There was already a subscription?
205             returnWrapper.ack("There was already such a subscription active");
206         }
207         // FIXME: if the subscription is an an Object that later disappears, the subscription map will still consume memory for
208         // that subscription. That could add up to a lot of memory ...
209     }
210 
211     /**
212      * Cancel a subscription to ADD, REMOVE, or CHANGE events.
213      * @param address Object[]; the data that is required to find the correct EventProducer
214      * @param eventType EventType; one of the event types that the addressed EventProducer can fire
215      * @param returnWrapper ReturnWrapper; the ReturnWapper that sent the results until now
216      * @throws RemoteException when communication fails
217      * @throws SerializationException should never happen
218      * @throws Sim0MQException should never happen
219      */
220     private void unsubscribeFrom(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
221             throws RemoteException, Sim0MQException, SerializationException
222     {
223         if (null == eventType)
224         {
225             returnWrapper.nack("Does not support unsubscribe from");
226             return;
227         }
228         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
229         if (bad != null)
230         {
231             returnWrapper.nack("Bad address: " + bad);
232             return;
233         }
234         EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
235         if (null == epi)
236         {
237             returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
238             return;
239         }
240         Subscription subscription = this.subscriptions.get(returnWrapper);
241         if (null == subscription)
242         {
243             returnWrapper.nack("Cound not find a subscription to cancel");
244         }
245         else if (!epi.removeListener(subscription, eventType))
246         {
247             returnWrapper.nack("Subscription was not found");
248         }
249         else
250         {
251             this.subscriptions.remove(returnWrapper);
252             returnWrapper.ack("Subscription removed");
253         }
254     }
255 
256     /**
257      * Retrieve the id of this SubscriptionHandler.
258      * @return String; the id of this SubscriptionHandler
259      */
260     public final String getId()
261     {
262         return this.id;
263     }
264 
265     /**
266      * The commands that a SubscriptionHandler understands.
267      */
268     public enum Command
269     {
270         /** Subscribe to add events. */
271         SUBSCRIBE_TO_ADD,
272         /** Subscribe to remove events. */
273         SUBSCRIBE_TO_REMOVE,
274         /** Subscribe to change events. */
275         SUBSCRIBE_TO_CHANGE,
276         /** Unsubscribe to add events. */
277         UNSUBSCRIBE_FROM_ADD,
278         /** Unsubscribe to remove events. */
279         UNSUBSCRIBE_FROM_REMOVE,
280         /** Unsubscribe to change events. */
281         UNSUBSCRIBE_FROM_CHANGE,
282         /** Get current set (if a collection), c.q. state (if properties of one object). */
283         GET_CURRENT,
284         /** Get the address meta data. */
285         GET_ADDRESS_META_DATA,
286         /** Get the result meta data. */
287         GET_RESULT_META_DATA,
288         /** Get the output of the IdSource. */
289         GET_LIST,
290         /** Get the set of implemented commands (must - itself - always be implemented). */
291         GET_COMMANDS;
292     }
293 
294     /**
295      * Convert a String representing a Command into that Command.
296      * @param commandString String; the string
297      * @return Command; the corresponding Command, or null if the <code>commandString</code> is not a valid Command
298      */
299     public static Command lookupCommand(final String commandString)
300     {
301         if ("GET_ADDRESS_META_DATA".equals(commandString))
302         {
303             return Command.GET_ADDRESS_META_DATA;
304         }
305         else if ("GET_CURRENT".equals(commandString))
306         {
307             return Command.GET_CURRENT;
308         }
309         else if ("GET_RESULT_META_DATA".equals(commandString))
310         {
311             return Command.GET_RESULT_META_DATA;
312         }
313         else if ("GET_RESULT_META_DATA".equals(commandString))
314         {
315             return Command.GET_RESULT_META_DATA;
316         }
317         else if ("SUBSCRIBE_TO_ADD".equals(commandString))
318         {
319             return Command.SUBSCRIBE_TO_ADD;
320         }
321         else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
322         {
323             return Command.SUBSCRIBE_TO_CHANGE;
324         }
325         else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
326         {
327             return Command.SUBSCRIBE_TO_REMOVE;
328         }
329         else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
330         {
331             return Command.UNSUBSCRIBE_FROM_ADD;
332         }
333         else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
334         {
335             return Command.UNSUBSCRIBE_FROM_REMOVE;
336         }
337         else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
338         {
339             return Command.UNSUBSCRIBE_FROM_CHANGE;
340         }
341         else if ("GET_LIST".contentEquals(commandString))
342         {
343             return Command.GET_LIST;
344         }
345         else if ("GET_COMMANDS".contentEquals(commandString))
346         {
347             return Command.GET_COMMANDS;
348         }
349         System.err.println("Could not find command with name \"" + commandString + "\"");
350         return null;
351     }
352 
353     /**
354      * Execute one command.
355      * @param command Command; the command
356      * @param address Object[] the address of the object on which the command must be applied
357      * @param returnWrapper ReturnWrapper; envelope generator for replies
358      * @throws RemoteException on communication failure
359      * @throws SerializationException on illegal type in serialization
360      * @throws Sim0MQException on communication error
361      */
362     public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
363             throws RemoteException, Sim0MQException, SerializationException
364     {
365         Throw.whenNull(command, "Command may not be null");
366         Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
367         switch (command)
368         {
369             case SUBSCRIBE_TO_ADD:
370                 subscribeTo(address, this.addedEventType, returnWrapper);
371                 break;
372 
373             case SUBSCRIBE_TO_CHANGE:
374                 subscribeTo(address, this.changeEventType, returnWrapper);
375                 break;
376 
377             case SUBSCRIBE_TO_REMOVE:
378                 subscribeTo(address, this.removedEventType, returnWrapper);
379                 break;
380 
381             case UNSUBSCRIBE_FROM_ADD:
382                 unsubscribeFrom(address, this.addedEventType, returnWrapper);
383                 break;
384 
385             case UNSUBSCRIBE_FROM_CHANGE:
386                 unsubscribeFrom(address, this.changeEventType, returnWrapper);
387                 break;
388 
389             case UNSUBSCRIBE_FROM_REMOVE:
390                 unsubscribeFrom(address, this.removedEventType, returnWrapper);
391                 break;
392 
393             case GET_CURRENT:
394             {
395                 Object[] result = this.listTransceiver.get(address, returnWrapper);
396                 if (null != result)
397                 {
398                     sendResult(result, returnWrapper);
399                 }
400                 // TODO else?
401                 break;
402             }
403 
404             case GET_ADDRESS_META_DATA:
405                 if (null == this.listTransceiver)
406                 {
407                     returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
408                 }
409                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
410                         returnWrapper);
411                 break;
412 
413             case GET_RESULT_META_DATA:
414                 if (null == this.listTransceiver)
415                 {
416                     returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
417                 }
418                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
419                         returnWrapper);
420                 break;
421 
422             case GET_LIST:
423             {
424                 if (this.listTransceiver.hasIdSource())
425                 {
426                     sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
427                             returnWrapper);
428                 }
429                 else
430                 {
431                     sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
432                 }
433                 break;
434             }
435 
436             case GET_COMMANDS:
437                 List<String> resultList = new ArrayList<>();
438                 if (null != this.addedEventType)
439                 {
440                     resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
441                     resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
442                 }
443                 if (null != this.removedEventType)
444                 {
445                     resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
446                     resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
447 
448                 }
449                 if (null != this.changeEventType)
450                 {
451                     resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
452                     resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
453                 }
454                 if (this.listTransceiver.getAddressFields() != null)
455                 {
456                     resultList.add(Command.GET_ADDRESS_META_DATA.toString());
457                 }
458                 if (this.listTransceiver.getResultFields() != null)
459                 {
460                     resultList.add(Command.GET_RESULT_META_DATA.toString());
461                 }
462                 if (null != this.listTransceiver)
463                 {
464                     resultList.add(Command.GET_LIST.toString());
465                 }
466                 resultList.add(Command.GET_COMMANDS.toString());
467                 Object[] result = new Object[resultList.size()];
468                 for (int index = 0; index < result.length; index++)
469                 {
470                     result[index] = resultList.get(index);
471                 }
472                 returnWrapper.encodeReplyAndTransmit(result);
473                 break;
474 
475             default:
476                 // Cannot happen
477                 break;
478         }
479     }
480 
481     /**
482      * Extract the class names from an array of ObjectDescriptor.
483      * @param objectDescriptors ObjectDescriptor[]; the array of ObjectDescriptor
484      * @return Object[]; the class names
485      */
486     private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
487     {
488         Object[] result = new Object[objectDescriptors.length];
489         for (int index = 0; index < objectDescriptors.length; index++)
490         {
491             result[index] = objectDescriptors[index].getObjectClass().getName();
492         }
493         return result;
494     }
495 
496     /**
497      * Send data via Sim0MQ to master if (and only if) it is non-null.
498      * @param data Object[]; the data to transmit
499      * @param returnWrapper ReturnWrapper; envelope constructor for returned results
500      * @throws SerializationException on illegal type in serialization
501      * @throws Sim0MQException on communication error
502      */
503     private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
504             throws Sim0MQException, SerializationException
505     {
506         if (data != null)
507         {
508             returnWrapper.encodeReplyAndTransmit(data);
509         }
510     }
511 
512     /** {@inheritDoc} */
513     @Override
514     public String toString()
515     {
516         return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
517                 + ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
518                 + this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
519                 + this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
520     }
521 
522 }
523 
524 /**
525  * Handles one subscription.
526  */
527 class Subscription implements EventListener
528 {
529     /** ... */
530     private static final long serialVersionUID = 20200428L;
531 
532     /** Generates envelopes for the messages sent over Sim0MQ. */
533     private final ReturnWrapper returnWrapper;
534 
535     /**
536      * Construct a new Subscription.
537      * @param returnWrapper ReturnWrapper; envelope generator for the messages
538      */
539     Subscription(final ReturnWrapper returnWrapper)
540     {
541         this.returnWrapper = returnWrapper;
542     }
543 
544     /** {@inheritDoc} */
545     @Override
546     public void notify(final Event event) throws RemoteException
547     {
548         MetaData metaData = event.getType().getMetaData();
549         int additionalFields = event.getType() instanceof EventType ? 1 : 0;
550         Object[] result = new Object[additionalFields + metaData.size()];
551         // result[0] = event.getType().getName();
552         if (additionalFields > 0)
553         {
554             result[0] = ((TimedEvent<?>) event).getTimeStamp();
555         }
556         Object payload = event.getContent();
557         if (payload instanceof Object[])
558         {
559             for (int index = 0; index < event.getType().getMetaData().size(); index++)
560             {
561                 result[additionalFields + index] = ((Object[]) payload)[index];
562             }
563         }
564         else
565         {
566             result[additionalFields] = payload;
567         }
568         // TODO verify the composition of the result. Problem: no access to the metadata here
569         try
570         {
571             this.returnWrapper.encodeReplyAndTransmit(result);
572         }
573         catch (Sim0MQException | SerializationException e)
574         {
575             e.printStackTrace();
576         }
577     }
578 
579 }