View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.LinkedHashMap;
5   import java.util.List;
6   import java.util.Map;
7   
8   import org.djunits.Throw;
9   import org.djutils.event.EventProducer;
10  import org.djutils.metadata.MetaData;
11  import org.djutils.metadata.ObjectDescriptor;
12  import org.djutils.serialization.SerializationException;
13  import org.opentrafficsim.core.gtu.Gtu;
14  import org.opentrafficsim.core.network.Link;
15  import org.opentrafficsim.core.network.Link;
16  import org.opentrafficsim.core.network.Network;
17  import org.opentrafficsim.road.network.lane.CrossSectionLink;
18  import org.sim0mq.Sim0MQException;
19  
20  /**
21   * Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
22   * Example sequence of events: <br>
23   * <ol>
24   * <li>Network is somehow constructed and then a Publisher for that network is constructed.</li>
25   * <li>Sim0MQ master requests names of all available subscription handlers</li>
26   * <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
27   * are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
28   * subscribe to all NETWORK.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
29   * <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
30   * GTUs_in_network SubscriptionHandler will receive these events generated by the Network and transcribe those into Sim0MQ
31   * events which are transmitted to the Sim0MQ master.</li>
32   * <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
33   * <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
34   * GTUs being sent to the Sim0MQ master</li>
35   * <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
36   * SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
37   * <li>This Publisher creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
38   * SubscriptionHandler into a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
39   * </ol>
40   * <p>
41   * Copyright (c) 2020-2023 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
42   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
43   * </p>
44   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
45   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
46   */
47  public class Publisher extends AbstractTransceiver
48  {
49      /** Map subscription handler names to the corresponding SubscriptionHandler object. */
50      @SuppressWarnings("checkstyle:visibilitymodifier")
51      final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();
52  
53      /** Handlers for data not handled directly by this Publisher. */
54      private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();
55  
56      /** The OTS network. */
57      @SuppressWarnings("checkstyle:visibilitymodifier")
58      final Network network;
59  
60      /**
61       * Construct a Publisher for an OTS network with no additional subscription handlers.
62       * @param network Network; the OTS network
63       * @throws RemoteException ...
64       */
65      public Publisher(final Network network) throws RemoteException
66      {
67          this(network, null, null);
68      }
69  
70      /**
71       * Construct a Publisher for an OTS network.
72       * @param network Network; the OTS network
73       * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers to
74       *            register (may be null)
75       * @param incomingDataHandlers List&lt;IncomingDataHandler&gt;; handlers for data not handled directly by the Publisher (may
76       *            be null).
77       * @throws RemoteException ...
78       */
79      public Publisher(final Network network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
80              final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
81      {
82          super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
83                  new MetaData("Publisher for " + network.getId(), "Publisher",
84                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}),
85                  new MetaData("Subscription handlers", "Subscription handlers",
86                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}));
87          this.network = network;
88  
89          if (incomingDataHandlers != null)
90          {
91              for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
92              {
93                  this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
94              }
95          }
96  
97          GtuIdTransceiver gtuIdTransceiver = new GtuIdTransceiver(network);
98          GtuTransceiver gtuTransceiver = new GtuTransceiver(network, gtuIdTransceiver);
99          SubscriptionHandler gtuSubscriptionHandler =
100                 new SubscriptionHandler("GTU move", gtuTransceiver, new LookupEventProducer()
101                 {
102                     @Override
103                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
104                             throws Sim0MQException, SerializationException
105                     {
106                         String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
107                         if (bad != null)
108                         {
109                             returnWrapper.nack(bad);
110                             return null;
111                         }
112                         EventProducer result = network.getGTU((String) address[0]);
113                         if (null == result)
114                         {
115                             returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
116                         }
117                         return result;
118                     }
119 
120                     private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
121                             new ObjectDescriptor[] {new ObjectDescriptor("GTU ID", "GTU Id", String.class)});
122 
123                     @Override
124                     public MetaData getAddressMetaData()
125                     {
126                         return this.metaData;
127                     }
128                 }, null, null, Gtu.MOVE_EVENT, null);
129         addSubscriptionHandler(gtuSubscriptionHandler);
130         addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver, new LookupEventProducer()
131         {
132             @Override
133             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
134                     throws Sim0MQException, SerializationException
135             {
136                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
137                 if (bad != null)
138                 {
139                     returnWrapper.nack(bad);
140                     return null;
141                 }
142                 return network;
143             }
144 
145             @Override
146             public String toString()
147             {
148                 return "Subscription handler for GTUs in network";
149             }
150 
151             @Override
152             public MetaData getAddressMetaData()
153             {
154                 return MetaData.EMPTY;
155             }
156         }, Network.GTU_ADD_EVENT, Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));
157         LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
158         LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
159         SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
160                 Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
161         addSubscriptionHandler(linkSubscriptionHandler);
162         addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver, new LookupEventProducer()
163         {
164             @Override
165             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
166                     throws Sim0MQException, SerializationException
167             {
168                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
169                 if (bad != null)
170                 {
171                     returnWrapper.nack(bad);
172                     return null;
173                 }
174                 return network;
175             }
176 
177             @Override
178             public String toString()
179             {
180                 return "Subscription handler for Links in network";
181             }
182 
183             @Override
184             public MetaData getAddressMetaData()
185             {
186                 return MetaData.EMPTY;
187             }
188         }, Network.LINK_ADD_EVENT, Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));
189         NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
190         NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
191         // addTransceiver(nodeIdTransceiver);
192         // addTransceiver(new NodeTransceiver(network, nodeIdTransceiver));
193         SubscriptionHandler nodeSubscriptionHandler =
194                 new SubscriptionHandler("Node change", nodeTransceiver, new LookupEventProducer()
195                 {
196                     @Override
197                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
198                     {
199                         return null; // Nodes do not emit events
200                     }
201 
202                     @Override
203                     public String toString()
204                     {
205                         return "Subscription handler for Node change";
206                     }
207 
208                     private final MetaData metaData = new MetaData("Node Id", "Node Id",
209                             new ObjectDescriptor[] {new ObjectDescriptor("Node ID", "Node Id", String.class)});
210 
211                     @Override
212                     public MetaData getAddressMetaData()
213                     {
214                         return this.metaData;
215                     }
216                 }, null, null, null, null);
217         addSubscriptionHandler(nodeSubscriptionHandler);
218         addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver, new LookupEventProducer()
219         {
220             @Override
221             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
222                     throws Sim0MQException, SerializationException
223             {
224                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
225                 if (bad != null)
226                 {
227                     returnWrapper.nack(bad);
228                     return null;
229                 }
230                 return network;
231             }
232 
233             @Override
234             public String toString()
235             {
236                 return "Subscription handler for Nodes in network";
237             }
238 
239             @Override
240             public MetaData getAddressMetaData()
241             {
242                 return MetaData.EMPTY;
243             }
244         }, Network.NODE_ADD_EVENT, Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));
245         SubscriptionHandler linkGTUIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
246                 new LinkGtuIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
247         addSubscriptionHandler(linkGTUIdSubscriptionHandler);
248         addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
249                 new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
250                 CrossSectionLink.LANE_REMOVE_EVENT, null, linkGTUIdSubscriptionHandler));
251         // addTransceiver(new LaneGtuIdTransceiver(network));
252         SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
253         SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
254                 stt.getLookupEventProducer(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
255         addSubscriptionHandler(simulatorStateSubscriptionHandler);
256 
257         if (additionalSubscriptionHandlers != null)
258         {
259             for (SubscriptionHandler sh : additionalSubscriptionHandlers)
260             {
261                 addSubscriptionHandler(sh);
262             }
263         }
264 
265         addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
266     }
267 
268     /** Lookup a CrossSectionLink in the network. */
269     private LookupEventProducer lookupLink = new LookupEventProducer()
270     {
271         @Override
272         public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
273                 throws IndexOutOfBoundsException, Sim0MQException, SerializationException
274         {
275             Throw.whenNull(address, "LookupLink requires the name of a link");
276             Throw.when(address.length != 1 || !(address[1] instanceof String), IllegalArgumentException.class, "Bad address");
277             Link link = Publisher.this.network.getLink((String) address[0]);
278             if (null == link)
279             {
280                 returnWrapper.nack("Network does not contain a Link with id " + address[0]);
281                 return null;
282             }
283             if (!(link instanceof EventProducer))
284             {
285                 returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
286                 return null;
287             }
288             return (CrossSectionLink) link;
289         }
290 
291         @Override
292         public String toString()
293         {
294             return "LookupProducerInterface that looks up a Link in the network";
295         }
296 
297         @Override
298         public MetaData getAddressMetaData()
299         {
300             return new MetaData("Link id", "Name of a link in the network",
301                     new ObjectDescriptor[] {new ObjectDescriptor("Link id", "Name of a link in the network", String.class)});
302         }
303     };
304 
305     /**
306      * Add a SubscriptionHandler to the map.
307      * @param subscriptionHandler SubscriptionHandler; the subscription handler to add to the map
308      */
309     private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
310     {
311         this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
312     }
313 
314     /** {@inheritDoc} */
315     @Override
316     public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
317             throws Sim0MQException, SerializationException
318     {
319         Throw.whenNull(returnWrapper, "returnWrapper may not be null");
320         String bad = verifyMetaData(getAddressFields(), address);
321         if (bad != null)
322         {
323             returnWrapper.nack("Bad address (should be the name of a transceiver): " + bad);
324             return null;
325         }
326         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(address[0]);
327         if (null == subscriptionHandler)
328         {
329             returnWrapper.nack("No transceiver with name \"" + address[0] + "\"");
330             return null;
331         }
332         return new Object[] {subscriptionHandler};
333     }
334 
335     /** Returned by the getIdSource method. */
336     private final TransceiverInterface idSource = new TransceiverInterface()
337     {
338         @Override
339         public String getId()
340         {
341             return "Transceiver for names of available transceivers in Publisher";
342         }
343 
344         @Override
345         public MetaData getAddressFields()
346         {
347             return MetaData.EMPTY;
348         }
349 
350         /** Result of getResultFields. */
351         private MetaData resultMetaData =
352                 new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
353                         new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class)});
354 
355         @Override
356         public MetaData getResultFields()
357         {
358             return this.resultMetaData;
359         }
360 
361         @Override
362         public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
363                 throws RemoteException, Sim0MQException, SerializationException
364         {
365             Object[] result = new Object[Publisher.this.subscriptionHandlerMap.size()];
366             int index = 0;
367             for (String key : Publisher.this.subscriptionHandlerMap.keySet())
368             {
369                 result[index++] = key;
370             }
371             return result;
372         }
373     };
374 
375     /** {@inheritDoc} */
376     @Override
377     public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
378             throws Sim0MQException, SerializationException
379     {
380         if (0 != addressLevel)
381         {
382             returnWrapper.encodeReplyAndTransmit("Address should be 0");
383             return null;
384         }
385         return this.idSource;
386     }
387 
388     /** {@inheritDoc} */
389     @Override
390     public boolean hasIdSource()
391     {
392         return true;
393     }
394 
395     /**
396      * Execute one command.
397      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
398      * @param command SubscriptionHandler.Command; the operation to perform
399      * @param address Object[]; the address on which to perform the operation
400      * @param returnWrapper ReturnWrapper; to transmit the result
401      * @throws RemoteException on RMI network failure
402      * @throws SerializationException on illegal type in serialization
403      * @throws Sim0MQException on communication error
404      */
405     public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
406             final Object[] address, final ReturnWrapper returnWrapper)
407             throws RemoteException, Sim0MQException, SerializationException
408     {
409         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(subscriptionHandlerName);
410         if (null == subscriptionHandler)
411         {
412             returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
413             return;
414         }
415         subscriptionHandler.executeCommand(command, address, returnWrapper);
416     }
417 
418     /**
419      * Execute one command.
420      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
421      * @param commandString String; the operation to perform
422      * @param address Object[]; the address on which to perform the operation
423      * @param returnWrapper ReturnWrapper; to transmit the result
424      * @throws RemoteException on RMI network failure
425      * @throws SerializationException on illegal type in serialization
426      * @throws Sim0MQException on communication error
427      */
428     public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
429             final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
430     {
431         executeCommand(subscriptionHandlerName,
432                 Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString),
433                 address, returnWrapper);
434     }
435 
436     /**
437      * Find the IncomingDataHandler for a particular key.
438      * @param key String; the key of the IncomingDataHandler
439      * @return IncomingDataHandler; or null if there is no IncomingDataHandler for the key
440      */
441     public IncomingDataHandler lookupIncomingDataHandler(final String key)
442     {
443         return this.incomingDataHandlers.get(key);
444     }
445 
446 }