View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
3   import java.rmi.RemoteException;
4   import java.util.LinkedHashMap;
5   import java.util.List;
6   import java.util.Map;
7   
8   import org.djutils.exceptions.Throw;
9   import org.djutils.event.EventProducer;
10  import org.djutils.metadata.MetaData;
11  import org.djutils.metadata.ObjectDescriptor;
12  import org.djutils.serialization.SerializationException;
13  import org.opentrafficsim.core.gtu.Gtu;
14  import org.opentrafficsim.core.network.Link;
15  import org.opentrafficsim.core.network.Network;
16  import org.opentrafficsim.road.network.lane.CrossSectionLink;
17  import org.sim0mq.Sim0MQException;
18  
19  /**
20   * Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
21   * Example sequence of events: <br>
22   * <ol>
23   * <li>Network is somehow constructed and then a Publisher for that network is constructed.</li>
24   * <li>Sim0MQ master requests names of all available subscription handlers</li>
25   * <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
26   * are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
27   * subscribe to all Network.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
28   * <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
29   * GTUs_in_network SubscriptionHandler will receive these events generated by the Network and transcribe those into a Sim0MQ
30   * events which are transmitted to the Sim0MQ master.</li>
31   * <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
32   * <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
33   * GTUs being sent to the Sim0MQ master</li>
34   * <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
35   * SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
36   * <li>This Publishers creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
37   * SubscriptionHandler in to a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
38   * </ol>
39   * <p>
40   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
41   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
42   * </p>
43   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
44   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
45   */
46  public class Publisher extends AbstractTransceiver
47  {
48      /** Map Publisher names to the corresponding Publisher object. */
49      @SuppressWarnings("checkstyle:visibilitymodifier")
50      final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();
51  
52      /** Handlers for data not handled directly by this Publisher. */
53      private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();
54  
55      /** The OTS network. */
56      @SuppressWarnings("checkstyle:visibilitymodifier")
57      final Network network;
58  
59      /**
60       * Construct a Publisher for an OTS network with no additional subscription handlers.
61       * @param network the OTS network
62       * @throws RemoteException ...
63       */
64      public Publisher(final Network network) throws RemoteException
65      {
66          this(network, null, null);
67      }
68  
69      /**
70       * Construct a Publisher for an OTS network.
71       * @param network the OTS network
72       * @param additionalSubscriptionHandlers list of additional subscription handlers to register (may be null)
73       * @param incomingDataHandlers handlers for data not handled directly by the Publisher (may be null).
74       * @throws RemoteException ...
75       */
76      public Publisher(final Network network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
77              final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
78      {
79          super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
80                  new MetaData("Publisher for " + network.getId(), "Publisher",
81                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}),
82                  new MetaData("Subscription handlers", "Subscription handlers",
83                          new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}));
84          this.network = network;
85  
86          if (incomingDataHandlers != null)
87          {
88              for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
89              {
90                  this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
91              }
92          }
93  
94          GtuIdTransceiver gtuIdTransceiver = new GtuIdTransceiver(network);
95          GtuTransceiver gtuTransceiver = new GtuTransceiver(network, gtuIdTransceiver);
96          SubscriptionHandler gtuSubscriptionHandler =
97                  new SubscriptionHandler("GTU move", gtuTransceiver, new LookupEventProducer()
98                  {
99                      @Override
100                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
101                             throws Sim0MQException, SerializationException
102                     {
103                         String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
104                         if (bad != null)
105                         {
106                             returnWrapper.nack(bad);
107                             return null;
108                         }
109                         EventProducer result = network.getGTU((String) address[0]);
110                         if (null == result)
111                         {
112                             returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
113                         }
114                         return result;
115                     }
116 
117                     private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
118                             new ObjectDescriptor[] {new ObjectDescriptor("GTU ID", "GTU Id", String.class)});
119 
120                     @Override
121                     public MetaData getAddressMetaData()
122                     {
123                         return this.metaData;
124                     }
125                 }, null, null, Gtu.MOVE_EVENT, null);
126         addSubscriptionHandler(gtuSubscriptionHandler);
127         addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver, new LookupEventProducer()
128         {
129             @Override
130             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
131                     throws Sim0MQException, SerializationException
132             {
133                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
134                 if (bad != null)
135                 {
136                     returnWrapper.nack(bad);
137                     return null;
138                 }
139                 return network;
140             }
141 
142             @Override
143             public String toString()
144             {
145                 return "Subscription handler for GTUs in network";
146             }
147 
148             @Override
149             public MetaData getAddressMetaData()
150             {
151                 return MetaData.EMPTY;
152             }
153         }, Network.GTU_ADD_EVENT, Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));
154         LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
155         LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
156         SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
157                 Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
158         addSubscriptionHandler(linkSubscriptionHandler);
159         addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver, new LookupEventProducer()
160         {
161             @Override
162             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
163                     throws Sim0MQException, SerializationException
164             {
165                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
166                 if (bad != null)
167                 {
168                     returnWrapper.nack(bad);
169                     return null;
170                 }
171                 return network;
172             }
173 
174             @Override
175             public String toString()
176             {
177                 return "Subscription handler for Links in network";
178             }
179 
180             @Override
181             public MetaData getAddressMetaData()
182             {
183                 return MetaData.EMPTY;
184             }
185         }, Network.LINK_ADD_EVENT, Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));
186         NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
187         NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
188         // addTransceiver(nodeIdTransceiver);
189         // addTransceiver(new NodeTransceiver(network, nodeIdTransceiver));
190         SubscriptionHandler nodeSubscriptionHandler =
191                 new SubscriptionHandler("Node change", nodeTransceiver, new LookupEventProducer()
192                 {
193                     @Override
194                     public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
195                     {
196                         return null; // Nodes do not emit events
197                     }
198 
199                     @Override
200                     public String toString()
201                     {
202                         return "Subscription handler for Node change";
203                     }
204 
205                     private final MetaData metaData = new MetaData("Node Id", "Node Id",
206                             new ObjectDescriptor[] {new ObjectDescriptor("Node ID", "Node Id", String.class)});
207 
208                     @Override
209                     public MetaData getAddressMetaData()
210                     {
211                         return this.metaData;
212                     }
213                 }, null, null, null, null);
214         addSubscriptionHandler(nodeSubscriptionHandler);
215         addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver, new LookupEventProducer()
216         {
217             @Override
218             public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
219                     throws Sim0MQException, SerializationException
220             {
221                 String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
222                 if (bad != null)
223                 {
224                     returnWrapper.nack(bad);
225                     return null;
226                 }
227                 return network;
228             }
229 
230             @Override
231             public String toString()
232             {
233                 return "Subscription handler for Nodes in network";
234             }
235 
236             @Override
237             public MetaData getAddressMetaData()
238             {
239                 return MetaData.EMPTY;
240             }
241         }, Network.NODE_ADD_EVENT, Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));
242         SubscriptionHandler linkGTUIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
243                 new LinkGtuIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
244         addSubscriptionHandler(linkGTUIdSubscriptionHandler);
245         addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
246                 new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
247                 CrossSectionLink.LANE_REMOVE_EVENT, null, linkGTUIdSubscriptionHandler));
248         // addTransceiver(new LaneGtuIdTransceiver(network));
249         SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
250         SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
251                 stt.getLookupEventProducer(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
252         addSubscriptionHandler(simulatorStateSubscriptionHandler);
253 
254         if (additionalSubscriptionHandlers != null)
255         {
256             for (SubscriptionHandler sh : additionalSubscriptionHandlers)
257             {
258                 addSubscriptionHandler(sh);
259             }
260         }
261 
262         addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
263     }
264 
265     /** Lookup a CrossSectionLink in the network. */
266     private LookupEventProducer lookupLink = new LookupEventProducer()
267     {
268         @Override
269         public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
270                 throws IndexOutOfBoundsException, Sim0MQException, SerializationException
271         {
272             Throw.whenNull(address, "LookupLink requires the name of a link");
273             Throw.when(address.length != 1 || !(address[1] instanceof String), IllegalArgumentException.class, "Bad address");
274             Link link = Publisher.this.network.getLink((String) address[0]);
275             if (null == link)
276             {
277                 returnWrapper.nack("Network does not contain a Link with id " + address[0]);
278                 return null;
279             }
280             if (!(link instanceof EventProducer))
281             {
282                 returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
283                 return null;
284             }
285             return (CrossSectionLink) link;
286         }
287 
288         @Override
289         public String toString()
290         {
291             return "LookupProducerInterface that looks up a Link in the network";
292         }
293 
294         @Override
295         public MetaData getAddressMetaData()
296         {
297             return new MetaData("Link id", "Name of a link in the network",
298                     new ObjectDescriptor[] {new ObjectDescriptor("Link id", "Name of a link in the network", String.class)});
299         }
300     };
301 
302     /**
303      * Add a SubscriptionHandler to the map.
304      * @param subscriptionHandler the subscription handler to add to the map
305      */
306     private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
307     {
308         this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
309     }
310 
311     @Override
312     public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
313             throws Sim0MQException, SerializationException
314     {
315         Throw.whenNull(returnWrapper, "returnWrapper may not be null");
316         String bad = verifyMetaData(getAddressFields(), address);
317         if (bad != null)
318         {
319             returnWrapper.nack("Bad address (should be the name of a transceiver): " + bad);
320             return null;
321         }
322         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(address[0]);
323         if (null == subscriptionHandler)
324         {
325             returnWrapper.nack("No transceiver with name \"" + address[0] + "\"");
326             return null;
327         }
328         return new Object[] {subscriptionHandler};
329     }
330 
331     /** Returned by the getIdSource method. */
332     private final TransceiverInterface idSource = new TransceiverInterface()
333     {
334         @Override
335         public String getId()
336         {
337             return "Transceiver for names of available transceivers in Publisher";
338         }
339 
340         @Override
341         public MetaData getAddressFields()
342         {
343             return MetaData.EMPTY;
344         }
345 
346         /** Result of getResultFields. */
347         private MetaData resultMetaData =
348                 new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
349                         new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class)});
350 
351         @Override
352         public MetaData getResultFields()
353         {
354             return this.resultMetaData;
355         }
356 
357         @Override
358         public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
359                 throws RemoteException, Sim0MQException, SerializationException
360         {
361             Object[] result = new Object[Publisher.this.subscriptionHandlerMap.size()];
362             int index = 0;
363             for (String key : Publisher.this.subscriptionHandlerMap.keySet())
364             {
365                 result[index++] = key;
366             }
367             return result;
368         }
369     };
370 
371     @Override
372     public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
373             throws Sim0MQException, SerializationException
374     {
375         if (0 != addressLevel)
376         {
377             returnWrapper.encodeReplyAndTransmit("Address should be 0");
378             return null;
379         }
380         return this.idSource;
381     }
382 
383     @Override
384     public boolean hasIdSource()
385     {
386         return true;
387     }
388 
389     /**
390      * Execute one command.
391      * @param subscriptionHandlerName name of the SubscriptionHandler for which the command is destined
392      * @param command SubscriptionHandler.Command; the operation to perform
393      * @param address the address on which to perform the operation
394      * @param returnWrapper to transmit the result
395      * @throws RemoteException on RMI network failure
396      * @throws SerializationException on illegal type in serialization
397      * @throws Sim0MQException on communication error
398      */
399     public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
400             final Object[] address, final ReturnWrapper returnWrapper)
401             throws RemoteException, Sim0MQException, SerializationException
402     {
403         SubscriptionHandler subscriptionHandler = this.subscriptionHandlerMap.get(subscriptionHandlerName);
404         if (null == subscriptionHandler)
405         {
406             returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
407             return;
408         }
409         subscriptionHandler.executeCommand(command, address, returnWrapper);
410     }
411 
412     /**
413      * Execute one command.
414      * @param subscriptionHandlerName name of the SubscriptionHandler for which the command is destined
415      * @param commandString the operation to perform
416      * @param address the address on which to perform the operation
417      * @param returnWrapper to transmit the result
418      * @throws RemoteException on RMI network failure
419      * @throws SerializationException on illegal type in serialization
420      * @throws Sim0MQException on communication error
421      */
422     public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
423             final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
424     {
425         executeCommand(subscriptionHandlerName,
426                 Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString),
427                 address, returnWrapper);
428     }
429 
430     /**
431      * Find the IncomingDataHandler for a particular key.
432      * @param key the key of the IncomingDataHandler
433      * @return or null if there is no IncomingDataHandler for the key
434      */
435     public IncomingDataHandler lookupIncomingDataHandler(final String key)
436     {
437         return this.incomingDataHandlers.get(key);
438     }
439 
440 }