SubscriptionHandler.java
package org.opentrafficsim.sim0mq.publisher;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.djutils.exceptions.Throw;
import org.djutils.event.Event;
import org.djutils.event.EventListener;
import org.djutils.event.EventProducer;
import org.djutils.event.EventType;
import org.djutils.event.TimedEvent;
import org.djutils.metadata.MetaData;
import org.djutils.metadata.ObjectDescriptor;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
/**
* Data collection that can be listed and has subscription to change events.
* <p>
* Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
* </p>
* @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
* @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
*/
public class SubscriptionHandler
{
/** Id of this SubscriptionHandler. */
private final String id;
/** Transceiver to retrieve the data right now; e.g. GtuIdTransceiver. */
private final TransceiverInterface listTransceiver;
/** Event producer for add, remove, or change events; e.g. the Network. */
private final LookupEventProducer eventProducerForAddRemoveOrChange;
/** EventType to subscribe to in order to receive creation of added object events; e.g. Network.GTU_ADD_EVENT. */
private final EventType addedEventType;
/** EventType to subscribe to in order to receive removed object events; e.g. Network.GTU_REMOVE_EVENT. */
private final EventType removedEventType;
/** EventType to subscript to in order to receive change of the collection, or object events. */
private final EventType changeEventType;
/** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
private final SubscriptionHandler elementSubscriptionHandler;
/** The currently active subscriptions. */
private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();
/**
* Create a new SubscriptionHandler.
* @param id String; id of the new SubscriptionHandler
* @param listTransceiver TransceiverInterface; transceiver to retrieve the data of <i>the addressed object</i> right now
* @param eventProducerForAddRemoveOrChange LookupEventProducer; the event producer that can emit the
* <code>addedEventType</code>, <code>removedEventType</code>, or <code>changeEventType</code> events
* @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
* added event type for the data
* @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
* no removed event type for the data
* @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
* change event type for the data
* @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
*/
public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
final LookupEventProducer eventProducerForAddRemoveOrChange, final EventType addedEventType,
final EventType removedEventType, final EventType changeEventType,
final SubscriptionHandler elementSubscriptionHandler)
{
Throw.whenNull(id, "Id may not be null");
Throw.when(
null == eventProducerForAddRemoveOrChange
&& (addedEventType != null || removedEventType != null || changeEventType != null),
NullPointerException.class,
"eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
this.id = id;
this.listTransceiver = listTransceiver;
this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
this.addedEventType = addedEventType;
this.removedEventType = removedEventType;
this.changeEventType = changeEventType;
this.elementSubscriptionHandler = elementSubscriptionHandler;
}
/**
* Report what payload is required to retrieve a list of all elements, or data and what format a result would have.
* @return MetaData; description of the payload required to retrieve a list of all elements, or data and what format a
* result would have
*/
public MetaData listRequestMetaData()
{
return this.listTransceiver.getAddressFields();
}
/**
* Report what the payload format of the result of the list transceiver.
* @return MetaData; the payload format of the result of the list transceiver
*/
public MetaData listResultMetaData()
{
return this.listTransceiver.getResultFields();
}
/**
* Retrieve a data collection.
* @param address Object[]; address of the requested data collection
* @param returnWrapper ReturnWrapper; to send back the result
* @throws RemoteException when communication fails
* @throws SerializationException on context error
* @throws Sim0MQException on DSOL error
*/
public void get(final Object[] address, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
}
/**
* Retrieve the list transceiver (only for testing).
* @return TransceiverInterface; the list transceiver
*/
public TransceiverInterface getListTransceiver()
{
return this.listTransceiver;
}
/**
* Return the set of supported commands.
* @return EnumSet<Command>; the set of supported commands.
*/
public final EnumSet<Command> subscriptionOptions()
{
EnumSet<Command> result = EnumSet.noneOf(Command.class);
if (null != this.addedEventType)
{
result.add(Command.SUBSCRIBE_TO_ADD);
result.add(Command.UNSUBSCRIBE_FROM_ADD);
}
if (null != this.removedEventType)
{
result.add(Command.SUBSCRIBE_TO_REMOVE);
result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
}
if (null != this.changeEventType)
{
result.add(Command.SUBSCRIBE_TO_CHANGE);
result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
}
if (null != this.listTransceiver)
{
result.add(Command.GET_CURRENT);
result.add(Command.GET_ADDRESS_META_DATA);
result.add(Command.GET_RESULT_META_DATA);
}
return result;
}
/**
* Create a new subscription to ADD, REMOVE, or CHANGE events.
* @param address Object[]; the data that is required to find the correct EventProducer
* @param eventType EventType; one of the event types that the addressed EventProducer can fire
* @param returnWrapper ReturnWrapper; generates envelopes for the returned events
* @throws RemoteException when communication fails
* @throws SerializationException should never happen
* @throws Sim0MQException should never happen
*/
private void subscribeTo(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
if (null == eventType)
{
returnWrapper.nack("Does not support subscribe to");
return;
}
String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
if (bad != null)
{
returnWrapper.nack("Bad address: " + bad);
return;
}
EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
if (null == epi)
{
// Not necessarily bad; some EventProducers (e.g. GTUs) may disappear at any time
return; // NACK has been sent by this.eventProducerForAddRemoveOrChange.lookup
}
Subscription subscription = this.subscriptions.get(returnWrapper);
if (null == subscription)
{
subscription = new Subscription(returnWrapper);
this.subscriptions.put(returnWrapper, subscription);
}
if (epi.addListener(subscription, eventType))
{
returnWrapper.ack("Subscription created");
}
else
{
// There was already a subscription?
returnWrapper.ack("There was already such a subscription active");
}
// FIXME: if the subscription is an an Object that later disappears, the subscription map will still consume memory for
// that subscription. That could add up to a lot of memory ...
}
/**
* Cancel a subscription to ADD, REMOVE, or CHANGE events.
* @param address Object[]; the data that is required to find the correct EventProducer
* @param eventType EventType; one of the event types that the addressed EventProducer can fire
* @param returnWrapper ReturnWrapper; the ReturnWapper that sent the results until now
* @throws RemoteException when communication fails
* @throws SerializationException should never happen
* @throws Sim0MQException should never happen
*/
private void unsubscribeFrom(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
if (null == eventType)
{
returnWrapper.nack("Does not support unsubscribe from");
return;
}
String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
if (bad != null)
{
returnWrapper.nack("Bad address: " + bad);
return;
}
EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
if (null == epi)
{
returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
return;
}
Subscription subscription = this.subscriptions.get(returnWrapper);
if (null == subscription)
{
returnWrapper.nack("Cound not find a subscription to cancel");
}
else if (!epi.removeListener(subscription, eventType))
{
returnWrapper.nack("Subscription was not found");
}
else
{
this.subscriptions.remove(returnWrapper);
returnWrapper.ack("Subscription removed");
}
}
/**
* Retrieve the id of this SubscriptionHandler.
* @return String; the id of this SubscriptionHandler
*/
public final String getId()
{
return this.id;
}
/**
* The commands that a SubscriptionHandler understands.
*/
public enum Command
{
/** Subscribe to add events. */
SUBSCRIBE_TO_ADD,
/** Subscribe to remove events. */
SUBSCRIBE_TO_REMOVE,
/** Subscribe to change events. */
SUBSCRIBE_TO_CHANGE,
/** Unsubscribe to add events. */
UNSUBSCRIBE_FROM_ADD,
/** Unsubscribe to remove events. */
UNSUBSCRIBE_FROM_REMOVE,
/** Unsubscribe to change events. */
UNSUBSCRIBE_FROM_CHANGE,
/** Get current set (if a collection), c.q. state (if properties of one object). */
GET_CURRENT,
/** Get the address meta data. */
GET_ADDRESS_META_DATA,
/** Get the result meta data. */
GET_RESULT_META_DATA,
/** Get the output of the IdSource. */
GET_LIST,
/** Get the set of implemented commands (must - itself - always be implemented). */
GET_COMMANDS;
}
/**
* Convert a String representing a Command into that Command.
* @param commandString String; the string
* @return Command; the corresponding Command, or null if the <code>commandString</code> is not a valid Command
*/
public static Command lookupCommand(final String commandString)
{
if ("GET_ADDRESS_META_DATA".equals(commandString))
{
return Command.GET_ADDRESS_META_DATA;
}
else if ("GET_CURRENT".equals(commandString))
{
return Command.GET_CURRENT;
}
else if ("GET_RESULT_META_DATA".equals(commandString))
{
return Command.GET_RESULT_META_DATA;
}
else if ("GET_RESULT_META_DATA".equals(commandString))
{
return Command.GET_RESULT_META_DATA;
}
else if ("SUBSCRIBE_TO_ADD".equals(commandString))
{
return Command.SUBSCRIBE_TO_ADD;
}
else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
{
return Command.SUBSCRIBE_TO_CHANGE;
}
else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
{
return Command.SUBSCRIBE_TO_REMOVE;
}
else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_ADD;
}
else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_REMOVE;
}
else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
{
return Command.UNSUBSCRIBE_FROM_CHANGE;
}
else if ("GET_LIST".contentEquals(commandString))
{
return Command.GET_LIST;
}
else if ("GET_COMMANDS".contentEquals(commandString))
{
return Command.GET_COMMANDS;
}
System.err.println("Could not find command with name \"" + commandString + "\"");
return null;
}
/**
* Execute one command.
* @param command Command; the command
* @param address Object[] the address of the object on which the command must be applied
* @param returnWrapper ReturnWrapper; envelope generator for replies
* @throws RemoteException on communication failure
* @throws SerializationException on illegal type in serialization
* @throws Sim0MQException on communication error
*/
public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
throws RemoteException, Sim0MQException, SerializationException
{
Throw.whenNull(command, "Command may not be null");
Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
switch (command)
{
case SUBSCRIBE_TO_ADD:
subscribeTo(address, this.addedEventType, returnWrapper);
break;
case SUBSCRIBE_TO_CHANGE:
subscribeTo(address, this.changeEventType, returnWrapper);
break;
case SUBSCRIBE_TO_REMOVE:
subscribeTo(address, this.removedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_ADD:
unsubscribeFrom(address, this.addedEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_CHANGE:
unsubscribeFrom(address, this.changeEventType, returnWrapper);
break;
case UNSUBSCRIBE_FROM_REMOVE:
unsubscribeFrom(address, this.removedEventType, returnWrapper);
break;
case GET_CURRENT:
{
Object[] result = this.listTransceiver.get(address, returnWrapper);
if (null != result)
{
sendResult(result, returnWrapper);
}
// TODO else?
break;
}
case GET_ADDRESS_META_DATA:
if (null == this.listTransceiver)
{
returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
}
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_RESULT_META_DATA:
if (null == this.listTransceiver)
{
returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
}
sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
returnWrapper);
break;
case GET_LIST:
{
if (this.listTransceiver.hasIdSource())
{
sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
returnWrapper);
}
else
{
sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
}
break;
}
case GET_COMMANDS:
List<String> resultList = new ArrayList<>();
if (null != this.addedEventType)
{
resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
}
if (null != this.removedEventType)
{
resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
}
if (null != this.changeEventType)
{
resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
}
if (this.listTransceiver.getAddressFields() != null)
{
resultList.add(Command.GET_ADDRESS_META_DATA.toString());
}
if (this.listTransceiver.getResultFields() != null)
{
resultList.add(Command.GET_RESULT_META_DATA.toString());
}
if (null != this.listTransceiver)
{
resultList.add(Command.GET_LIST.toString());
}
resultList.add(Command.GET_COMMANDS.toString());
Object[] result = new Object[resultList.size()];
for (int index = 0; index < result.length; index++)
{
result[index] = resultList.get(index);
}
returnWrapper.encodeReplyAndTransmit(result);
break;
default:
// Cannot happen
break;
}
}
/**
* Extract the class names from an array of ObjectDescriptor.
* @param objectDescriptors ObjectDescriptor[]; the array of ObjectDescriptor
* @return Object[]; the class names
*/
private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
{
Object[] result = new Object[objectDescriptors.length];
for (int index = 0; index < objectDescriptors.length; index++)
{
result[index] = objectDescriptors[index].getObjectClass().getName();
}
return result;
}
/**
* Send data via Sim0MQ to master if (and only if) it is non-null.
* @param data Object[]; the data to transmit
* @param returnWrapper ReturnWrapper; envelope constructor for returned results
* @throws SerializationException on illegal type in serialization
* @throws Sim0MQException on communication error
*/
private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
throws Sim0MQException, SerializationException
{
if (data != null)
{
returnWrapper.encodeReplyAndTransmit(data);
}
}
/** {@inheritDoc} */
@Override
public String toString()
{
return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
+ ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
+ this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
+ this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
}
}
/**
* Handles one subscription.
*/
class Subscription implements EventListener
{
/** ... */
private static final long serialVersionUID = 20200428L;
/** Generates envelopes for the messages sent over Sim0MQ. */
private final ReturnWrapper returnWrapper;
/**
* Construct a new Subscription.
* @param returnWrapper ReturnWrapper; envelope generator for the messages
*/
Subscription(final ReturnWrapper returnWrapper)
{
this.returnWrapper = returnWrapper;
}
/** {@inheritDoc} */
@Override
public void notify(final Event event) throws RemoteException
{
MetaData metaData = event.getType().getMetaData();
int additionalFields = event.getType() instanceof EventType ? 1 : 0;
Object[] result = new Object[additionalFields + metaData.size()];
// result[0] = event.getType().getName();
if (additionalFields > 0)
{
result[0] = ((TimedEvent<?>) event).getTimeStamp();
}
Object payload = event.getContent();
if (payload instanceof Object[])
{
for (int index = 0; index < event.getType().getMetaData().size(); index++)
{
result[additionalFields + index] = ((Object[]) payload)[index];
}
}
else
{
result[additionalFields] = payload;
}
// TODO verify the composition of the result. Problem: no access to the metadata here
try
{
this.returnWrapper.encodeReplyAndTransmit(result);
}
catch (Sim0MQException | SerializationException e)
{
e.printStackTrace();
}
}
}