View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.LinkedHashMap;
5   import java.util.List;
6   import java.util.Map;
7   
8   import org.djutils.exceptions.Throw;
9   import org.djutils.event.EventProducer;
10  import org.djutils.metadata.MetaData;
11  import org.djutils.metadata.ObjectDescriptor;
12  import org.djutils.serialization.SerializationException;
13  import org.opentrafficsim.core.gtu.Gtu;
14  import org.opentrafficsim.core.network.Link;
15  import org.opentrafficsim.core.network.Network;
16  import org.opentrafficsim.road.network.lane.CrossSectionLink;
17  import org.sim0mq.Sim0MQException;
18  
19  /**
20   * Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
21   * Example sequence of events: <br>
22   * <ol>
23   * <li>Network is somehow constructed and then a Publisher for that network is constructed.</li>
24   * <li>Sim0MQ master requests names of all available subscription handlers</li>
25   * <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
26   * are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
27   * subscribe to all Network.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
28   * <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
29   * GTUs_in_network SubscriptionHandler will receive these events generated by the Network and transcribe those into a Sim0MQ
30   * events which are transmitted to the Sim0MQ master.</li>
31   * <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
32   * <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
33   * GTUs being sent to the Sim0MQ master</li>
34   * <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
35   * SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
36   * <li>This Publishers creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
37   * SubscriptionHandler in to a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
38   * </ol>
39   * <p>
40   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
41   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
42   * </p>
43   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
44   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
45   */
46  public class Publisher extends AbstractTransceiver
47  {
48      /** Map Publisher names to the corresponding Publisher object. */
49      @SuppressWarnings("checkstyle:visibilitymodifier")
50      final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();
51  
52      /** Handlers for data not handled directly by this Publisher. */
53      private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();
54  
55      /** The OTS network. */
56      @SuppressWarnings("checkstyle:visibilitymodifier")
57      final Network network;
58  
59      /**
60       * Construct a Publisher for an OTS network with no additional subscription handlers.
61       * @param network Network; the OTS network
62       * @throws RemoteException ...
63       */
64      public Publisher(final Network network) throws RemoteException
65      {
66          this(network, null, null);
67      }
68  
69      /**
70       * Construct a Publisher for an OTS network.
71       * @param network Network; the OTS network
72       * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers to
73       *            register (may be null)
74       * @param incomingDataHandlers List&lt;IncomingDataHandler&gt;; handlers for data not handled directly by the Publisher (may
75       *            be null).
76       * @throws RemoteException ...
77       */
78      public Publisher(final Network network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
79              final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
80      {
81          super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
82                  new MetaData("Publisher for " + network.getId(), "Publisher",
83                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}),
84                  new MetaData("Subscription handlers", "Subscription handlers",
85                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}));
86          this.network = network;
87  
88          if (incomingDataHandlers != null)
89          {
90              for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
91              {
92                  this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
93              }
94          }
95  
96          GtuIdTransceiver gtuIdTransceiver = new GtuIdTransceiver(network);
97          GtuTransceiver gtuTransceiver = new GtuTransceiver(network, gtuIdTransceiver);
98          SubscriptionHandler gtuSubscriptionHandler =
99                  new SubscriptionHandler("GTU move", gtuTransceiver, new LookupEventProducer()
100                 {
101                     @Override
102                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
103                             throws Sim0MQException, SerializationException
104                     {
105                         String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
106                         if (bad != null)
107                         {
108                             returnWrapper.nack(bad);
109                             return null;
110                         }
111                         EventProducer result = network.getGTU((String) address[0]);
112                         if (null == result)
113                         {
114                             returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
115                         }
116                         return result;
117                     }
118 
119                     private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
120                             new ObjectDescriptor[] {new ObjectDescriptor("GTU ID", "GTU Id", String.class)});
121 
122                     @Override
123                     public MetaData getAddressMetaData()
124                     {
125                         return this.metaData;
126                     }
127                 }, null, null, Gtu.MOVE_EVENT, null);
128         addSubscriptionHandler(gtuSubscriptionHandler);
129         addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver, new LookupEventProducer()
130         {
131             @Override
132             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
133                     throws Sim0MQException, SerializationException
134             {
135                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
136                 if (bad != null)
137                 {
138                     returnWrapper.nack(bad);
139                     return null;
140                 }
141                 return network;
142             }
143 
144             @Override
145             public String toString()
146             {
147                 return "Subscription handler for GTUs in network";
148             }
149 
150             @Override
151             public MetaData getAddressMetaData()
152             {
153                 return MetaData.EMPTY;
154             }
155         }, Network.GTU_ADD_EVENT, Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));
156         LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
157         LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
158         SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
159                 Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
160         addSubscriptionHandler(linkSubscriptionHandler);
161         addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver, new LookupEventProducer()
162         {
163             @Override
164             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
165                     throws Sim0MQException, SerializationException
166             {
167                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
168                 if (bad != null)
169                 {
170                     returnWrapper.nack(bad);
171                     return null;
172                 }
173                 return network;
174             }
175 
176             @Override
177             public String toString()
178             {
179                 return "Subscription handler for Links in network";
180             }
181 
182             @Override
183             public MetaData getAddressMetaData()
184             {
185                 return MetaData.EMPTY;
186             }
187         }, Network.LINK_ADD_EVENT, Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));
188         NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
189         NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
190         // addTransceiver(nodeIdTransceiver);
191         // addTransceiver(new NodeTransceiver(network, nodeIdTransceiver));
192         SubscriptionHandler nodeSubscriptionHandler =
193                 new SubscriptionHandler("Node change", nodeTransceiver, new LookupEventProducer()
194                 {
195                     @Override
196                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
197                     {
198                         return null; // Nodes do not emit events
199                     }
200 
201                     @Override
202                     public String toString()
203                     {
204                         return "Subscription handler for Node change";
205                     }
206 
207                     private final MetaData metaData = new MetaData("Node Id", "Node Id",
208                             new ObjectDescriptor[] {new ObjectDescriptor("Node ID", "Node Id", String.class)});
209 
210                     @Override
211                     public MetaData getAddressMetaData()
212                     {
213                         return this.metaData;
214                     }
215                 }, null, null, null, null);
216         addSubscriptionHandler(nodeSubscriptionHandler);
217         addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver, new LookupEventProducer()
218         {
219             @Override
220             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
221                     throws Sim0MQException, SerializationException
222             {
223                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
224                 if (bad != null)
225                 {
226                     returnWrapper.nack(bad);
227                     return null;
228                 }
229                 return network;
230             }
231 
232             @Override
233             public String toString()
234             {
235                 return "Subscription handler for Nodes in network";
236             }
237 
238             @Override
239             public MetaData getAddressMetaData()
240             {
241                 return MetaData.EMPTY;
242             }
243         }, Network.NODE_ADD_EVENT, Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));
244         SubscriptionHandler linkGTUIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
245                 new LinkGtuIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
246         addSubscriptionHandler(linkGTUIdSubscriptionHandler);
247         addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
248                 new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
249                 CrossSectionLink.LANE_REMOVE_EVENT, null, linkGTUIdSubscriptionHandler));
250         // addTransceiver(new LaneGtuIdTransceiver(network));
251         SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
252         SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
253                 stt.getLookupEventProducer(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
254         addSubscriptionHandler(simulatorStateSubscriptionHandler);
255 
256         if (additionalSubscriptionHandlers != null)
257         {
258             for (SubscriptionHandler sh : additionalSubscriptionHandlers)
259             {
260                 addSubscriptionHandler(sh);
261             }
262         }
263 
264         addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
265     }
266 
267     /** Lookup a CrossSectionLink in the network. */
268     private LookupEventProducer lookupLink = new LookupEventProducer()
269     {
270         @Override
271         public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
272                 throws IndexOutOfBoundsException, Sim0MQException, SerializationException
273         {
274             Throw.whenNull(address, "LookupLink requires the name of a link");
275             Throw.when(address.length != 1 || !(address[1] instanceof String), IllegalArgumentException.class, "Bad address");
276             Link link = Publisher.this.network.getLink((String) address[0]);
277             if (null == link)
278             {
279                 returnWrapper.nack("Network does not contain a Link with id " + address[0]);
280                 return null;
281             }
282             if (!(link instanceof EventProducer))
283             {
284                 returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
285                 return null;
286             }
287             return (CrossSectionLink) link;
288         }
289 
290         @Override
291         public String toString()
292         {
293             return "LookupProducerInterface that looks up a Link in the network";
294         }
295 
296         @Override
297         public MetaData getAddressMetaData()
298         {
299             return new MetaData("Link id", "Name of a link in the network",
300                     new ObjectDescriptor[] {new ObjectDescriptor("Link id", "Name of a link in the network", String.class)});
301         }
302     };
303 
304     /**
305      * Add a SubscriptionHandler to the map.
306      * @param subscriptionHandler SubscriptionHandler; the subscription handler to add to the map
307      */
308     private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
309     {
310         this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
311     }
312 
313     /** {@inheritDoc} */
314     @Override
315     public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
316             throws Sim0MQException, SerializationException
317     {
318         Throw.whenNull(returnWrapper, "returnWrapper may not be null");
319         String bad = verifyMetaData(getAddressFields(), address);
320         if (bad != null)
321         {
322             returnWrapper.nack("Bad address (should be the name of a transceiver): " + bad);
323             return null;
324         }
325         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(address[0]);
326         if (null == subscriptionHandler)
327         {
328             returnWrapper.nack("No transceiver with name \"" + address[0] + "\"");
329             return null;
330         }
331         return new Object[] {subscriptionHandler};
332     }
333 
334     /** Returned by the getIdSource method. */
335     private final TransceiverInterface idSource = new TransceiverInterface()
336     {
337         @Override
338         public String getId()
339         {
340             return "Transceiver for names of available transceivers in Publisher";
341         }
342 
343         @Override
344         public MetaData getAddressFields()
345         {
346             return MetaData.EMPTY;
347         }
348 
349         /** Result of getResultFields. */
350         private MetaData resultMetaData =
351                 new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
352                         new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class)});
353 
354         @Override
355         public MetaData getResultFields()
356         {
357             return this.resultMetaData;
358         }
359 
360         @Override
361         public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
362                 throws RemoteException, Sim0MQException, SerializationException
363         {
364             Object[] result = new Object[Publisher.this.subscriptionHandlerMap.size()];
365             int index = 0;
366             for (String key : Publisher.this.subscriptionHandlerMap.keySet())
367             {
368                 result[index++] = key;
369             }
370             return result;
371         }
372     };
373 
374     /** {@inheritDoc} */
375     @Override
376     public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
377             throws Sim0MQException, SerializationException
378     {
379         if (0 != addressLevel)
380         {
381             returnWrapper.encodeReplyAndTransmit("Address should be 0");
382             return null;
383         }
384         return this.idSource;
385     }
386 
387     /** {@inheritDoc} */
388     @Override
389     public boolean hasIdSource()
390     {
391         return true;
392     }
393 
394     /**
395      * Execute one command.
396      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
397      * @param command SubscriptionHandler.Command; the operation to perform
398      * @param address Object[]; the address on which to perform the operation
399      * @param returnWrapper ReturnWrapper; to transmit the result
400      * @throws RemoteException on RMI network failure
401      * @throws SerializationException on illegal type in serialization
402      * @throws Sim0MQException on communication error
403      */
404     public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
405             final Object[] address, final ReturnWrapper returnWrapper)
406             throws RemoteException, Sim0MQException, SerializationException
407     {
408         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(subscriptionHandlerName);
409         if (null == subscriptionHandler)
410         {
411             returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
412             return;
413         }
414         subscriptionHandler.executeCommand(command, address, returnWrapper);
415     }
416 
417     /**
418      * Execute one command.
419      * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
420      * @param commandString String; the operation to perform
421      * @param address Object[]; the address on which to perform the operation
422      * @param returnWrapper ReturnWrapper; to transmit the result
423      * @throws RemoteException on RMI network failure
424      * @throws SerializationException on illegal type in serialization
425      * @throws Sim0MQException on communication error
426      */
427     public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
428             final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
429     {
430         executeCommand(subscriptionHandlerName,
431                 Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString),
432                 address, returnWrapper);
433     }
434 
435     /**
436      * Find the IncomingDataHandler for a particular key.
437      * @param key String; the key of the IncomingDataHandler
438      * @return IncomingDataHandler; or null if there is no IncomingDataHandler for the key
439      */
440     public IncomingDataHandler lookupIncomingDataHandler(final String key)
441     {
442         return this.incomingDataHandlers.get(key);
443     }
444 
445 }