SubscriptionHandler.java

package org.opentrafficsim.sim0mq.publisher;

import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.djutils.exceptions.Throw;
import org.djutils.event.Event;
import org.djutils.event.EventListener;
import org.djutils.event.EventProducer;
import org.djutils.event.EventType;
import org.djutils.event.TimedEvent;
import org.djutils.metadata.MetaData;
import org.djutils.metadata.ObjectDescriptor;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;

/**
 * Data collection that can be listed and has subscription to change events.
 * <p>
 * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
 * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
 * </p>
 * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
 * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
 */
public class SubscriptionHandler
{
    /** Id of this SubscriptionHandler. */
    private final String id;

    /** Transceiver to retrieve the data right now; e.g. GtuIdTransceiver. */
    private final TransceiverInterface listTransceiver;

    /** Event producer for add, remove, or change events; e.g. the Network. */
    private final LookupEventProducer eventProducerForAddRemoveOrChange;

    /** EventType to subscribe to in order to receive creation of added object events; e.g. Network.GTU_ADD_EVENT. */
    private final EventType addedEventType;

    /** EventType to subscribe to in order to receive removed object events; e.g. Network.GTU_REMOVE_EVENT. */
    private final EventType removedEventType;

    /** EventType to subscribe to in order to receive change of the collection, or object events. */
    private final EventType changeEventType;

    /** SubscriptionHandler that handles subscriptions to individual objects; e.g. GTU.MOVE_EVENT. */
    private final SubscriptionHandler elementSubscriptionHandler;

    /** The currently active subscriptions. */
    private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();

