Publisher.java
package org.opentrafficsim.sim0mq.publisher;
import java.rmi.RemoteException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.djutils.exceptions.Throw;
import org.djutils.event.EventProducer;
import org.djutils.metadata.MetaData;
import org.djutils.metadata.ObjectDescriptor;
import org.djutils.serialization.SerializationException;
import org.opentrafficsim.core.gtu.Gtu;
import org.opentrafficsim.core.network.Link;
import org.opentrafficsim.core.network.Network;
import org.opentrafficsim.road.network.lane.CrossSectionLink;
import org.sim0mq.Sim0MQException;
/**
* Publish all available transceivers for an OTS network to a Sim0MQ master and handle its requests. <br>
* Example sequence of events: <br>
* <ol>
* <li>Network is somehow constructed and then a Publisher for that network is constructed.</li>
* <li>Sim0MQ master requests names of all available subscription handlers</li>
* <li>Sim0MQ master decides that it wants all GTU MOVE events of all GTUs. To do that it needs to know about all GTUs when they
* are created and about all GTUs that have already been created. The Sim0MQ master issues to the publisher a request to
* subscribe to all Network.GTU_ADD_EVENTs of the GTUs_in_network SubscriptionHandler</li>
* <li>This Publisher requests the GTUs_in_network SubscriptionHandler to subscribe to the add events. From now on, the
* GTUs_in_network SubscriptionHandler will receive these events generated by the Network and transcribe those into Sim0MQ
* events which are transmitted to the Sim0MQ master.</li>
* <li>Sim0MQ master requests publisher to list all the elements of the GTUs_in_network SubscriptionHandler</li>
* <li>This Publisher calls the list method of the GTUs_in_network SubscriptionHandler which results in a list of all active
* GTUs being sent to the Sim0MQ master</li>
* <li>The Sim0MQ master requests this Publisher to create a subscription for the update events of the GTU_move
* SubscriptionHandler, providing the GTU id as address. It does that once for every GTU id.</li>
* <li>This Publisher creates the subscriptions. From now on any GTU.MOVE_EVENT event is transcribed by the GTU_move
* SubscriptionHandler into a corresponding Sim0MQ event and sent to the Sim0MQ master.</li>
* </ol>
* <p>
* Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
* </p>
* @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
* @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
*/
public class Publisher extends AbstractTransceiver
{
/**
* Maps the id of each registered SubscriptionHandler (the value of its getId() method) to that SubscriptionHandler. The
* map keeps the order in which the handlers were registered.
*/
@SuppressWarnings("checkstyle:visibilitymodifier")
final Map<String, SubscriptionHandler> subscriptionHandlerMap = new LinkedHashMap<>();
/** Handlers for data not handled directly by this Publisher, stored under the value of their getKey() method. */
private final Map<String, IncomingDataHandler> incomingDataHandlers = new LinkedHashMap<>();
/** The OTS network. */
@SuppressWarnings("checkstyle:visibilitymodifier")
final Network network;
/**
* Construct a Publisher for an OTS network with no additional subscription handlers and no incoming data handlers.
* Delegates to the three-argument constructor, passing null for both lists.
* @param network Network; the OTS network
* @throws RemoteException declared because the constructor that this one delegates to declares it; the exact failure
*             condition is not visible here (NOTE(review): presumably a remote communication failure - confirm)
*/
public Publisher(final Network network) throws RemoteException
{
this(network, null, null);
}
/**
 * Construct a Publisher for an OTS network.
 * <p>
 * The incoming data handlers are stored first, keyed by their getKey() value. Next, the built-in subscription handlers
 * are registered in this order: "GTU move", "GTUs in network", "Link change", "Links in network", "Node change", "Nodes
 * in network", "GTUs on Link", "Cross section elements on Link" and "Simulator running". After that the additional
 * subscription handlers are registered in list order and, last, a subscription handler constructed with the empty name
 * that wraps this Publisher itself (the meta transceiver). Registration uses Map.put, so a handler whose id equals that
 * of an earlier handler replaces the earlier one.
 * </p>
 * @param network Network; the OTS network (may not be null)
 * @param additionalSubscriptionHandlers List&lt;SubscriptionHandler&gt;; list of additional subscription handlers to
 *            register (may be null)
 * @param incomingDataHandlers List&lt;IncomingDataHandler&gt;; handlers for data not handled directly by the Publisher
 *            (may be null).
 * @throws RemoteException declared by this constructor; the exact failure condition is not visible in this class
 *             (NOTE(review): presumably a remote communication failure - confirm)
 */
public Publisher(final Network network, final List<SubscriptionHandler> additionalSubscriptionHandlers,
        final List<IncomingDataHandler> incomingDataHandlers) throws RemoteException
{
    super("Publisher for " + Throw.whenNull(network, "Network may not be null").getId(),
            new MetaData("Publisher for " + network.getId(), "Publisher",
                    new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}),
            new MetaData("Subscription handlers", "Subscription handlers",
                    new ObjectDescriptor[] {new ObjectDescriptor("Name of subscription handler", "String", String.class)}));
    this.network = network;

