View Javadoc
1   package org.opentrafficsim.sim0mq.publisher;
2   
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import org.djunits.Throw;
import org.djutils.serialization.SerializationException;
import org.djutils.serialization.SerializationRuntimeException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
15  
16  /**
17   * Container for all data needed to reply (once, or multiple times) to a Sim0MQ request.
18   * <p>
19   * Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
20   * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
21   * </p>
22   * @author <a href="https://www.tudelft.nl/averbraeck">Alexander Verbraeck</a>
23   * @author <a href="https://www.tudelft.nl/pknoppers">Peter Knoppers</a>
24   * @author <a href="https://www.transport.citg.tudelft.nl">Wouter Schakel</a>
25   */
26  public class ReturnWrapperImpl implements ReturnWrapper
27  {
28      /** The ZContext needed to create the return socket(s). */
29      private final ZContext zContext;
30  
31      /** Federation id. */
32      private final Object federationId;
33  
34      /** Sender id (to be used as return address). */
35      private final Object returnAddress;
36  
37      /** Our id (to be used as sender address in replies). */
38      private final Object ourAddress;
39  
40      /** Message type id used by the sender; re-used in the reply. */
41      private final Object messageTypeId;
42  
43      /** Message id used by the sender; re-used in the reply; post-incremented by one if it is an Integer. */
44      private final Object messageId;
45  
46      /** Number of replies sent. SHOULD NOT BE USED IN equals or hashCode! */
47      private int replyCount = 0;
48  
49      /**
50       * Construct a new ReturnWrapper.
51       * @param zContext ZContext; the ZContext needed to create sockets for returned messages
52       * @param receivedMessage byte[]; the received message from which the reply envelope will be derived
53       * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
54       * @param packetsSent AtomicInteger; counter for returned messages
55       * @throws SerializationException when the received message has an incorrect envelope
56       * @throws Sim0MQException when the received message cannot be decoded
57       */
58      ReturnWrapperImpl(final ZContext zContext, final byte[] receivedMessage, final Map<Long, Socket> socketMap,
59              final AtomicInteger packetsSent) throws Sim0MQException, SerializationException
60      {
61          this(zContext, Sim0MQMessage.decode(receivedMessage).createObjectArray(), socketMap);
62      }
63  
64      /**
65       * Construct a new ReturnWrapper.
66       * @param zContext ZContext; the ZContext needed to create sockets for returned messages
67       * @param decodedReceivedMessage Object[]; decoded Sim0MQ message
68       * @param socketMap Map&lt;Long, ZMQ.Socket&gt;; cache of created sockets for returned messages
69       */
70      public ReturnWrapperImpl(final ZContext zContext, final Object[] decodedReceivedMessage, final Map<Long, Socket> socketMap)
71      {
72          Throw.whenNull(zContext, "zContext may not be null");
73          Throw.whenNull(socketMap, "socket map may not be null");
74          this.zContext = zContext;
75          this.socketMap = socketMap;
76          Throw.when(decodedReceivedMessage.length < 8, SerializationRuntimeException.class,
77                  "Received message is too short (minumum number of elements is 8; got %d", decodedReceivedMessage.length);
78          this.federationId = decodedReceivedMessage[2];
79          this.returnAddress = decodedReceivedMessage[3];
80          this.ourAddress = decodedReceivedMessage[4];
81          this.messageTypeId = decodedReceivedMessage[5];
82          this.messageId = decodedReceivedMessage[6];
83      }
84  
85      /** In memory sockets to talk to the multiplexer. */
86      private final Map<Long, ZMQ.Socket> socketMap;
87  
88      /**
89       * Central portal to send a message to the master.
90       * @param data byte[]; the data to send
91       */
92      public synchronized void sendToMaster(final byte[] data)
93      {
94          Long threadId = Thread.currentThread().getId();
95          ZMQ.Socket socket = this.socketMap.get(threadId);
96          while (null == socket)
97          {
98              // System.out.println("socket map is " + this.socketMap);
99              System.out.println("Creating new internal socket for thread " + threadId + " (map currently contains "
100                     + this.socketMap.size() + " entries)");
101             socket = this.zContext.createSocket(SocketType.PUSH);
102             socket.setHWM(100000);
103             socket.connect("inproc://simulationEvents");
104             this.socketMap.put(threadId, socket);
105             // System.out.println("Socket created; map now contains " + this.socketMap.size() + " entries");
106         }
107         // System.out.println("pre send");
108         socket.send(data, 0);
109         // System.out.println("post send");
110     }
111 
112     /** {@inheritDoc} */
113     @Override
114     public void encodeReplyAndTransmit(final Boolean ackNack, final Object[] payload)
115             throws Sim0MQException, SerializationException
116     {
117         Throw.whenNull(payload, "payload may not be null (but it can be an emty Object array)");
118         Object fixedMessageTypeId = this.messageTypeId;
119         Object[] fixedPayload = payload;
120         if (null != ackNack)
121         {
122             fixedPayload = new Object[payload.length + 1];
123             fixedPayload[0] = ackNack;
124             for (int index = 0; index < payload.length; index++)
125             {
126                 fixedPayload[index + 1] = payload[index];
127             }
128         }
129         byte[] result = Sim0MQMessage.encodeUTF8(true, this.federationId, this.ourAddress, this.returnAddress,
130                 fixedMessageTypeId, this.messageId, fixedPayload);
131         sendToMaster(result);
132         // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, result));
133     }
134 
135     /** {@inheritDoc} */
136     @Override
137     public String toString()
138     {
139         return "ReturnWrapper [federationId=" + this.federationId + ", returnAddress=" + this.returnAddress + ", ourAddress=" + this.ourAddress
140                 + ", messageTypeId=" + this.messageTypeId + ", messageId=" + this.messageId + ", replyCount=" + this.replyCount + "]";
141     }
142 
143     /** {@inheritDoc} */
144     @Override
145     public int hashCode()
146     {
147         final int prime = 31;
148         int result = 1;
149         result = prime * result + ((this.federationId == null) ? 0 : this.federationId.hashCode());
150         result = prime * result + ((this.messageId == null) ? 0 : this.messageId.hashCode());
151         result = prime * result + ((this.messageTypeId == null) ? 0 : this.messageTypeId.hashCode());
152         result = prime * result + ((this.ourAddress == null) ? 0 : this.ourAddress.hashCode());
153         result = prime * result + ((this.returnAddress == null) ? 0 : this.returnAddress.hashCode());
154         return result; // replyCount is NOT used!
155     }
156 
157     /** {@inheritDoc} */
158     @Override
159     @SuppressWarnings("checkstyle:needbraces")
160     public boolean equals(final Object obj)
161     {
162         if (this == obj)
163             return true;
164         if (obj == null)
165             return false;
166         if (getClass() != obj.getClass())
167             return false;
168         ReturnWrapperImpl./org/opentrafficsim/sim0mq/publisher/ReturnWrapperImpl.html#ReturnWrapperImpl">ReturnWrapperImpl other = (ReturnWrapperImpl) obj;
169         if (this.federationId == null)
170         {
171             if (other.federationId != null)
172                 return false;
173         }
174         else if (!this.federationId.equals(other.federationId))
175             return false;
176         if (this.messageId == null)
177         {
178             if (other.messageId != null)
179                 return false;
180         }
181         else if (!this.messageId.equals(other.messageId))
182             return false;
183         if (this.messageTypeId == null)
184         {
185             if (other.messageTypeId != null)
186                 return false;
187         }
188         else if (!this.messageTypeId.equals(other.messageTypeId))
189             return false;
190         if (this.ourAddress == null)
191         {
192             if (other.ourAddress != null)
193                 return false;
194         }
195         else if (!this.ourAddress.equals(other.ourAddress))
196             return false;
197         if (this.returnAddress == null)
198         {
199             if (other.returnAddress != null)
200                 return false;
201         }
202         else if (!this.returnAddress.equals(other.returnAddress))
203             return false;
204         return true; // replyCount is NOT used
205     }
206 }