View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.ArrayList;
5   import java.util.EnumSet;
6   import java.util.LinkedHashMap;
7   import java.util.List;
8   import java.util.Map;
9   
10  import org.djunits.Throw;
11  import org.djutils.event.EventInterface;
12  import org.djutils.event.EventListenerInterface;
13  import org.djutils.event.EventProducerInterface;
14  import org.djutils.event.TimedEvent;
15  import org.djutils.event.TimedEventType;
16  import org.djutils.metadata.MetaData;
17  import org.djutils.metadata.ObjectDescriptor;
18  import org.djutils.serialization.SerializationException;
19  import org.sim0mq.Sim0MQException;
20  
21  /**
22   * Data collection that can be listed and has subscription to change events.
23   * <p>
24   * Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
25   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
26   * <p>
27   * $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
28   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
29   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
30   */
31  public class SubscriptionHandler
32  {
33      /** Id of this SubscriptionHandler. */
34      private final String id;
35  
36      /** Transceiver to retrieve the data right now; e.g. GTUIdTransceiver. */
37      private final TransceiverInterface listTransceiver;
38  
39      /** Event producer for add, remove, or change events; e.g. the OTSNetwork. */
40      private final LookupEventProducerInterface eventProducerForAddRemoveOrChange;
41  
42      /** TimedEventType to subscribe to in order to receive creation of added object events; e.g. NETWORK.GTU_ADD_EVENT. */
43      private final TimedEventType addedEventType;
44  
45      /** TimedEventType to subscribe to in order to receive removed object events; e.g. NETWORK.GTU_REMOVE_EVENT. */
46      private final TimedEventType removedEventType;
47  
48      /** TimedEventType to subscript to in order to receive change of the collection, or object events. */
49      private final TimedEventType changeEventType;
50  
51      /** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
52      private final SubscriptionHandler elementSubscriptionHandler;
53  
54      /** The currently active subscriptions. */
55      private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();
56  
57      /**
58       * Create a new SubscriptionHandler.
59       * @param id String; id of the new SubscriptionHandler
60       * @param listTransceiver TransceiverInterface; transceiver to retrieve the data of <i>the addressed object</i> right now
61       * @param eventProducerForAddRemoveOrChange LookupEventProducerInterface; the event producer that can emit the
62       *            <code>addedEventType</code>, <code>removedEventType</code>, or <code>changeEventType</code> events
63       * @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
64       *            added event type for the data
65       * @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
66       *            no removed event type for the data
67       * @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
68       *            change event type for the data
69       * @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
70       */
71      SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
72              final LookupEventProducerInterface eventProducerForAddRemoveOrChange, final TimedEventType addedEventType,
73              final TimedEventType removedEventType, final TimedEventType changeEventType,
74              final SubscriptionHandler elementSubscriptionHandler)
75      {
76          Throw.whenNull(id, "Id may not be null");
77          Throw.when(
78                  null == eventProducerForAddRemoveOrChange
79                          && (addedEventType != null || removedEventType != null || changeEventType != null),
80                  NullPointerException.class,
81                  "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
82          this.id = id;
83          this.listTransceiver = listTransceiver;
84          this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
85          this.addedEventType = addedEventType;
86          this.removedEventType = removedEventType;
87          this.changeEventType = changeEventType;
88          this.elementSubscriptionHandler = elementSubscriptionHandler;
89      }
90  
91      /**
92       * Report what payload is required to retrieve a list of all elements, or data and what format a result would have.
93       * @return MetaData; description of the payload required to retrieve a list of all elements, or data and what format a
94       *         result would have
95       */
96      public MetaData listRequestMetaData()
97      {
98          return this.listTransceiver.getAddressFields();
99      }
100 
101     /**
102      * Report what the payload format of the result of the list transceiver.
103      * @return MetaData; the payload format of the result of the list transceiver
104      */
105     public MetaData listResultMetaData()
106     {
107         return this.listTransceiver.getResultFields();
108     }
109 
110     /**
111      * Retrieve a data collection.
112      * @param address Object[]; address of the requested data collection
113      * @param returnWrapper ReturnWrapper; to send back the result
114      * @throws RemoteException when communication fails
115      * @throws SerializationException on context error
116      * @throws Sim0MQException on DSOL error
117      */
118     public void get(final Object[] address, final ReturnWrapper returnWrapper)
119             throws RemoteException, Sim0MQException, SerializationException
120     {
121         sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
122     }
123 
124     /**
125      * Retrieve the list transceiver (only for testing).
126      * @return TransceiverInterface; the list transceiver
127      */
128     public TransceiverInterface getListTransceiver()
129     {
130         return this.listTransceiver;
131     }
132 
133     /**
134      * Return the set of supported commands.
135      * @return EnumSet&lt;Command&gt;; the set of supported commands.
136      */
137     public final EnumSet<Command> subscriptionOptions()
138     {
139         EnumSet<Command> result = EnumSet.noneOf(Command.class);
140         if (null != this.addedEventType)
141         {
142             result.add(Command.SUBSCRIBE_TO_ADD);
143             result.add(Command.UNSUBSCRIBE_FROM_ADD);
144         }
145         if (null != this.removedEventType)
146         {
147             result.add(Command.SUBSCRIBE_TO_REMOVE);
148             result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
149         }
150         if (null != this.changeEventType)
151         {
152             result.add(Command.SUBSCRIBE_TO_CHANGE);
153             result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
154         }
155         if (null != this.listTransceiver)
156         {
157             result.add(Command.GET_CURRENT);
158             result.add(Command.GET_ADDRESS_META_DATA);
159             result.add(Command.GET_RESULT_META_DATA);
160         }
161         return result;
162     }
163 
164     /**
165      * Create a new subscription to ADD, REMOVE, or CHANGE events.
166      * @param address Object[]; the data that is required to find the correct EventProducer
167      * @param eventType TimedEventType; one of the event types that the addressed EventProducer can fire
168      * @param returnWrapper ReturnWrapper; generates envelopes for the returned events
169      * @throws RemoteException when communication fails
170      * @throws SerializationException should never happen
171      * @throws Sim0MQException should never happen
172      */
173     private void subscribeTo(final Object[] address, final TimedEventType eventType, final ReturnWrapper returnWrapper)
174             throws RemoteException, Sim0MQException, SerializationException
175     {
176         if (null == eventType)
177         {
178             returnWrapper.nack("Does not support subscribe to");
179             return;
180         }
181         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
182         if (bad != null)
183         {
184             returnWrapper.nack("Bad address: " + bad);
185             return;
186         }
187         EventProducerInterface epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
188         if (null == epi)
189         {
190             // Not necessarily bad; some EventProducers (e.g. GTUs) may disappear at any time
191             return; // NACK has been sent by this.eventProducerForAddRemoveOrChange.lookup
192         }
193         Subscription subscription = this.subscriptions.get(returnWrapper);
194         if (null == subscription)
195         {
196             subscription = new Subscription(returnWrapper);
197             this.subscriptions.put(returnWrapper, subscription);
198         }
199         if (epi.addListener(subscription, eventType))
200         {
201             returnWrapper.ack("Subscription created");
202         }
203         else
204         {
205             // There was already a subscription?
206             returnWrapper.ack("There was already such a subscription active");
207         }
208         // FIXME: if the subscription is an an Object that later disappears, the subscription map will still consume memory for
209         // that subscription. That could add up to a lot of memory ...
210     }
211 
212     /**
213      * Cancel a subscription to ADD, REMOVE, or CHANGE events.
214      * @param address Object[]; the data that is required to find the correct EventProducer
215      * @param eventType TimedEventType; one of the event types that the addressed EventProducer can fire
216      * @param returnWrapper ReturnWrapper; the ReturnWapper that sent the results until now
217      * @throws RemoteException when communication fails
218      * @throws SerializationException should never happen
219      * @throws Sim0MQException should never happen
220      */
221     private void unsubscribeFrom(final Object[] address, final TimedEventType eventType, final ReturnWrapper returnWrapper)
222             throws RemoteException, Sim0MQException, SerializationException
223     {
224         if (null == eventType)
225         {
226             returnWrapper.nack("Does not support unsubscribe from");
227             return;
228         }
229         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
230         if (bad != null)
231         {
232             returnWrapper.nack("Bad address: " + bad);
233             return;
234         }
235         EventProducerInterface epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
236         if (null == epi)
237         {
238             returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
239             return;
240         }
241         Subscription subscription = this.subscriptions.get(returnWrapper);
242         if (null == subscription)
243         {
244             returnWrapper.nack("Cound not find a subscription to cancel");
245         }
246         else if (!epi.removeListener(subscription, eventType))
247         {
248             returnWrapper.nack("Subscription was not found");
249         }
250         else
251         {
252             this.subscriptions.remove(returnWrapper);
253             returnWrapper.ack("Subscription removed");
254         }
255     }
256 
257     /**
258      * Retrieve the id of this SubscriptionHandler.
259      * @return String; the id of this SubscriptionHandler
260      */
261     public final String getId()
262     {
263         return this.id;
264     }
265 
266     /**
267      * The commands that a SubscriptionHandler understands.
268      */
269     public enum Command
270     {
271         /** Subscribe to add events. */
272         SUBSCRIBE_TO_ADD,
273         /** Subscribe to remove events. */
274         SUBSCRIBE_TO_REMOVE,
275         /** Subscribe to change events. */
276         SUBSCRIBE_TO_CHANGE,
277         /** Unsubscribe to add events. */
278         UNSUBSCRIBE_FROM_ADD,
279         /** Unsubscribe to remove events. */
280         UNSUBSCRIBE_FROM_REMOVE,
281         /** Unsubscribe to change events. */
282         UNSUBSCRIBE_FROM_CHANGE,
283         /** Get current set (if a collection), c.q. state (if properties of one object). */
284         GET_CURRENT,
285         /** Get the address meta data. */
286         GET_ADDRESS_META_DATA,
287         /** Get the result meta data. */
288         GET_RESULT_META_DATA,
289         /** Get the output of the IdSource. */
290         GET_LIST,
291         /** Get the set of implemented commands (must - itself - always be implemented). */
292         GET_COMMANDS;
293     }
294 
295     /**
296      * Convert a String representing a Command into that Command.
297      * @param commandString String; the string
298      * @return Command; the corresponding Command, or null if the <code>commandString</code> is not a valid Command
299      */
300     public static Command lookupCommand(final String commandString)
301     {
302         if ("GET_ADDRESS_META_DATA".equals(commandString))
303         {
304             return Command.GET_ADDRESS_META_DATA;
305         }
306         else if ("GET_CURRENT".equals(commandString))
307         {
308             return Command.GET_CURRENT;
309         }
310         else if ("GET_RESULT_META_DATA".equals(commandString))
311         {
312             return Command.GET_RESULT_META_DATA;
313         }
314         else if ("GET_RESULT_META_DATA".equals(commandString))
315         {
316             return Command.GET_RESULT_META_DATA;
317         }
318         else if ("SUBSCRIBE_TO_ADD".equals(commandString))
319         {
320             return Command.SUBSCRIBE_TO_ADD;
321         }
322         else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
323         {
324             return Command.SUBSCRIBE_TO_CHANGE;
325         }
326         else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
327         {
328             return Command.SUBSCRIBE_TO_REMOVE;
329         }
330         else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
331         {
332             return Command.UNSUBSCRIBE_FROM_ADD;
333         }
334         else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
335         {
336             return Command.UNSUBSCRIBE_FROM_REMOVE;
337         }
338         else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
339         {
340             return Command.UNSUBSCRIBE_FROM_CHANGE;
341         }
342         else if ("GET_LIST".contentEquals(commandString))
343         {
344             return Command.GET_LIST;
345         }
346         else if ("GET_COMMANDS".contentEquals(commandString))
347         {
348             return Command.GET_COMMANDS;
349         }
350         System.err.println("Could not find command with name \"" + commandString + "\"");
351         return null;
352     }
353 
354     /**
355      * Execute one command.
356      * @param command Command; the command
357      * @param address Object[] the address of the object on which the command must be applied
358      * @param returnWrapper ReturnWrapper; envelope generator for replies
359      * @throws RemoteException on communication failure
360      * @throws SerializationException on illegal type in serialization
361      * @throws Sim0MQException on communication error
362      */
363     public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
364             throws RemoteException, Sim0MQException, SerializationException
365     {
366         Throw.whenNull(command, "Command may not be null");
367         Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
368         switch (command)
369         {
370             case SUBSCRIBE_TO_ADD:
371                 subscribeTo(address, this.addedEventType, returnWrapper);
372                 break;
373 
374             case SUBSCRIBE_TO_CHANGE:
375                 subscribeTo(address, this.changeEventType, returnWrapper);
376                 break;
377 
378             case SUBSCRIBE_TO_REMOVE:
379                 subscribeTo(address, this.removedEventType, returnWrapper);
380                 break;
381 
382             case UNSUBSCRIBE_FROM_ADD:
383                 unsubscribeFrom(address, this.addedEventType, returnWrapper);
384                 break;
385 
386             case UNSUBSCRIBE_FROM_CHANGE:
387                 unsubscribeFrom(address, this.changeEventType, returnWrapper);
388                 break;
389 
390             case UNSUBSCRIBE_FROM_REMOVE:
391                 unsubscribeFrom(address, this.removedEventType, returnWrapper);
392                 break;
393 
394             case GET_CURRENT:
395             {
396                 Object[] result = this.listTransceiver.get(address, returnWrapper);
397                 if (null != result)
398                 {
399                     sendResult(result, returnWrapper);
400                 }
401                 // TODO else?
402                 break;
403             }
404 
405             case GET_ADDRESS_META_DATA:
406                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
407                         returnWrapper);
408                 break;
409 
410             case GET_RESULT_META_DATA:
411                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
412                         returnWrapper);
413                 break;
414 
415             case GET_LIST:
416             {
417                 if (this.listTransceiver.hasIdSource())
418                 {
419                     sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
420                             returnWrapper);
421                 }
422                 else
423                 {
424                     sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
425                 }
426                 break;
427             }
428 
429             case GET_COMMANDS:
430                 List<String> resultList = new ArrayList<>();
431                 if (null != this.addedEventType)
432                 {
433                     resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
434                     resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
435                 }
436                 if (null != this.removedEventType)
437                 {
438                     resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
439                     resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
440 
441                 }
442                 if (null != this.changeEventType)
443                 {
444                     resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
445                     resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
446                 }
447                 if (this.listTransceiver.getAddressFields() != null)
448                 {
449                     resultList.add(Command.GET_ADDRESS_META_DATA.toString());
450                 }
451                 if (this.listTransceiver.getResultFields() != null)
452                 {
453                     resultList.add(Command.GET_RESULT_META_DATA.toString());
454                 }
455                 if (null != this.listTransceiver)
456                 {
457                     resultList.add(Command.GET_LIST.toString());
458                 }
459                 resultList.add(Command.GET_COMMANDS.toString());
460                 Object[] result = new Object[resultList.size()];
461                 for (int index = 0; index < result.length; index++)
462                 {
463                     result[index] = resultList.get(index);
464                 }
465                 returnWrapper.encodeReplyAndTransmit(result);
466                 break;
467 
468             default:
469                 // Cannot happen
470                 break;
471         }
472     }
473 
474     /**
475      * Extract the class names from an array of ObjectDescriptor.
476      * @param objectDescriptors ObjectDescriptor[]; the array of ObjectDescriptor
477      * @return Object[]; the class names
478      */
479     private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
480     {
481         Object[] result = new Object[objectDescriptors.length];
482         for (int index = 0; index < objectDescriptors.length; index++)
483         {
484             result[index] = objectDescriptors[index].getObjectClass().getName();
485         }
486         return result;
487     }
488 
489     /**
490      * Send data via Sim0MQ to master if (and only if) it is non-null.
491      * @param data Object[]; the data to transmit
492      * @param returnWrapper ReturnWrapper; envelope constructor for returned results
493      * @throws SerializationException on illegal type in serialization
494      * @throws Sim0MQException on communication error
495      */
496     private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
497             throws Sim0MQException, SerializationException
498     {
499         if (data != null)
500         {
501             returnWrapper.encodeReplyAndTransmit(data);
502         }
503     }
504 
505     /** {@inheritDoc} */
506     @Override
507     public String toString()
508     {
509         return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
510                 + ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
511                 + this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
512                 + this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
513     }
514 
515 }
516 
517 /**
518  * Object that can find the EventProducerInterface object for an address.
519  */
520 interface LookupEventProducerInterface
521 {
522     /**
523      * Find the EventProducerInterface with the given address.
524      * @param address Object[]; the address
525      * @param returnWrapper ReturnWrapper; to be used to send back complaints about bad addresses, etc.
526      * @return EventProducerInterface; can be null in case the address is (no longer) valid
527      * @throws SerializationException when an error occurs while serializing an error response
528      * @throws Sim0MQException when an error occurs while serializing an error response
529      */
530     EventProducerInterface lookup(Object[] address, ReturnWrapper returnWrapper) throws Sim0MQException, SerializationException;
531 
532     /**
533      * Return a MetaData object that can be used to verify the correctness of an address for the <code>lookup</code> method.
534      * @return MetaData; to be used to verify the correctness of an address for the <code>lookup</code> method
535      */
536     MetaData getAddressMetaData();
537 
538 }
539 
540 /**
541  * Handles one subscription.
542  */
543 class Subscription implements EventListenerInterface
544 {
545     /** ... */
546     private static final long serialVersionUID = 20200428L;
547 
548     /** Generates envelopes for the messages sent over Sim0MQ. */
549     private final ReturnWrapper returnWrapper;
550 
551     /**
552      * Construct a new Subscription.
553      * @param returnWrapper ReturnWrapper; envelope generator for the messages
554      */
555     Subscription(final ReturnWrapper returnWrapper)
556     {
557         this.returnWrapper = returnWrapper;
558     }
559 
560     /** {@inheritDoc} */
561     @Override
562     public void notify(final EventInterface event) throws RemoteException
563     {
564         MetaData metaData = event.getType().getMetaData();
565         int additionalFields = event.getType() instanceof TimedEventType ? 1 : 0;
566         Object[] result = new Object[additionalFields + metaData.size()];
567         // result[0] = event.getType().getName();
568         if (additionalFields > 0)
569         {
570             result[0] = ((TimedEvent<?>) event).getTimeStamp();
571         }
572         Object payload = event.getContent();
573         if (payload instanceof Object[])
574         {
575             for (int index = 0; index < event.getType().getMetaData().size(); index++)
576             {
577                 result[additionalFields + index] = ((Object[]) payload)[index];
578             }
579         }
580         else
581         {
582             result[additionalFields] = payload;
583         }
584         // TODO verify the composition of the result. Problem: no access to the metadata here
585         try
586         {
587             this.returnWrapper.encodeReplyAndTransmit(result);
588         }
589         catch (Sim0MQException | SerializationException e)
590         {
591             e.printStackTrace();
592         }
593     }
594 
595 }