    /**
     * Create a new SubscriptionHandler.
     * @param id String; id of the new SubscriptionHandler
     * @param listTransceiver TransceiverInterface; transceiver to retrieve the data of <i>the addressed object</i> right now
     * @param eventProducerForAddRemoveOrChange LookupEventProducer; the event producer that can emit the
     *            <code>addedEventType</code>, <code>removedEventType</code>, or <code>changeEventType</code> events
     * @param addedEventType EventType; event type that signals that a new element has been added, should be null if there is no
     *            added event type for the data
     * @param removedEventType EventType; event type that signals that an element has been removed, should be null if there is
     *            no removed event type for the data
     * @param changeEventType EventType; event type that signals that an element has been changed, should be null if there is no
     *            change event type for the data
     * @param elementSubscriptionHandler SubscriptionHandler; SubscriptionHandler for events produced by the underlying elements
     */
    public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
            final LookupEventProducer eventProducerForAddRemoveOrChange, final EventType addedEventType,
            final EventType removedEventType, final EventType changeEventType,
            final SubscriptionHandler elementSubscriptionHandler)
    {
        Throw.whenNull(id, "Id may not be null");
        Throw.when(
                null == eventProducerForAddRemoveOrChange
                        && (addedEventType != null || removedEventType != null || changeEventType != null),
                NullPointerException.class,
                "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
        this.id = id;
        this.listTransceiver = listTransceiver;
        this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
        this.addedEventType = addedEventType;
        this.removedEventType = removedEventType;
        this.changeEventType = changeEventType;
        this.elementSubscriptionHandler = elementSubscriptionHandler;
    }

    /**
     * Report the address (request payload) description of the list transceiver; i.e. what must be supplied to retrieve the
     * list of all elements, or the data.
     * @return MetaData; the address fields of the list transceiver
     */
    public MetaData listRequestMetaData()
    {
        final MetaData addressFields = this.listTransceiver.getAddressFields();
        return addressFields;
    }

    /**
     * Report the payload format of the result of the list transceiver.
     * @return MetaData; the result fields of the list transceiver
     */
    public MetaData listResultMetaData()
    {
        final MetaData resultFields = this.listTransceiver.getResultFields();
        return resultFields;
    }

    /**
     * Retrieve a data collection from the list transceiver and transmit it through the return wrapper. Nothing is
     * transmitted by this method when the transceiver yields null.
     * @param address Object[]; address of the requested data collection
     * @param returnWrapper ReturnWrapper; to send back the result
     * @throws RemoteException when communication fails
     * @throws SerializationException on context error
     * @throws Sim0MQException on DSOL error
     */
    public void get(final Object[] address, final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        final Object[] collected = this.listTransceiver.get(address, returnWrapper);
        sendResult(collected, returnWrapper);
    }

    /**
     * Retrieve the list transceiver (only for testing).
     * @return TransceiverInterface; the list transceiver as supplied at construction time (not checked for null there)
     */
    public TransceiverInterface getListTransceiver()
    {
        return this.listTransceiver;
    }

    /**
     * Return the set of supported commands. Each non-null event type contributes its subscribe/unsubscribe pair; a non-null
     * list transceiver contributes the GET_CURRENT and the two meta data commands.
     * @return EnumSet&lt;Command&gt;; the set of supported commands.
     */
    public final EnumSet<Command> subscriptionOptions()
    {
        final EnumSet<Command> supported = EnumSet.noneOf(Command.class);
        if (this.addedEventType != null)
        {
            supported.addAll(EnumSet.of(Command.SUBSCRIBE_TO_ADD, Command.UNSUBSCRIBE_FROM_ADD));
        }
        if (this.removedEventType != null)
        {
            supported.addAll(EnumSet.of(Command.SUBSCRIBE_TO_REMOVE, Command.UNSUBSCRIBE_FROM_REMOVE));
        }
        if (this.changeEventType != null)
        {
            supported.addAll(EnumSet.of(Command.SUBSCRIBE_TO_CHANGE, Command.UNSUBSCRIBE_FROM_CHANGE));
        }
        if (this.listTransceiver != null)
        {
            supported.addAll(
                    EnumSet.of(Command.GET_CURRENT, Command.GET_ADDRESS_META_DATA, Command.GET_RESULT_META_DATA));
        }
        return supported;
    }

    /**
     * Create a new subscription to ADD, REMOVE, or CHANGE events.
     * <p>
     * A NACK is sent when the event type is null (not supported) or when the address does not match the address meta data of
     * the event producer. When the lookup of the event producer yields null, this method returns without sending anything
     * itself. Otherwise the Subscription that belongs to the <code>returnWrapper</code> is registered as listener and an ACK
     * is sent.
     * </p>
     * @param address Object[]; the data that is required to find the correct EventProducer
     * @param eventType EventType; one of the event types that the addressed EventProducer can fire
     * @param returnWrapper ReturnWrapper; generates envelopes for the returned events
     * @throws RemoteException when communication fails
     * @throws SerializationException should never happen
     * @throws Sim0MQException should never happen
     */
    private void subscribeTo(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        if (eventType == null)
        {
            returnWrapper.nack("Does not support subscribe to");
            return;
        }
        final String addressProblem =
                AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
        if (addressProblem != null)
        {
            returnWrapper.nack("Bad address: " + addressProblem);
            return;
        }
        final EventProducer producer = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
        if (producer == null)
        {
            // Not necessarily bad; some EventProducers (e.g. GTUs) may disappear at any time
            return; // NACK has been sent by this.eventProducerForAddRemoveOrChange.lookup
        }
        // Re-use the Subscription of this return wrapper, or create and remember a new one
        final Subscription listener = this.subscriptions.computeIfAbsent(returnWrapper, Subscription::new);
        // addListener reports false when this listener was already registered for the event type
        final boolean newlyAdded = producer.addListener(listener, eventType);
        returnWrapper.ack(newlyAdded ? "Subscription created" : "There was already such a subscription active");
        // FIXME: if the subscription is on an Object that later disappears, the subscription map will still consume memory
        // for that subscription. That could add up to a lot of memory ...
    }

    /**
     * Cancel a subscription to ADD, REMOVE, or CHANGE events.
     * <p>
     * A NACK is sent when the event type is null, the address is invalid, the event producer cannot be found, no Subscription
     * is known for the <code>returnWrapper</code>, or the event producer did not have that Subscription registered for the
     * event type. On success the Subscription is forgotten and an ACK is sent.
     * </p>
     * @param address Object[]; the data that is required to find the correct EventProducer
     * @param eventType EventType; one of the event types that the addressed EventProducer can fire
     * @param returnWrapper ReturnWrapper; the ReturnWrapper that sent the results until now
     * @throws RemoteException when communication fails
     * @throws SerializationException should never happen
     * @throws Sim0MQException should never happen
     */
    private void unsubscribeFrom(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        if (eventType == null)
        {
            returnWrapper.nack("Does not support unsubscribe from");
            return;
        }
        final String addressProblem =
                AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
        if (addressProblem != null)
        {
            returnWrapper.nack("Bad address: " + addressProblem);
            return;
        }
        final EventProducer producer = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
        if (producer == null)
        {
            // NOTE(review): subscribeTo assumes that lookup already sent a NACK in this situation; if that is correct this
            // results in a second NACK - confirm against LookupEventProducer.lookup
            returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
            return;
        }
        final Subscription listener = this.subscriptions.get(returnWrapper);
        if (listener == null)
        {
            returnWrapper.nack("Cound not find a subscription to cancel");
            return;
        }
        if (!producer.removeListener(listener, eventType))
        {
            returnWrapper.nack("Subscription was not found");
            return;
        }
        // NOTE(review): the map holds one Subscription per return wrapper; if that Subscription is also registered for other
        // event types it is dropped from the map here while still listening - TODO confirm intended
        this.subscriptions.remove(returnWrapper);
        returnWrapper.ack("Subscription removed");
    }

    /**
     * Retrieve the id of this SubscriptionHandler.
     * @return String; the id of this SubscriptionHandler (never null; the constructor rejects a null id)
     */
    public final String getId()
    {
        return this.id;
    }

    /**
     * The commands that a SubscriptionHandler understands. The names of these constants are the strings that
     * <code>lookupCommand</code> recognizes.
     */
    public enum Command
    {
        /** Subscribe to add events. */
        SUBSCRIBE_TO_ADD,
        /** Subscribe to remove events. */
        SUBSCRIBE_TO_REMOVE,
        /** Subscribe to change events. */
        SUBSCRIBE_TO_CHANGE,
        /** Unsubscribe from add events. */
        UNSUBSCRIBE_FROM_ADD,
        /** Unsubscribe from remove events. */
        UNSUBSCRIBE_FROM_REMOVE,
        /** Unsubscribe from change events. */
        UNSUBSCRIBE_FROM_CHANGE,
        /** Get current set (if a collection), c.q. state (if properties of one object). */
        GET_CURRENT,
        /** Get the address meta data. */
        GET_ADDRESS_META_DATA,
        /** Get the result meta data. */
        GET_RESULT_META_DATA,
        /** Get the output of the IdSource. */
        GET_LIST,
        /** Get the set of implemented commands (must - itself - always be implemented). */
        GET_COMMANDS;
    }

    /**
     * Convert a String representing a Command into that Command. The string must exactly match the name of one of the Command
     * constants (case sensitive). When no Command matches (which includes a null <code>commandString</code>) a message is
     * written to <code>System.err</code> and null is returned.
     * @param commandString String; the string
     * @return Command; the corresponding Command, or null if the <code>commandString</code> is not a valid Command
     */
    public static Command lookupCommand(final String commandString)
    {
        // Guard against null so that this method returns null (as documented) instead of throwing
        if (commandString != null)
        {
            for (Command command : Command.values())
            {
                if (command.name().equals(commandString))
                {
                    return command;
                }
            }
        }
        System.err.println("Could not find command with name \"" + commandString + "\"");
        return null;
    }

    /**
     * Execute one command.
     * <p>
     * The (un)subscribe commands are delegated to <code>subscribeTo</code> / <code>unsubscribeFrom</code> with the matching
     * event type. The GET_CURRENT, GET_ADDRESS_META_DATA and GET_RESULT_META_DATA commands need the list transceiver; when
     * this SubscriptionHandler has none, a NACK is sent and nothing else happens. GET_LIST replies with a descriptive text
     * when there is no list transceiver, or the list transceiver has no id source. GET_COMMANDS replies with the names of the
     * commands that it considers available.
     * </p>
     * @param command Command; the command
     * @param address Object[] the address of the object on which the command must be applied
     * @param returnWrapper ReturnWrapper; envelope generator for replies
     * @throws RemoteException on communication failure
     * @throws SerializationException on illegal type in serialization
     * @throws Sim0MQException on communication error
     */
    public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        Throw.whenNull(command, "Command may not be null");
        Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
        switch (command)
        {
            case SUBSCRIBE_TO_ADD:
                subscribeTo(address, this.addedEventType, returnWrapper);
                break;

            case SUBSCRIBE_TO_CHANGE:
                subscribeTo(address, this.changeEventType, returnWrapper);
                break;

            case SUBSCRIBE_TO_REMOVE:
                subscribeTo(address, this.removedEventType, returnWrapper);
                break;

            case UNSUBSCRIBE_FROM_ADD:
                unsubscribeFrom(address, this.addedEventType, returnWrapper);
                break;

            case UNSUBSCRIBE_FROM_CHANGE:
                unsubscribeFrom(address, this.changeEventType, returnWrapper);
                break;

            case UNSUBSCRIBE_FROM_REMOVE:
                unsubscribeFrom(address, this.removedEventType, returnWrapper);
                break;

            case GET_CURRENT:
            {
                if (nackWhenNoListTransceiver(returnWrapper))
                {
                    break;
                }
                // sendResult transmits nothing when the transceiver returned null
                sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
                // TODO else?
                break;
            }

            case GET_ADDRESS_META_DATA:
                if (nackWhenNoListTransceiver(returnWrapper))
                {
                    // Stop here; without a list transceiver there is no meta data to send
                    break;
                }
                sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
                        returnWrapper);
                break;

            case GET_RESULT_META_DATA:
                if (nackWhenNoListTransceiver(returnWrapper))
                {
                    // Stop here; without a list transceiver there is no meta data to send
                    break;
                }
                sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
                        returnWrapper);
                break;

            case GET_LIST:
            {
                if (null != this.listTransceiver && this.listTransceiver.hasIdSource())
                {
                    sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
                            returnWrapper);
                }
                else
                {
                    sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
                }
                break;
            }

            case GET_COMMANDS:
            {
                List<String> resultList = new ArrayList<>();
                if (null != this.addedEventType)
                {
                    resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
                    resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
                }
                if (null != this.removedEventType)
                {
                    resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
                    resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
                }
                if (null != this.changeEventType)
                {
                    resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
                    resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
                }
                // Everything below depends on the list transceiver; check it before dereferencing it
                if (null != this.listTransceiver)
                {
                    // NOTE(review): GET_CURRENT is not reported here although subscriptionOptions() does report it; kept as is
                    if (this.listTransceiver.getAddressFields() != null)
                    {
                        resultList.add(Command.GET_ADDRESS_META_DATA.toString());
                    }
                    if (this.listTransceiver.getResultFields() != null)
                    {
                        resultList.add(Command.GET_RESULT_META_DATA.toString());
                    }
                    resultList.add(Command.GET_LIST.toString());
                }
                resultList.add(Command.GET_COMMANDS.toString());
                returnWrapper.encodeReplyAndTransmit(resultList.toArray());
                break;
            }

            default:
                // Cannot happen
                break;
        }
    }

    /**
     * Send a NACK when this SubscriptionHandler has no list transceiver.
     * @param returnWrapper ReturnWrapper; envelope generator for the NACK
     * @return boolean; true if there is no list transceiver (and a NACK was sent); false if there is a list transceiver
     * @throws RemoteException on communication failure
     * @throws SerializationException on illegal type in serialization
     * @throws Sim0MQException on communication error
     */
    private boolean nackWhenNoListTransceiver(final ReturnWrapper returnWrapper)
            throws RemoteException, Sim0MQException, SerializationException
    {
        if (null != this.listTransceiver)
        {
            return false;
        }
        returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
        return true;
    }

    /**
     * Collect, in order, the name of the object class of each ObjectDescriptor.
     * @param objectDescriptors ObjectDescriptor[]; the array of ObjectDescriptor
     * @return Object[]; the class names (one String per ObjectDescriptor)
     */
    private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
    {
        final Object[] classNames = new Object[objectDescriptors.length];
        int next = 0;
        for (ObjectDescriptor descriptor : objectDescriptors)
        {
            classNames[next++] = descriptor.getObjectClass().getName();
        }
        return classNames;
    }

    /**
     * Transmit data through the return wrapper, unless the data is null; in that case nothing is transmitted.
     * @param data Object[]; the data to transmit
     * @param returnWrapper ReturnWrapper; envelope constructor for returned results
     * @throws SerializationException on illegal type in serialization
     * @throws Sim0MQException on communication error
     */
    private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
            throws Sim0MQException, SerializationException
    {
        if (null == data)
        {
            return;
        }
        returnWrapper.encodeReplyAndTransmit(data);
    }

    /** {@inheritDoc} */
    @Override
    public String toString()
    {
        final StringBuilder text = new StringBuilder("SubscriptionHandler [id=");
        text.append(this.id);
        text.append(", listTransceiver=").append(this.listTransceiver);
        text.append(", eventProducerForAddRemoveOrChange=").append(this.eventProducerForAddRemoveOrChange);
        text.append(", addedEventType=").append(this.addedEventType);
        text.append(", removedEventType=").append(this.removedEventType);
        text.append(", changeEventType=").append(this.changeEventType);
        text.append(", elementSubscriptionHandler=").append(this.elementSubscriptionHandler);
        text.append("]");
        return text.toString();
    }

}