    if (incomingDataHandlers != null)
    {
        for (IncomingDataHandler incomingDataHandler : incomingDataHandlers)
        {
            this.incomingDataHandlers.put(incomingDataHandler.getKey(), incomingDataHandler);
        }
    }

    // GTUs: the per-GTU handler is passed as the last argument of the network level "GTUs in network" handler.
    GtuIdTransceiver gtuIdTransceiver = new GtuIdTransceiver(network);
    GtuTransceiver gtuTransceiver = new GtuTransceiver(network, gtuIdTransceiver);
    SubscriptionHandler gtuSubscriptionHandler =
            new SubscriptionHandler("GTU move", gtuTransceiver, createGtuLookup(), null, null, Gtu.MOVE_EVENT, null);
    addSubscriptionHandler(gtuSubscriptionHandler);
    addSubscriptionHandler(new SubscriptionHandler("GTUs in network", gtuIdTransceiver,
            createNetworkLookup("Subscription handler for GTUs in network"), Network.GTU_ADD_EVENT,
            Network.GTU_REMOVE_EVENT, null, gtuSubscriptionHandler));

    // Links
    LinkIdTransceiver linkIdTransceiver = new LinkIdTransceiver(network);
    LinkTransceiver linkTransceiver = new LinkTransceiver(network, linkIdTransceiver);
    SubscriptionHandler linkSubscriptionHandler = new SubscriptionHandler("Link change", linkTransceiver, this.lookupLink,
            Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
    addSubscriptionHandler(linkSubscriptionHandler);
    addSubscriptionHandler(new SubscriptionHandler("Links in network", linkIdTransceiver,
            createNetworkLookup("Subscription handler for Links in network"), Network.LINK_ADD_EVENT,
            Network.LINK_REMOVE_EVENT, null, linkSubscriptionHandler));

    // Nodes
    NodeIdTransceiver nodeIdTransceiver = new NodeIdTransceiver(network);
    NodeTransceiver nodeTransceiver = new NodeTransceiver(network, nodeIdTransceiver);
    SubscriptionHandler nodeSubscriptionHandler =
            new SubscriptionHandler("Node change", nodeTransceiver, createNodeLookup(), null, null, null, null);
    addSubscriptionHandler(nodeSubscriptionHandler);
    addSubscriptionHandler(new SubscriptionHandler("Nodes in network", nodeIdTransceiver,
            createNetworkLookup("Subscription handler for Nodes in network"), Network.NODE_ADD_EVENT,
            Network.NODE_REMOVE_EVENT, null, nodeSubscriptionHandler));

    // Contents of a single link
    SubscriptionHandler linkGtuIdSubscriptionHandler = new SubscriptionHandler("GTUs on Link",
            new LinkGtuIdTransceiver(network), this.lookupLink, Link.GTU_ADD_EVENT, Link.GTU_REMOVE_EVENT, null, null);
    addSubscriptionHandler(linkGtuIdSubscriptionHandler);
    addSubscriptionHandler(new SubscriptionHandler("Cross section elements on Link",
            new CrossSectionElementTransceiver(network), this.lookupLink, CrossSectionLink.LANE_ADD_EVENT,
            CrossSectionLink.LANE_REMOVE_EVENT, null, linkGtuIdSubscriptionHandler));

    // Simulator state
    SimulatorStateTransceiver stt = new SimulatorStateTransceiver(network.getSimulator());
    SubscriptionHandler simulatorStateSubscriptionHandler = new SubscriptionHandler("Simulator running", stt,
            stt.getLookupEventProducer(), null, null, SimulatorStateTransceiver.SIMULATOR_STATE_CHANGED, null);
    addSubscriptionHandler(simulatorStateSubscriptionHandler);

    if (additionalSubscriptionHandlers != null)
    {
        for (SubscriptionHandler sh : additionalSubscriptionHandlers)
        {
            addSubscriptionHandler(sh);
        }
    }
    addSubscriptionHandler(new SubscriptionHandler("", this, null, null, null, null, null)); // The meta transceiver
}

/**
 * Create the lookup that finds the GTU whose id is the first (String) element of the address.
 * @return LookupEventProducer; sends a nack and returns null when the address does not match the meta data; sends a nack
 *         and returns the (null) lookup result when the network has no GTU with the given id
 */
private LookupEventProducer createGtuLookup()
{
    return new LookupEventProducer()
    {
        /** Address meta data: one String, the GTU id. */
        private final MetaData metaData = new MetaData("GTU Id", "GTU Id",
                new ObjectDescriptor[] {new ObjectDescriptor("GTU ID", "GTU Id", String.class)});

        @Override
        public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
                throws Sim0MQException, SerializationException
        {
            String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
            if (bad != null)
            {
                returnWrapper.nack(bad);
                return null;
            }
            EventProducer result = Publisher.this.network.getGTU((String) address[0]);
            if (null == result)
            {
                returnWrapper.nack("No GTU with id \"" + address[0] + "\" found");
            }
            return result;
        }

        @Override
        public MetaData getAddressMetaData()
        {
            return this.metaData;
        }
    };
}

/**
 * Create a lookup for a handler that subscribes to the network itself. The address is verified against MetaData.EMPTY
 * and the network is returned as the event producer.
 * @param description String; the text returned by the toString method of the created lookup
 * @return LookupEventProducer; sends a nack and returns null when the address fails verification, otherwise returns the
 *         network
 */
private LookupEventProducer createNetworkLookup(final String description)
{
    return new LookupEventProducer()
    {
        @Override
        public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
                throws Sim0MQException, SerializationException
        {
            String bad = AbstractTransceiver.verifyMetaData(getAddressMetaData(), address);
            if (bad != null)
            {
                returnWrapper.nack(bad);
                return null;
            }
            return Publisher.this.network;
        }

        @Override
        public String toString()
        {
            return description;
        }

        @Override
        public MetaData getAddressMetaData()
        {
            return MetaData.EMPTY;
        }
    };
}

/**
 * Create the lookup used by the "Node change" handler. It never yields an event producer.
 * @return LookupEventProducer; whose lookup method always returns null without sending a nack
 */
private LookupEventProducer createNodeLookup()
{
    return new LookupEventProducer()
    {
        /** Address meta data: one String, the Node id. */
        private final MetaData metaData = new MetaData("Node Id", "Node Id",
                new ObjectDescriptor[] {new ObjectDescriptor("Node ID", "Node Id", String.class)});

        @Override
        public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
        {
            return null; // Nodes do not emit events
        }

        @Override
        public String toString()
        {
            return "Subscription handler for Node change";
        }

        @Override
        public MetaData getAddressMetaData()
        {
            return this.metaData;
        }
    };
}
/**
 * Lookup a Link in the network by its id. The address must consist of exactly one String: the id of the link. When the
 * network has no link with that id, or the link is not an EventProducer, a nack is sent through the ReturnWrapper and
 * the lookup yields null. A null address or an address that is not exactly one String is rejected through Throw before
 * the network is consulted.
 */
private final LookupEventProducer lookupLink = new LookupEventProducer()
{
    /** Address meta data: one String, the id of a link. Built once and returned by every getAddressMetaData call. */
    private final MetaData addressMetaData = new MetaData("Link id", "Name of a link in the network",
            new ObjectDescriptor[] {new ObjectDescriptor("Link id", "Name of a link in the network", String.class)});

    @Override
    public EventProducer lookup(final Object[] address, final ReturnWrapper returnWrapper)
            throws IndexOutOfBoundsException, Sim0MQException, SerializationException
    {
        Throw.whenNull(address, "LookupLink requires the name of a link");
        // The length test comes first, so address[0] is only inspected when exactly one element is present.
        Throw.when(address.length != 1 || !(address[0] instanceof String), IllegalArgumentException.class, "Bad address");
        Link link = Publisher.this.network.getLink((String) address[0]);
        if (null == link)
        {
            returnWrapper.nack("Network does not contain a Link with id " + address[0]);
            return null;
        }
        if (!(link instanceof EventProducer))
        {
            returnWrapper.nack("Link \"" + address[0] + "\" is not able to handle subscriptions");
            return null;
        }
        // Cast to the type that was just verified; casting to CrossSectionLink would fail for any other kind of Link.
        return (EventProducer) link;
    }

    @Override
    public String toString()
    {
        return "LookupProducerInterface that looks up a Link in the network";
    }

    @Override
    public MetaData getAddressMetaData()
    {
        return this.addressMetaData;
    }
};
/**
* Add a SubscriptionHandler to the map, stored under the value of its getId() method. A handler that was stored earlier
* under the same id is replaced.
* @param subscriptionHandler SubscriptionHandler; the subscription handler to add to the map
*/
private void addSubscriptionHandler(final SubscriptionHandler subscriptionHandler)
{
// Map.put overwrites an existing entry for the same id.
this.subscriptionHandlerMap.put(subscriptionHandler.getId(), subscriptionHandler);
}
/**
 * {@inheritDoc} <br>
 * For this Publisher the address is verified against the address fields of this Publisher and its first element is
 * used as the name of a registered subscription handler. The result is a one-element array holding that
 * SubscriptionHandler. When the address fails verification, or no subscription handler is registered under the name, a
 * nack is sent through the returnWrapper and null is returned. The returnWrapper may not be null.
 */
@Override
public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
        throws Sim0MQException, SerializationException
{
    Throw.whenNull(returnWrapper, "returnWrapper may not be null");
    final String problem = verifyMetaData(getAddressFields(), address);
    if (problem != null)
    {
        returnWrapper.nack("Bad address (should be the name of a transceiver): " + problem);
        return null;
    }
    final Object handlerName = address[0];
    final SubscriptionHandler handler = this.subscriptionHandlerMap.get(handlerName);
    if (handler != null)
    {
        return new Object[] {handler};
    }
    returnWrapper.nack("No transceiver with name \"" + handlerName + "\"");
    return null;
}
/**
 * Transceiver that yields the names under which the subscription handlers of this Publisher are registered. Returned
 * by the getIdSource method.
 */
private final TransceiverInterface idSource = new TransceiverInterface()
{
    /** Result of getResultFields. */
    private final MetaData resultMetaData =
            new MetaData("Transceiver names available in Publisher", "String array", new ObjectDescriptor[] {
                    new ObjectDescriptor("Transceiver names available in Publisher", "String array", String[].class)});

    @Override
    public String getId()
    {
        return "Transceiver for names of available transceivers in Publisher";
    }

    @Override
    public MetaData getAddressFields()
    {
        return MetaData.EMPTY;
    }

    @Override
    public MetaData getResultFields()
    {
        return this.resultMetaData;
    }

    @Override
    public Object[] get(final Object[] address, final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        // The address is not inspected; the registered names are returned in registration order.
        return Publisher.this.subscriptionHandlerMap.keySet().toArray();
    }
};
/**
 * {@inheritDoc} <br>
 * This Publisher has an id source only at address level 0. For any other address level the text "Address should be 0"
 * is transmitted through the returnWrapper and null is returned. <br>
 * NOTE(review): the other error paths in this class report through nack, whereas this one uses encodeReplyAndTransmit;
 * confirm that a regular reply is intended here.
 */
@Override
public TransceiverInterface getIdSource(final int addressLevel, final ReturnWrapper returnWrapper)
        throws Sim0MQException, SerializationException
{
    if (addressLevel == 0)
    {
        return this.idSource;
    }
    returnWrapper.encodeReplyAndTransmit("Address should be 0");
    return null;
}
/**
* {@inheritDoc} <br>
* Always true for this Publisher; the id source itself is obtained through getIdSource with address level 0.
*/
@Override
public boolean hasIdSource()
{
return true;
}
/**
 * Execute one command by forwarding it to the SubscriptionHandler that is registered under the given name. When no
 * SubscriptionHandler is registered under that name, a nack is sent through the returnWrapper and nothing is executed.
 * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
 * @param command SubscriptionHandler.Command; the operation to perform
 * @param address Object[]; the address on which to perform the operation
 * @param returnWrapper ReturnWrapper; to transmit the result
 * @throws RemoteException on RMI network failure
 * @throws SerializationException on illegal type in serialization
 * @throws Sim0MQException on communication error
 */
public void executeCommand(final String subscriptionHandlerName, final SubscriptionHandler.Command command,
        final Object[] address, final ReturnWrapper returnWrapper)
        throws RemoteException, Sim0MQException, SerializationException
{
    final SubscriptionHandler target = this.subscriptionHandlerMap.get(subscriptionHandlerName);
    if (target != null)
    {
        target.executeCommand(command, address, returnWrapper);
        return;
    }
    returnWrapper.nack("No subscription handler for \"" + subscriptionHandlerName + "\"");
}
/**
 * Execute one command that is given by its name. The name is translated with SubscriptionHandler.lookupCommand; a null
 * translation is rejected through Throw.whenNull with the message "Invalid command (%s)". The translated command is
 * then executed exactly as in the variant of this method that takes a SubscriptionHandler.Command.
 * @param subscriptionHandlerName String; name of the SubscriptionHandler for which the command is destined
 * @param commandString String; the operation to perform
 * @param address Object[]; the address on which to perform the operation
 * @param returnWrapper ReturnWrapper; to transmit the result
 * @throws RemoteException on RMI network failure
 * @throws SerializationException on illegal type in serialization
 * @throws Sim0MQException on communication error
 */
public void executeCommand(final String subscriptionHandlerName, final String commandString, final Object[] address,
        final ReturnWrapperImpl returnWrapper) throws RemoteException, Sim0MQException, SerializationException
{
    final SubscriptionHandler.Command command =
            Throw.whenNull(SubscriptionHandler.lookupCommand(commandString), "Invalid command (%s)", commandString);
    executeCommand(subscriptionHandlerName, command, address, returnWrapper);
}
/**
* Find the IncomingDataHandler for a particular key. Only the handlers that were supplied to the constructor are
* searched; each of those is stored under the value of its getKey() method.
* @param key String; the key of the IncomingDataHandler
* @return IncomingDataHandler; or null if there is no IncomingDataHandler for the key
*/
public IncomingDataHandler lookupIncomingDataHandler(final String key)
{
return this.incomingDataHandlers.get(key);
}
}