SubscriptionHandler.java

  1. package org.opentrafficsim.sim0mq.publisher;

  2. import java.rmi.RemoteException;
  3. import java.util.ArrayList;
  4. import java.util.EnumSet;
  5. import java.util.LinkedHashMap;
  6. import java.util.List;
  7. import java.util.Map;

  8. import org.djutils.exceptions.Throw;
  9. import org.djutils.event.Event;
  10. import org.djutils.event.EventListener;
  11. import org.djutils.event.EventProducer;
  12. import org.djutils.event.EventType;
  13. import org.djutils.event.TimedEvent;
  14. import org.djutils.metadata.MetaData;
  15. import org.djutils.metadata.ObjectDescriptor;
  16. import org.djutils.serialization.SerializationException;
  17. import org.sim0mq.Sim0MQException;

  18. /**
  19.  * Data collection that can be listed and has subscription to change events.
  20.  * <p>
  21.  * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
  22.  * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
  23.  * </p>
  24.  * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
  25.  * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
  26.  */
  27. public class SubscriptionHandler
  28. {
  29.     /** Id of this SubscriptionHandler. */
  30.     private final String id;

  31.     /** Transceiver to retrieve the data right now; e.g. GtuIdTransceiver. */
  32.     private final TransceiverInterface listTransceiver;

  33.     /** Event producer for add, remove, or change events; e.g. the Network. */
  34.     private final LookupEventProducer eventProducerForAddRemoveOrChange;

  35.     /** EventType to subscribe to in order to receive creation of added object events; e.g. Network.GTU_ADD_EVENT. */
  36.     private final EventType addedEventType;

  37.     /** EventType to subscribe to in order to receive removed object events; e.g. Network.GTU_REMOVE_EVENT. */
  38.     private final EventType removedEventType;

  39.     /** EventType to subscript to in order to receive change of the collection, or object events. */
  40.     private final EventType changeEventType;

  41.     /** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
  42.     private final SubscriptionHandler elementSubscriptionHandler;

  43.     /** The currently active subscriptions. */
  44.     private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();

  45.     /**
  46.      * Create a new SubscriptionHandler.
  47.      * @param id id of the new SubscriptionHandler
  48.      * @param listTransceiver transceiver to retrieve the data of <i>the addressed object</i> right now
  49.      * @param eventProducerForAddRemoveOrChange the event producer that can emit the <code>addedEventType</code>,
  50.      *            <code>removedEventType</code>, or <code>changeEventType</code> events
  51.      * @param addedEventType event type that signals that a new element has been added, should be null if there is no added
  52.      *            event type for the data
  53.      * @param removedEventType event type that signals that an element has been removed, should be null if there is no removed
  54.      *            event type for the data
  55.      * @param changeEventType event type that signals that an element has been changed, should be null if there is no change
  56.      *            event type for the data
  57.      * @param elementSubscriptionHandler SubscriptionHandler for events produced by the underlying elements
  58.      */
  59.     public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
  60.             final LookupEventProducer eventProducerForAddRemoveOrChange, final EventType addedEventType,
  61.             final EventType removedEventType, final EventType changeEventType,
  62.             final SubscriptionHandler elementSubscriptionHandler)
  63.     {
  64.         Throw.whenNull(id, "Id may not be null");
  65.         Throw.when(
  66.                 null == eventProducerForAddRemoveOrChange
  67.                         && (addedEventType != null || removedEventType != null || changeEventType != null),
  68.                 NullPointerException.class,
  69.                 "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
  70.         this.id = id;
  71.         this.listTransceiver = listTransceiver;
  72.         this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
  73.         this.addedEventType = addedEventType;
  74.         this.removedEventType = removedEventType;
  75.         this.changeEventType = changeEventType;
  76.         this.elementSubscriptionHandler = elementSubscriptionHandler;
  77.     }

  78.     /**
  79.      * Report what payload is required to retrieve a list of all elements, or data and what format a result would have.
  80.      * @return description of the payload required to retrieve a list of all elements, or data and what format a result would
  81.      *         have
  82.      */
  83.     public MetaData listRequestMetaData()
  84.     {
  85.         return this.listTransceiver.getAddressFields();
  86.     }

  87.     /**
  88.      * Report what the payload format of the result of the list transceiver.
  89.      * @return the payload format of the result of the list transceiver
  90.      */
  91.     public MetaData listResultMetaData()
  92.     {
  93.         return this.listTransceiver.getResultFields();
  94.     }

  95.     /**
  96.      * Retrieve a data collection.
  97.      * @param address address of the requested data collection
  98.      * @param returnWrapper to send back the result
  99.      * @throws RemoteException when communication fails
  100.      * @throws SerializationException on context error
  101.      * @throws Sim0MQException on DSOL error
  102.      */
  103.     public void get(final Object[] address, final ReturnWrapper returnWrapper)
  104.             throws RemoteException, Sim0MQException, SerializationException
  105.     {
  106.         sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
  107.     }

  108.     /**
  109.      * Retrieve the list transceiver (only for testing).
  110.      * @return the list transceiver
  111.      */
  112.     public TransceiverInterface getListTransceiver()
  113.     {
  114.         return this.listTransceiver;
  115.     }

  116.     /**
  117.      * Return the set of supported commands.
  118.      * @return the set of supported commands.
  119.      */
  120.     public final EnumSet<Command> subscriptionOptions()
  121.     {
  122.         EnumSet<Command> result = EnumSet.noneOf(Command.class);
  123.         if (null != this.addedEventType)
  124.         {
  125.             result.add(Command.SUBSCRIBE_TO_ADD);
  126.             result.add(Command.UNSUBSCRIBE_FROM_ADD);
  127.         }
  128.         if (null != this.removedEventType)
  129.         {
  130.             result.add(Command.SUBSCRIBE_TO_REMOVE);
  131.             result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
  132.         }
  133.         if (null != this.changeEventType)
  134.         {
  135.             result.add(Command.SUBSCRIBE_TO_CHANGE);
  136.             result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
  137.         }
  138.         if (null != this.listTransceiver)
  139.         {
  140.             result.add(Command.GET_CURRENT);
  141.             result.add(Command.GET_ADDRESS_META_DATA);
  142.             result.add(Command.GET_RESULT_META_DATA);
  143.         }
  144.         return result;
  145.     }

  146.     /**
  147.      * Create a new subscription to ADD, REMOVE, or CHANGE events.
  148.      * @param address the data that is required to find the correct EventProducer
  149.      * @param eventType one of the event types that the addressed EventProducer can fire
  150.      * @param returnWrapper generates envelopes for the returned events
  151.      * @throws RemoteException when communication fails
  152.      * @throws SerializationException should never happen
  153.      * @throws Sim0MQException should never happen
  154.      */
  155.     private void subscribeTo(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
  156.             throws RemoteException, Sim0MQException, SerializationException
  157.     {
  158.         if (null == eventType)
  159.         {
  160.             returnWrapper.nack("Does not support subscribe to");
  161.             return;
  162.         }
  163.         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
  164.         if (bad != null)
  165.         {
  166.             returnWrapper.nack("Bad address: " + bad);
  167.             return;
  168.         }
  169.         EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
  170.         if (null == epi)
  171.         {
  172.             // Not necessarily bad; some EventProducers (e.g. GTUs) may disappear at any time
  173.             return; // NACK has been sent by this.eventProducerForAddRemoveOrChange.lookup
  174.         }
  175.         Subscription subscription = this.subscriptions.get(returnWrapper);
  176.         if (null == subscription)
  177.         {
  178.             subscription = new Subscription(returnWrapper);
  179.             this.subscriptions.put(returnWrapper, subscription);
  180.         }
  181.         if (epi.addListener(subscription, eventType))
  182.         {
  183.             returnWrapper.ack("Subscription created");
  184.         }
  185.         else
  186.         {
  187.             // There was already a subscription?
  188.             returnWrapper.ack("There was already such a subscription active");
  189.         }
  190.         // FIXME: if the subscription is an an Object that later disappears, the subscription map will still consume memory for
  191.         // that subscription. That could add up to a lot of memory ...
  192.     }

  193.     /**
  194.      * Cancel a subscription to ADD, REMOVE, or CHANGE events.
  195.      * @param address the data that is required to find the correct EventProducer
  196.      * @param eventType one of the event types that the addressed EventProducer can fire
  197.      * @param returnWrapper the ReturnWapper that sent the results until now
  198.      * @throws RemoteException when communication fails
  199.      * @throws SerializationException should never happen
  200.      * @throws Sim0MQException should never happen
  201.      */
  202.     private void unsubscribeFrom(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
  203.             throws RemoteException, Sim0MQException, SerializationException
  204.     {
  205.         if (null == eventType)
  206.         {
  207.             returnWrapper.nack("Does not support unsubscribe from");
  208.             return;
  209.         }
  210.         String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
  211.         if (bad != null)
  212.         {
  213.             returnWrapper.nack("Bad address: " + bad);
  214.             return;
  215.         }
  216.         EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
  217.         if (null == epi)
  218.         {
  219.             returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
  220.             return;
  221.         }
  222.         Subscription subscription = this.subscriptions.get(returnWrapper);
  223.         if (null == subscription)
  224.         {
  225.             returnWrapper.nack("Cound not find a subscription to cancel");
  226.         }
  227.         else if (!epi.removeListener(subscription, eventType))
  228.         {
  229.             returnWrapper.nack("Subscription was not found");
  230.         }
  231.         else
  232.         {
  233.             this.subscriptions.remove(returnWrapper);
  234.             returnWrapper.ack("Subscription removed");
  235.         }
  236.     }

  237.     /**
  238.      * Retrieve the id of this SubscriptionHandler.
  239.      * @return the id of this SubscriptionHandler
  240.      */
  241.     public final String getId()
  242.     {
  243.         return this.id;
  244.     }

  245.     /**
  246.      * The commands that a SubscriptionHandler understands.
  247.      */
  248.     public enum Command
  249.     {
  250.         /** Subscribe to add events. */
  251.         SUBSCRIBE_TO_ADD,
  252.         /** Subscribe to remove events. */
  253.         SUBSCRIBE_TO_REMOVE,
  254.         /** Subscribe to change events. */
  255.         SUBSCRIBE_TO_CHANGE,
  256.         /** Unsubscribe to add events. */
  257.         UNSUBSCRIBE_FROM_ADD,
  258.         /** Unsubscribe to remove events. */
  259.         UNSUBSCRIBE_FROM_REMOVE,
  260.         /** Unsubscribe to change events. */
  261.         UNSUBSCRIBE_FROM_CHANGE,
  262.         /** Get current set (if a collection), c.q. state (if properties of one object). */
  263.         GET_CURRENT,
  264.         /** Get the address meta data. */
  265.         GET_ADDRESS_META_DATA,
  266.         /** Get the result meta data. */
  267.         GET_RESULT_META_DATA,
  268.         /** Get the output of the IdSource. */
  269.         GET_LIST,
  270.         /** Get the set of implemented commands (must - itself - always be implemented). */
  271.         GET_COMMANDS;
  272.     }

  273.     /**
  274.      * Convert a String representing a Command into that Command.
  275.      * @param commandString the string
  276.      * @return the corresponding Command, or null if the <code>commandString</code> is not a valid Command
  277.      */
  278.     public static Command lookupCommand(final String commandString)
  279.     {
  280.         if ("GET_ADDRESS_META_DATA".equals(commandString))
  281.         {
  282.             return Command.GET_ADDRESS_META_DATA;
  283.         }
  284.         else if ("GET_CURRENT".equals(commandString))
  285.         {
  286.             return Command.GET_CURRENT;
  287.         }
  288.         else if ("GET_RESULT_META_DATA".equals(commandString))
  289.         {
  290.             return Command.GET_RESULT_META_DATA;
  291.         }
  292.         else if ("GET_RESULT_META_DATA".equals(commandString))
  293.         {
  294.             return Command.GET_RESULT_META_DATA;
  295.         }
  296.         else if ("SUBSCRIBE_TO_ADD".equals(commandString))
  297.         {
  298.             return Command.SUBSCRIBE_TO_ADD;
  299.         }
  300.         else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
  301.         {
  302.             return Command.SUBSCRIBE_TO_CHANGE;
  303.         }
  304.         else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
  305.         {
  306.             return Command.SUBSCRIBE_TO_REMOVE;
  307.         }
  308.         else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
  309.         {
  310.             return Command.UNSUBSCRIBE_FROM_ADD;
  311.         }
  312.         else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
  313.         {
  314.             return Command.UNSUBSCRIBE_FROM_REMOVE;
  315.         }
  316.         else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
  317.         {
  318.             return Command.UNSUBSCRIBE_FROM_CHANGE;
  319.         }
  320.         else if ("GET_LIST".contentEquals(commandString))
  321.         {
  322.             return Command.GET_LIST;
  323.         }
  324.         else if ("GET_COMMANDS".contentEquals(commandString))
  325.         {
  326.             return Command.GET_COMMANDS;
  327.         }
  328.         System.err.println("Could not find command with name \"" + commandString + "\"");
  329.         return null;
  330.     }

  331.     /**
  332.      * Execute one command.
  333.      * @param command the command
  334.      * @param address Object[] the address of the object on which the command must be applied
  335.      * @param returnWrapper envelope generator for replies
  336.      * @throws RemoteException on communication failure
  337.      * @throws SerializationException on illegal type in serialization
  338.      * @throws Sim0MQException on communication error
  339.      */
  340.     public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
  341.             throws RemoteException, Sim0MQException, SerializationException
  342.     {
  343.         Throw.whenNull(command, "Command may not be null");
  344.         Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
  345.         switch (command)
  346.         {
  347.             case SUBSCRIBE_TO_ADD:
  348.                 subscribeTo(address, this.addedEventType, returnWrapper);
  349.                 break;

  350.             case SUBSCRIBE_TO_CHANGE:
  351.                 subscribeTo(address, this.changeEventType, returnWrapper);
  352.                 break;

  353.             case SUBSCRIBE_TO_REMOVE:
  354.                 subscribeTo(address, this.removedEventType, returnWrapper);
  355.                 break;

  356.             case UNSUBSCRIBE_FROM_ADD:
  357.                 unsubscribeFrom(address, this.addedEventType, returnWrapper);
  358.                 break;

  359.             case UNSUBSCRIBE_FROM_CHANGE:
  360.                 unsubscribeFrom(address, this.changeEventType, returnWrapper);
  361.                 break;

  362.             case UNSUBSCRIBE_FROM_REMOVE:
  363.                 unsubscribeFrom(address, this.removedEventType, returnWrapper);
  364.                 break;

  365.             case GET_CURRENT:
  366.             {
  367.                 Object[] result = this.listTransceiver.get(address, returnWrapper);
  368.                 if (null != result)
  369.                 {
  370.                     sendResult(result, returnWrapper);
  371.                 }
  372.                 // TODO else?
  373.                 break;
  374.             }

  375.             case GET_ADDRESS_META_DATA:
  376.                 if (null == this.listTransceiver)
  377.                 {
  378.                     returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
  379.                 }
  380.                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
  381.                         returnWrapper);
  382.                 break;

  383.             case GET_RESULT_META_DATA:
  384.                 if (null == this.listTransceiver)
  385.                 {
  386.                     returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
  387.                 }
  388.                 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
  389.                         returnWrapper);
  390.                 break;

  391.             case GET_LIST:
  392.             {
  393.                 if (this.listTransceiver.hasIdSource())
  394.                 {
  395.                     sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
  396.                             returnWrapper);
  397.                 }
  398.                 else
  399.                 {
  400.                     sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
  401.                 }
  402.                 break;
  403.             }

  404.             case GET_COMMANDS:
  405.                 List<String> resultList = new ArrayList<>();
  406.                 if (null != this.addedEventType)
  407.                 {
  408.                     resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
  409.                     resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
  410.                 }
  411.                 if (null != this.removedEventType)
  412.                 {
  413.                     resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
  414.                     resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());

  415.                 }
  416.                 if (null != this.changeEventType)
  417.                 {
  418.                     resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
  419.                     resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
  420.                 }
  421.                 if (this.listTransceiver.getAddressFields() != null)
  422.                 {
  423.                     resultList.add(Command.GET_ADDRESS_META_DATA.toString());
  424.                 }
  425.                 if (this.listTransceiver.getResultFields() != null)
  426.                 {
  427.                     resultList.add(Command.GET_RESULT_META_DATA.toString());
  428.                 }
  429.                 if (null != this.listTransceiver)
  430.                 {
  431.                     resultList.add(Command.GET_LIST.toString());
  432.                 }
  433.                 resultList.add(Command.GET_COMMANDS.toString());
  434.                 Object[] result = new Object[resultList.size()];
  435.                 for (int index = 0; index < result.length; index++)
  436.                 {
  437.                     result[index] = resultList.get(index);
  438.                 }
  439.                 returnWrapper.encodeReplyAndTransmit(result);
  440.                 break;

  441.             default:
  442.                 // Cannot happen
  443.                 break;
  444.         }
  445.     }

  446.     /**
  447.      * Extract the class names from an array of ObjectDescriptor.
  448.      * @param objectDescriptors the array of ObjectDescriptor
  449.      * @return the class names
  450.      */
  451.     private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
  452.     {
  453.         Object[] result = new Object[objectDescriptors.length];
  454.         for (int index = 0; index < objectDescriptors.length; index++)
  455.         {
  456.             result[index] = objectDescriptors[index].getObjectClass().getName();
  457.         }
  458.         return result;
  459.     }

  460.     /**
  461.      * Send data via Sim0MQ to master if (and only if) it is non-null.
  462.      * @param data the data to transmit
  463.      * @param returnWrapper envelope constructor for returned results
  464.      * @throws SerializationException on illegal type in serialization
  465.      * @throws Sim0MQException on communication error
  466.      */
  467.     private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
  468.             throws Sim0MQException, SerializationException
  469.     {
  470.         if (data != null)
  471.         {
  472.             returnWrapper.encodeReplyAndTransmit(data);
  473.         }
  474.     }

  475.     @Override
  476.     public String toString()
  477.     {
  478.         return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
  479.                 + ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
  480.                 + this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
  481.                 + this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
  482.     }

  483. }

  484. /**
  485.  * Handles one subscription.
  486.  */
  487. class Subscription implements EventListener
  488. {
  489.     /** ... */
  490.     private static final long serialVersionUID = 20200428L;

  491.     /** Generates envelopes for the messages sent over Sim0MQ. */
  492.     private final ReturnWrapper returnWrapper;

  493.     /**
  494.      * Construct a new Subscription.
  495.      * @param returnWrapper envelope generator for the messages
  496.      */
  497.     Subscription(final ReturnWrapper returnWrapper)
  498.     {
  499.         this.returnWrapper = returnWrapper;
  500.     }

  501.     @Override
  502.     public void notify(final Event event) throws RemoteException
  503.     {
  504.         MetaData metaData = event.getType().getMetaData();
  505.         int additionalFields = event.getType() instanceof EventType ? 1 : 0;
  506.         Object[] result = new Object[additionalFields + metaData.size()];
  507.         // result[0] = event.getType().getName();
  508.         if (additionalFields > 0)
  509.         {
  510.             result[0] = ((TimedEvent<?>) event).getTimeStamp();
  511.         }
  512.         Object payload = event.getContent();
  513.         if (payload instanceof Object[])
  514.         {
  515.             for (int index = 0; index < event.getType().getMetaData().size(); index++)
  516.             {
  517.                 result[additionalFields + index] = ((Object[]) payload)[index];
  518.             }
  519.         }
  520.         else
  521.         {
  522.             result[additionalFields] = payload;
  523.         }
  524.         // TODO verify the composition of the result. Problem: no access to the metadata here
  525.         try
  526.         {
  527.             this.returnWrapper.encodeReplyAndTransmit(result);
  528.         }
  529.         catch (Sim0MQException | SerializationException e)
  530.         {
  531.             e.printStackTrace();
  532.         }
  533.     }

  534. }