/**
 * Handles one subscription: an EventListener that forwards each received event through a ReturnWrapper.
 */
class Subscription implements EventListener
{
    /** ... */
    private static final long serialVersionUID = 20200428L;

    /** Generates envelopes for the messages sent over Sim0MQ. */
    private final ReturnWrapper returnWrapper;

    /**
     * Construct a new Subscription.
     * @param returnWrapper ReturnWrapper; envelope generator for the messages
     */
    Subscription(final ReturnWrapper returnWrapper)
    {
        this.returnWrapper = returnWrapper;
    }

    /**
     * Forward an event. The transmitted array consists of the time stamp (only when the event is a TimedEvent), followed by
     * the payload of the event. An Object[] payload contributes as many elements as the size of the meta data of the event
     * type; any other payload is transmitted as one element. Sim0MQ and serialization problems are printed, not propagated.
     * @param event Event; the event to forward
     * @throws RemoteException on communication failure
     */
    @Override
    public void notify(final Event event) throws RemoteException
    {
        MetaData metaData = event.getType().getMetaData();
        int payloadSize = metaData.size();
        // Only a TimedEvent carries a time stamp; test the event itself so a plain Event cannot cause a ClassCastException
        int additionalFields = event instanceof TimedEvent ? 1 : 0;
        Object[] result = new Object[additionalFields + payloadSize];
        if (additionalFields > 0)
        {
            result[0] = ((TimedEvent<?>) event).getTimeStamp();
        }
        Object payload = event.getContent();
        if (payload instanceof Object[])
        {
            System.arraycopy((Object[]) payload, 0, result, additionalFields, payloadSize);
        }
        else if (payloadSize > 0)
        {
            // With zero-size meta data there is no slot for a payload (presumably the payload is null then - TODO confirm)
            result[additionalFields] = payload;
        }
        // TODO verify the composition of the result. Problem: no access to the metadata here
        try
        {
            this.returnWrapper.encodeReplyAndTransmit(result);
        }
        catch (Sim0MQException | SerializationException e)
        {
            e.printStackTrace();
        }
    }

}