Publisher.java

  1. package org.opentrafficsim.sim0mq.publisher;

  2. import java.rmi.RemoteException;
  3. import java.util.LinkedHashMap;
  4. import java.util.List;
  5. import java.util.Map;

  6. import org.djunits.Throw;
  7. import org.djutils.event.EventProducerInterface;
  8. import org.djutils.metadata.MetaData;
  9. import org.djutils.metadata.ObjectDescriptor;
  10. import org.djutils.serialization.SerializationException;
  11. import org.opentrafficsim.core.gtu.GTU;
  12. import org.opentrafficsim.core.network.Link;
  13. import org.opentrafficsim.core.network.Network;
  14. import org.opentrafficsim.core.network.OTSNetwork;
  15. import org.opentrafficsim.road.network.lane.CrossSectionLink;
  16. import org.sim0mq.Sim0MQException;

  17. /**
  18.  * Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
  19.  * Example sequence of events: <br>
  20.  * <ol>
  21.  * <li>OTSNetwork is somehow constructed and then a Publisher for that network is constructed.</li>
  22.  * <li>Sim0MQ master requests names of all available subscription handlers</li>
  23.  * <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
  24.  * are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
  25.  * subscribe to all NETWORK.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
  26.  * <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
  27.  * GTUs_in_network SubscriptionHandler will receive these events generated by the OTSNetwork and transcribe those into a Sim0MQ
  28.  * events which are transmitted to the Sim0MQ master.</li>
  29.  * <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
  30.  * <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
  31.  * GTUs being sent to the Sim0MQ master</li>
  32.  * <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
  33.  * SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
  34.  * <li>This Publishers creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
  35.  * SubscriptionHandler in to a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
  36.  * </ol>
  37.  * <p>
  38.  * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
  39.  * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
  40.  * <p>
  41.  * $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
  42.  * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
  43.  * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
  44.  */
  45. public class Publisher extends AbstractTransceiver
  46. {
  47.     /** Map Publisher names to the corresponding Publisher object. */
  48.     @SuppressWarnings("checkstyle:visibilitymodifier")
  49.     final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();

  50.     /** Handlers for data not handled directly by this Publisher. */
  51.     private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();

  52.     /** The OTS network. */
  53.     @SuppressWarnings("checkstyle:visibilitymodifier")
  54.     final OTSNetwork network;

  55.     /**
  56.      * Construct a Publisher for an OTS network with no additional subscription handlers.
  57.      * @param network OTSNetwork; the OTS network
  58.      * @throws RemoteException ...
  59.      */
  60.     public Publisher(final OTSNetwork network) throws RemoteException
  61.     {
  62.         this(network, null, null);
  63.     }

  64.     /**
  65.      * Construct a Publisher for an OTS network.
  66.      * @param network OTSNetwork; the OTS network
  67.      * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers to
  68.      *            register (may be null)
  69.      * @param incomingDataHandlers List&lt;IncomingDataHandler&gt;; handlers for data not handled directly by the Publisher (may
  70.      *            be null).
  71.      * @throws RemoteException ...
  72.      */
  73.     public Publisher(final OTSNetwork network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
  74.             final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
  75.     {
  76.         super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
  77.                 new MetaData("Publisher for " + network.getId(), "Publisher",
  78.                         new ObjectDescriptor[] {
  79.                                 new ObjectDescriptor("Name of subscription handler", "String", String.class) }),
  80.                 new MetaData("Subscription handlers", "Subscription handlers", new ObjectDescriptor[] {
  81.                         new ObjectDescriptor("Name of subscription handler", "String", String.class) }));
  82.         this.network = network;

  83.         if (incomingDataHandlers != null)
  84.         {
  85.             for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
  86.             {
  87.                 this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
  88.             }
  89.         }

  90.         GTUIdTransceiver gtuIdTransceiver = new GTUIdTransceiver(network);
  91.         GTUTransceiver gtuTransceiver = new GTUTransceiver(network, gtuIdTransceiver);
  92.         SubscriptionHandler gtuSubscriptionHandler =
  93.                 new SubscriptionHandler("GTU move", gtuTransceiver, new LookupEventProducerInterface()
  94.                 {
  95.                     @Override
  96.                     public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  97.                             throws Sim0MQException, SerializationException
  98.                     {
  99.                         String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
  100.                         if (bad != null)
  101.                         {
  102.                             returnWrapper.nack(bad);
  103.                             return null;
  104.                         }
  105.                         EventProducerInterface result = network.getGTU((String) address[0]);
  106.                         if (null == result)
  107.                         {
  108.                             returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
  109.                         }
  110.                         return result;
  111.                     }

  112.                     private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
  113.                             new ObjectDescriptor[] { new ObjectDescriptor("GTU ID", "GTU Id", String.class) });

  114.                     @Override
  115.                     public MetaData getAddressMetaData()
  116.                     {
  117.                         return this.metaData;
  118.                     }
  119.                 }, null, null, GTU.MOVE_EVENT, null);
  120.         addSubscriptionHandler(gtuSubscriptionHandler);
  121.         addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver, new LookupEventProducerInterface()
  122.         {
  123.             @Override
  124.             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  125.                     throws Sim0MQException, SerializationException
  126.             {
  127.                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
  128.                 if (bad != null)
  129.                 {
  130.                     returnWrapper.nack(bad);
  131.                     return null;
  132.                 }
  133.                 return network;
  134.             }

  135.             @Override
  136.             public String toString()
  137.             {
  138.                 return "Subscription handler for GTUs in network";
  139.             }

  140.             @Override
  141.             public MetaData getAddressMetaData()
  142.             {
  143.                 return MetaData.EMPTY;
  144.             }
  145.         }, Network.GTU_ADD_EVENT, Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));
  146.         LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
  147.         LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
  148.         SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
  149.                 Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
  150.         addSubscriptionHandler(linkSubscriptionHandler);
  151.         addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver, new LookupEventProducerInterface()
  152.         {
  153.             @Override
  154.             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  155.                     throws Sim0MQException, SerializationException
  156.             {
  157.                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
  158.                 if (bad != null)
  159.                 {
  160.                     returnWrapper.nack(bad);
  161.                     return null;
  162.                 }
  163.                 return network;
  164.             }

  165.             @Override
  166.             public String toString()
  167.             {
  168.                 return "Subscription handler for Links in network";
  169.             }

  170.             @Override
  171.             public MetaData getAddressMetaData()
  172.             {
  173.                 return MetaData.EMPTY;
  174.             }
  175.         }, Network.LINK_ADD_EVENT, Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));
  176.         NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
  177.         NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
  178.         // addTransceiver(nodeIdTransceiver);
  179.         // addTransceiver(new NodeTransceiver(network, nodeIdTransceiver));
  180.         SubscriptionHandler nodeSubscriptionHandler =
  181.                 new SubscriptionHandler("Node change", nodeTransceiver, new LookupEventProducerInterface()
  182.                 {
  183.                     @Override
  184.                     public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  185.                     {
  186.                         return null; // Nodes do not emit events
  187.                     }

  188.                     @Override
  189.                     public String toString()
  190.                     {
  191.                         return "Subscription handler for Node change";
  192.                     }

  193.                     private final MetaData metaData = new MetaData("Node Id", "Node Id",
  194.                             new ObjectDescriptor[] { new ObjectDescriptor("Node ID", "Node Id", String.class) });

  195.                     @Override
  196.                     public MetaData getAddressMetaData()
  197.                     {
  198.                         return this.metaData;
  199.                     }
  200.                 }, null, null, null, null);
  201.         addSubscriptionHandler(nodeSubscriptionHandler);
  202.         addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver, new LookupEventProducerInterface()
  203.         {
  204.             @Override
  205.             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  206.                     throws Sim0MQException, SerializationException
  207.             {
  208.                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
  209.                 if (bad != null)
  210.                 {
  211.                     returnWrapper.nack(bad);
  212.                     return null;
  213.                 }
  214.                 return network;
  215.             }

  216.             @Override
  217.             public String toString()
  218.             {
  219.                 return "Subscription handler for Nodes in network";
  220.             }

  221.             @Override
  222.             public MetaData getAddressMetaData()
  223.             {
  224.                 return MetaData.EMPTY;
  225.             }
  226.         }, Network.NODE_ADD_EVENT, Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));
  227.         SubscriptionHandler linkGTUIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
  228.                 new LinkGTUIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
  229.         addSubscriptionHandler(linkGTUIdSubscriptionHandler);
  230.         addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
  231.                 new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
  232.                 CrossSectionLink.LANE_REMOVE_EVENT, null, linkGTUIdSubscriptionHandler));
  233.         // addTransceiver(new LaneGTUIdTransceiver(network));
  234.         SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
  235.         SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
  236.                 stt.getLookupEventProducerInterface(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
  237.         addSubscriptionHandler(simulatorStateSubscriptionHandler);

  238.         if (additionalSubscriptionHandlers != null)
  239.         {
  240.             for (SubscriptionHandler sh : additionalSubscriptionHandlers)
  241.             {
  242.                 addSubscriptionHandler(sh);
  243.             }
  244.         }

  245.         addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
  246.     }

  247.     /** Lookup a CrossSectionLink in the network. */
  248.     private LookupEventProducerInterface lookupLink = new LookupEventProducerInterface()
  249.     {
  250.         @Override
  251.         public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
  252.                 throws IndexOutOfBoundsException, Sim0MQException, SerializationException
  253.         {
  254.             Throw.whenNull(address, "LookupLink requires the name of a link");
  255.             Throw.when(address.length != 1 || !(address[1] instanceof String), IllegalArgumentException.class, "Bad address");
  256.             Link link = Publisher.this.network.getLink((String) address[0]);
  257.             if (null == link)
  258.             {
  259.                 returnWrapper.nack("Network does not contain a Link with id " + address[0]);
  260.                 return null;
  261.             }
  262.             if (!(link instanceof EventProducerInterface))
  263.             {
  264.                 returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
  265.                 return null;
  266.             }
  267.             return (CrossSectionLink) link;
  268.         }

  269.         @Override
  270.         public String toString()
  271.         {
  272.             return "LookupProducerInterface that looks up a Link in the network";
  273.         }

  274.         @Override
  275.         public MetaData getAddressMetaData()
  276.         {
  277.             return new MetaData("Link id", "Name of a link in the network",
  278.                     new ObjectDescriptor[] { new ObjectDescriptor("Link id", "Name of a link in the network", String.class) });
  279.         }
  280.     };

  281.     /**
  282.      * Add a SubscriptionHandler to the map.
  283.      * @param subscriptionHandler SubscriptionHandler; the subscription handler to add to the map
  284.      */
  285.     private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
  286.     {
  287.         this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
  288.     }

  289.     /** {@inheritDoc} */
  290.     @Override
  291.     public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
  292.             throws Sim0MQException, SerializationException
  293.     {
  294.         Throw.whenNull(returnWrapper, "returnWrapper may not be null");
  295.         String bad = verifyMetaData(getAddressFields(), address);
  296.         if (bad != null)
  297.         {
  298.             returnWrapper.nack("Bad address (should be the name of a transceiver): " + bad);
  299.             return null;
  300.         }
  301.         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(address[0]);
  302.         if (null == subscriptionHandler)
  303.         {
  304.             returnWrapper.nack("No transceiver with name \"" + address[0] + "\"");
  305.             return null;
  306.         }
  307.         return new Object[] { subscriptionHandler };
  308.     }

  309.     /** Returned by the getIdSource method. */
  310.     private final TransceiverInterface idSource = new TransceiverInterface()
  311.     {
  312.         @Override
  313.         public String getId()
  314.         {
  315.             return "Transceiver for names of available transceivers in Publisher";
  316.         }

  317.         @Override
  318.         public MetaData getAddressFields()
  319.         {
  320.             return MetaData.EMPTY;
  321.         }

  322.         /** Result of getResultFields. */
  323.         private MetaData resultMetaData =
  324.                 new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
  325.                         new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class) });

  326.         @Override
  327.         public MetaData getResultFields()
  328.         {
  329.             return this.resultMetaData;
  330.         }

  331.         @Override
  332.         public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
  333.                 throws RemoteException, Sim0MQException, SerializationException
  334.         {
  335.             Object[] result = new Object[Publisher.this.subscriptionHandlerMap.size()];
  336.             int index = 0;
  337.             for (String key : Publisher.this.subscriptionHandlerMap.keySet())
  338.             {
  339.                 result[index++] = key;
  340.             }
  341.             return result;
  342.         }
  343.     };

  344.     /** {@inheritDoc} */
  345.     @Override
  346.     public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
  347.             throws Sim0MQException, SerializationException
  348.     {
  349.         if (0 != addressLevel)
  350.         {
  351.             returnWrapper.encodeReplyAndTransmit("Address should be 0");
  352.             return null;
  353.         }
  354.         return this.idSource;
  355.     }

  356.     /** {@inheritDoc} */
  357.     @Override
  358.     public boolean hasIdSource()
  359.     {
  360.         return true;
  361.     }

  362.     /**
  363.      * Execute one command.
  364.      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
  365.      * @param command SubscriptionHandler.Command; the operation to perform
  366.      * @param address Object[]; the address on which to perform the operation
  367.      * @param returnWrapper ReturnWrapper; to transmit the result
  368.      * @throws RemoteException on RMI network failure
  369.      * @throws SerializationException on illegal type in serialization
  370.      * @throws Sim0MQException on communication error
  371.      */
  372.     public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
  373.             final Object[] address, final ReturnWrapper returnWrapper)
  374.             throws RemoteException, Sim0MQException, SerializationException
  375.     {
  376.         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(subscriptionHandlerName);
  377.         if (null == subscriptionHandler)
  378.         {
  379.             returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
  380.             return;
  381.         }
  382.         subscriptionHandler.executeCommand(command, address, returnWrapper);
  383.     }

  384.     /**
  385.      * Execute one command.
  386.      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
  387.      * @param commandString String; the operation to perform
  388.      * @param address Object[]; the address on which to perform the operation
  389.      * @param returnWrapper ReturnWrapper; to transmit the result
  390.      * @throws RemoteException on RMI network failure
  391.      * @throws SerializationException on illegal type in serialization
  392.      * @throws Sim0MQException on communication error
  393.      */
  394.     public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
  395.             final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
  396.     {
  397.         executeCommand(subscriptionHandlerName,
  398.                 Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString),
  399.                 address, returnWrapper);
  400.     }
  401.    
  402.     /**
  403.      * Find the IncomingDataHandler for a particular key.
  404.      * @param key String; the key of the IncomingDataHandler
  405.      * @return IncomingDataHandler; or null if there is no IncomingDataHandler for the key
  406.      */
  407.     public IncomingDataHandler lookupIncomingDataHandler(final String key)
  408.     {
  409.         return this.incomingDataHandlers.get(key);
  410.     }

  411. }