View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.LinkedHashMap;
5   import java.util.List;
6   import java.util.Map;
7   
8   import org.djunits.Throw;
9   import org.djutils.event.EventProducerInterface;
10  import org.djutils.metadata.MetaData;
11  import org.djutils.metadata.ObjectDescriptor;
12  import org.djutils.serialization.SerializationException;
13  import org.opentrafficsim.core.gtu.GTU;
14  import org.opentrafficsim.core.network.Link;
15  import org.opentrafficsim.core.network.Network;
16  import org.opentrafficsim.core.network.OTSNetwork;
17  import org.opentrafficsim.road.network.lane.CrossSectionLink;
18  import org.sim0mq.Sim0MQException;
19  
20  /**
21   * Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
22   * Example sequence of events: <br>
23   * <ol>
24   * <li>OTSNetwork is somehow constructed and then a Publisher for that network is constructed.</li>
25   * <li>Sim0MQ master requests names of all available subscription handlers</li>
26   * <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
27   * are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
28   * subscribe to all NETWORK.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
29   * <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
30   * GTUs_in_network SubscriptionHandler will receive these events generated by the OTSNetwork and transcribe those into a Sim0MQ
31   * events which are transmitted to the Sim0MQ master.</li>
32   * <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
33   * <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
34   * GTUs being sent to the Sim0MQ master</li>
35   * <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
36   * SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
37   * <li>This Publishers creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
38   * SubscriptionHandler in to a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
39   * </ol>
40   * <p>
41   * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
42   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
43   * <p>
44   * $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
45   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
46   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
47   */
48  public class Publisher extends AbstractTransceiver
49  {
50      /** Map Publisher names to the corresponding Publisher object. */
51      @SuppressWarnings("checkstyle:visibilitymodifier")
52      final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();
53  
54      /** Handlers for data not handled directly by this Publisher. */
55      private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();
56  
57      /** The OTS network. */
58      @SuppressWarnings("checkstyle:visibilitymodifier")
59      final OTSNetwork network;
60  
61      /**
62       * Construct a Publisher for an OTS network with no additional subscription handlers.
63       * @param network OTSNetwork; the OTS network
64       * @throws RemoteException ...
65       */
66      public Publisher(final OTSNetwork network) throws RemoteException
67      {
68          this(network, null, null);
69      }
70  
71      /**
72       * Construct a Publisher for an OTS network.
73       * @param network OTSNetwork; the OTS network
74       * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers to
75       *            register (may be null)
76       * @param incomingDataHandlers List&lt;IncomingDataHandler&gt;; handlers for data not handled directly by the Publisher (may
77       *            be null).
78       * @throws RemoteException ...
79       */
80      public Publisher(final OTSNetwork network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
81              final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
82      {
83          super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
84                  new MetaData("Publisher for " + network.getId(), "Publisher",
85                          new ObjectDescriptor[] {
86                                  new ObjectDescriptor("Name of subscription handler", "String", String.class) }),
87                  new MetaData("Subscription handlers", "Subscription handlers", new ObjectDescriptor[] {
88                          new ObjectDescriptor("Name of subscription handler", "String", String.class) }));
89          this.network = network;
90  
91          if (incomingDataHandlers != null)
92          {
93              for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
94              {
95                  this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
96              }
97          }
98  
99          GTUIdTransceiver gtuIdTransceiver = new GTUIdTransceiver(network);
100         GTUTransceiver gtuTransceiver = new GTUTransceiver(network, gtuIdTransceiver);
101         SubscriptionHandler gtuSubscriptionHandler =
102                 new SubscriptionHandler("GTU move", gtuTransceiver, new LookupEventProducerInterface()
103                 {
104                     @Override
105                     public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
106                             throws Sim0MQException, SerializationException
107                     {
108                         String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
109                         if (bad != null)
110                         {
111                             returnWrapper.nack(bad);
112                             return null;
113                         }
114                         EventProducerInterface result = network.getGTU((String) address[0]);
115                         if (null == result)
116                         {
117                             returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
118                         }
119                         return result;
120                     }
121 
122                     private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
123                             new ObjectDescriptor[] { new ObjectDescriptor("GTU ID", "GTU Id", String.class) });
124 
125                     @Override
126                     public MetaData getAddressMetaData()
127                     {
128                         return this.metaData;
129                     }
130                 }, null, null, GTU.MOVE_EVENT, null);
131         addSubscriptionHandler(gtuSubscriptionHandler);
132         addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver, new LookupEventProducerInterface()
133         {
134             @Override
135             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
136                     throws Sim0MQException, SerializationException
137             {
138                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
139                 if (bad != null)
140                 {
141                     returnWrapper.nack(bad);
142                     return null;
143                 }
144                 return network;
145             }
146 
147             @Override
148             public String toString()
149             {
150                 return "Subscription handler for GTUs in network";
151             }
152 
153             @Override
154             public MetaData getAddressMetaData()
155             {
156                 return MetaData.EMPTY;
157             }
158         }, Network.GTU_ADD_EVENT, Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));
159         LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
160         LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
161         SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
162                 Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
163         addSubscriptionHandler(linkSubscriptionHandler);
164         addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver, new LookupEventProducerInterface()
165         {
166             @Override
167             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
168                     throws Sim0MQException, SerializationException
169             {
170                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
171                 if (bad != null)
172                 {
173                     returnWrapper.nack(bad);
174                     return null;
175                 }
176                 return network;
177             }
178 
179             @Override
180             public String toString()
181             {
182                 return "Subscription handler for Links in network";
183             }
184 
185             @Override
186             public MetaData getAddressMetaData()
187             {
188                 return MetaData.EMPTY;
189             }
190         }, Network.LINK_ADD_EVENT, Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));
191         NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
192         NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
193         // addTransceiver(nodeIdTransceiver);
194         // addTransceiver(new NodeTransceiver(network, nodeIdTransceiver));
195         SubscriptionHandler nodeSubscriptionHandler =
196                 new SubscriptionHandler("Node change", nodeTransceiver, new LookupEventProducerInterface()
197                 {
198                     @Override
199                     public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
200                     {
201                         return null; // Nodes do not emit events
202                     }
203 
204                     @Override
205                     public String toString()
206                     {
207                         return "Subscription handler for Node change";
208                     }
209 
210                     private final MetaData metaData = new MetaData("Node Id", "Node Id",
211                             new ObjectDescriptor[] { new ObjectDescriptor("Node ID", "Node Id", String.class) });
212 
213                     @Override
214                     public MetaData getAddressMetaData()
215                     {
216                         return this.metaData;
217                     }
218                 }, null, null, null, null);
219         addSubscriptionHandler(nodeSubscriptionHandler);
220         addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver, new LookupEventProducerInterface()
221         {
222             @Override
223             public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
224                     throws Sim0MQException, SerializationException
225             {
226                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
227                 if (bad != null)
228                 {
229                     returnWrapper.nack(bad);
230                     return null;
231                 }
232                 return network;
233             }
234 
235             @Override
236             public String toString()
237             {
238                 return "Subscription handler for Nodes in network";
239             }
240 
241             @Override
242             public MetaData getAddressMetaData()
243             {
244                 return MetaData.EMPTY;
245             }
246         }, Network.NODE_ADD_EVENT, Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));
247         SubscriptionHandler linkGTUIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
248                 new LinkGTUIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
249         addSubscriptionHandler(linkGTUIdSubscriptionHandler);
250         addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
251                 new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
252                 CrossSectionLink.LANE_REMOVE_EVENT, null, linkGTUIdSubscriptionHandler));
253         // addTransceiver(new LaneGTUIdTransceiver(network));
254         SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
255         SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
256                 stt.getLookupEventProducerInterface(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
257         addSubscriptionHandler(simulatorStateSubscriptionHandler);
258 
259         if (additionalSubscriptionHandlers != null)
260         {
261             for (SubscriptionHandler sh : additionalSubscriptionHandlers)
262             {
263                 addSubscriptionHandler(sh);
264             }
265         }
266 
267         addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
268     }
269 
270     /** Lookup a CrossSectionLink in the network. */
271     private LookupEventProducerInterface lookupLink = new LookupEventProducerInterface()
272     {
273         @Override
274         public EventProducerInterface lookup(final Object[] address, final ReturnWrapper returnWrapper)
275                 throws IndexOutOfBoundsException, Sim0MQException, SerializationException
276         {
277             Throw.whenNull(address, "LookupLink requires the name of a link");
278             Throw.when(address.length != 1 || !(address[1] instanceof String), IllegalArgumentException.class, "Bad address");
279             Link link = Publisher.this.network.getLink((String) address[0]);
280             if (null == link)
281             {
282                 returnWrapper.nack("Network does not contain a Link with id " + address[0]);
283                 return null;
284             }
285             if (!(link instanceof EventProducerInterface))
286             {
287                 returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
288                 return null;
289             }
290             return (CrossSectionLink) link;
291         }
292 
293         @Override
294         public String toString()
295         {
296             return "LookupProducerInterface that looks up a Link in the network";
297         }
298 
299         @Override
300         public MetaData getAddressMetaData()
301         {
302             return new MetaData("Link id", "Name of a link in the network",
303                     new ObjectDescriptor[] { new ObjectDescriptor("Link id", "Name of a link in the network", String.class) });
304         }
305     };
306 
307     /**
308      * Add a SubscriptionHandler to the map.
309      * @param subscriptionHandler SubscriptionHandler; the subscription handler to add to the map
310      */
311     private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
312     {
313         this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
314     }
315 
316     /** {@inheritDoc} */
317     @Override
318     public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
319             throws Sim0MQException, SerializationException
320     {
321         Throw.whenNull(returnWrapper, "returnWrapper may not be null");
322         String bad = verifyMetaData(getAddressFields(), address);
323         if (bad != null)
324         {
325             returnWrapper.nack("Bad address (should be the name of a transceiver): " + bad);
326             return null;
327         }
328         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(address[0]);
329         if (null == subscriptionHandler)
330         {
331             returnWrapper.nack("No transceiver with name \"" + address[0] + "\"");
332             return null;
333         }
334         return new Object[] { subscriptionHandler };
335     }
336 
337     /** Returned by the getIdSource method. */
338     private final TransceiverInterface idSource = new TransceiverInterface()
339     {
340         @Override
341         public String getId()
342         {
343             return "Transceiver for names of available transceivers in Publisher";
344         }
345 
346         @Override
347         public MetaData getAddressFields()
348         {
349             return MetaData.EMPTY;
350         }
351 
352         /** Result of getResultFields. */
353         private MetaData resultMetaData =
354                 new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
355                         new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class) });
356 
357         @Override
358         public MetaData getResultFields()
359         {
360             return this.resultMetaData;
361         }
362 
363         @Override
364         public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
365                 throws RemoteException, Sim0MQException, SerializationException
366         {
367             Object[] result = new Object[Publisher.this.subscriptionHandlerMap.size()];
368             int index = 0;
369             for (String key : Publisher.this.subscriptionHandlerMap.keySet())
370             {
371                 result[index++] = key;
372             }
373             return result;
374         }
375     };
376 
377     /** {@inheritDoc} */
378     @Override
379     public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
380             throws Sim0MQException, SerializationException
381     {
382         if (0 != addressLevel)
383         {
384             returnWrapper.encodeReplyAndTransmit("Address should be 0");
385             return null;
386         }
387         return this.idSource;
388     }
389 
390     /** {@inheritDoc} */
391     @Override
392     public boolean hasIdSource()
393     {
394         return true;
395     }
396 
397     /**
398      * Execute one command.
399      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
400      * @param command SubscriptionHandler.Command; the operation to perform
401      * @param address Object[]; the address on which to perform the operation
402      * @param returnWrapper ReturnWrapper; to transmit the result
403      * @throws RemoteException on RMI network failure
404      * @throws SerializationException on illegal type in serialization
405      * @throws Sim0MQException on communication error
406      */
407     public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
408             final Object[] address, final ReturnWrapper returnWrapper)
409             throws RemoteException, Sim0MQException, SerializationException
410     {
411         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(subscriptionHandlerName);
412         if (null == subscriptionHandler)
413         {
414             returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
415             return;
416         }
417         subscriptionHandler.executeCommand(command, address, returnWrapper);
418     }
419 
420     /**
421      * Execute one command.
422      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
423      * @param commandString String; the operation to perform
424      * @param address Object[]; the address on which to perform the operation
425      * @param returnWrapper ReturnWrapper; to transmit the result
426      * @throws RemoteException on RMI network failure
427      * @throws SerializationException on illegal type in serialization
428      * @throws Sim0MQException on communication error
429      */
430     public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
431             final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
432     {
433         executeCommand(subscriptionHandlerName,
434                 Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString),
435                 address, returnWrapper);
436     }
437     
438     /**
439      * Find the IncomingDataHandler for a particular key.
440      * @param key String; the key of the IncomingDataHandler
441      * @return IncomingDataHandler; or null if there is no IncomingDataHandler for the key
442      */
443     public IncomingDataHandler lookupIncomingDataHandler(final String key)
444     {
445         return this.incomingDataHandlers.get(key);
446     }
447 
448 }