View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertTrue;
5   import static org.junit.Assert.fail;
6   
7   import java.io.IOException;
8   import java.net.URISyntaxException;
9   import java.nio.file.Files;
10  import java.nio.file.Paths;
11  import java.util.ArrayList;
12  import java.util.Collections;
13  import java.util.List;
14  
15  import javax.naming.NamingException;
16  
17  import org.djunits.unit.DurationUnit;
18  import org.djunits.unit.TimeUnit;
19  import org.djunits.value.vdouble.scalar.Duration;
20  import org.djunits.value.vdouble.scalar.Time;
21  import org.djutils.io.URLResource;
22  import org.djutils.serialization.SerializationException;
23  import org.junit.Test;
24  import org.opentrafficsim.draw.core.OtsDrawingException;
25  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
26  import org.sim0mq.Sim0MQException;
27  import org.sim0mq.message.Sim0MQMessage;
28  import org.zeromq.SocketType;
29  import org.zeromq.ZContext;
30  import org.zeromq.ZMQ;
31  
32  import nl.tudelft.simulation.dsol.SimRuntimeException;
33  import nl.tudelft.simulation.language.DSOLException;
34  
35  /**
36   * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
37   * <p>
38   * Copyright (c) 2020-2023 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
39   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
40   * </p>
41   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
42   */
43  public class Sim0MQPublisherTest
44  {
45  
46      /**
47       * Verify an ACK or a NACK message.
48       * @param got byte[]; the not-yet-decoded message that is expected to decode into an ACK or a NACK
49       * @param field5 String; expected content for the message type id field
50       * @param field6 int; expected content for the message id field
51       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
52       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
53       * @throws Sim0MQException when that happens, this test has failed
54       * @throws SerializationException when that happens this test has failed
55       */
56      public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
57              final String expectedDescription) throws Sim0MQException, SerializationException
58      {
59          Object[] objects = Sim0MQMessage.decodeToArray(got);
60          assertEquals("Field 5 of message echos the command", field5, objects[5]);
61          assertEquals("conversation id (field 6) matches", field6, objects[6]);
62          assertEquals("Response has 2 field payload", 10, objects.length);
63          assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
64          assertEquals("First payload field has the expected value", expectedValue, objects[8]);
65          assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
66          if (!((String) objects[9]).startsWith(expectedDescription))
67          {
68              fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
69                      + objects[9] + "\"");
70          }
71      }
72  
73      /**
74       * Wait for an incoming message and verify that it is an ACK or a NACK.
75       * @param receivedMessages List&lt;byte[]&gt;; the list where incoming messages should appear
76       * @param maximumSeconds double; maximum time to wait
77       * @param field5 String; expected content for the message type id field
78       * @param field6 int; expected content for the message id field
79       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
80       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
81       * @throws Sim0MQException when that happens, this test has failed
82       * @throws SerializationException when that happens this test has failed
83       * @throws InterruptedException when that happens this test has failed
84       */
85      public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
86              final int field6, final Boolean expectedValue, final String expectedDescription)
87              throws Sim0MQException, SerializationException, InterruptedException
88      {
89          waitForReceivedMessages(receivedMessages, maximumSeconds);
90          assertEquals("Should have received one message", 1, receivedMessages.size());
91          verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
92          receivedMessages.clear();
93      }
94  
95      /**
96       * Test code.
97       * @throws IOException if that happens uncaught; this test has failed
98       * @throws NamingException if that happens uncaught; this test has failed
99       * @throws SimRuntimeException if that happens uncaught; this test has failed
100      * @throws DSOLException if that happens uncaught; this test has failed
101      * @throws OtsDrawingException if that happens uncaught; this test has failed
102      * @throws SerializationException if that happens uncaught; this test has failed
103      * @throws Sim0MQException if that happens uncaught; this test has failed
104      * @throws InterruptedException if that happens uncaught; this test has failed
105      * @throws URISyntaxException if network.xml file not found
106      */
107     // FIXME: The test has null pointer exceptions... @Test
108     public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
109             OtsDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
110     {
111         ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
112         networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
113 
114         List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
115         List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
116         ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
117         readMessageThread.start();
118 
119         PublisherThread publisherThread = new PublisherThread(zContext);
120         publisherThread.start();
121 
122         ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
123         publisherControlSocket.connect("inproc://publisherControl");
124 
125         int conversationId = 100; // Number the commands starting with something that is very different from 0
126         String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
127         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
128         waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
129 
130         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
131                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
132         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
133 
134         badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
135         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
136         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
137                 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
138 
139         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
140                 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
141         waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
142 
143         // Discover what services and commands are available
144         sendCommand(publisherControlSocket,
145                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
146         waitForReceivedMessages(receivedMessages, 1);
147         assertEquals("Should have received one message", 1, receivedMessages.size());
148         Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
149         assertTrue("message decodes into more than 8 fields", commands.length > 8);
150         for (int index = 8; index < commands.length; index++)
151         {
152             receivedMessages.clear();
153             assertTrue("A service is identified by a String", commands[index] instanceof String);
154             String service = (String) commands[index];
155             System.out.println("Service " + service);
156             sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
157                     service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
158             waitForReceivedMessages(receivedMessages, 1.0);
159             if (receivedMessages.size() > 0)
160             {
161                 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
162                 assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
163                 for (int i = 8; i < result.length; i++)
164                 {
165                     String command = (String) result[i];
166                     receivedMessages.clear();
167                     // System.out.println("trying command " + service + "|" + command);
168                     sendCommand(publisherControlSocket,
169                             Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
170                     waitForReceivedMessages(receivedMessages, 1.0);
171                     if (receivedMessages.size() > 0)
172                     {
173                         for (int ii = 8; ii < receivedMessages.size(); ii++)
174                         {
175                             System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
176                         }
177                     }
178                     else
179                     {
180                         System.out.println("Received no reply");
181                     }
182                     System.out.print(""); // Good for a breakpoint
183                 }
184             }
185             else
186             {
187                 System.out.println("Received no reply to GET_COMMANDS request");
188             }
189         }
190         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
191                 ++conversationId, "2", "BAD")); // Too many fields
192         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
193         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
194         sendCommand(publisherControlSocket,
195                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
196         // Too few fields
197         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
198                 "Bad address: Address for GTU Id has wrong length");
199         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
200                 ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
201         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
202         sendCommand(publisherControlSocket,
203                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
204         sendCommand(publisherControlSocket,
205                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
206         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
207                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
208         sendCommand(publisherControlSocket,
209                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
210         int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
211         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
212                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
213         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
214         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
215                 "Subscription created");
216         int conversationIdForGTU2Move = ++conversationId;
217         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
218                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
219         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
220                 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
221         sendCommand(publisherControlSocket,
222                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
223         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
224         // unsubscribe from GTU ADD events using the previously saved conversationId
225         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
226                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
227         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
228         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
229                 "Subscription removed");
230         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
231                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
232         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
233                 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
234         sendCommand(publisherControlSocket,
235                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
236         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
237                 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
238         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
239                 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
240         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
241                 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
242         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
243         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
244                 "Simulation is already at end of simulation time");
245 
246         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
247         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
248         System.out.println("Master joining publisher thread (this should block until publisher has died)");
249         publisherThread.join();
250         System.out.println("Master has joined publisher thread");
251         System.out.println("Master interrupts read message thread");
252         readMessageThread.interrupt();
253         System.out.println("Master has interrupted read message thread; joining ...");
254         readMessageThread.join();
255         System.out.println("Master has joined read message thread");
256         System.out.println("Master exits");
257     }
258 
259     /**
260      * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
261      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
262      * @param maximumSeconds double; how long to wait (in seconds)
263      * @param conversationId int; the conversation id to wait for
264      * @throws Sim0MQException when that happens, this test has failed
265      * @throws SerializationException when that happens, this test has failed
266      * @throws InterruptedException when that happens, this test has failed
267      */
268     public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
269             final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
270     {
271         for (int attempt = 0; attempt < 1000; attempt++)
272         {
273             waitForReceivedMessages(receivedMessages, 1.0);
274             // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
275             while (receivedMessages.size() > 1)
276             {
277                 receivedMessages.remove(0);
278             }
279             if (receivedMessages.size() == 1)
280             {
281                 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
282                 if (objects[6].equals(conversationId))
283                 {
284                     break;
285                 }
286                 receivedMessages.remove(0);
287             }
288         }
289 
290     }
291 
292     /**
293      * Sleep up to 1 second waiting for at least one message to be received.
294      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
295      * @param maximumSeconds double; how long to wait (in seconds)
296      * @throws InterruptedException when that happens uncaught; this test has failed
297      */
298     static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
299             throws InterruptedException
300     {
301         double timeWaited = 0;
302         while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
303         {
304             Thread.sleep(10);
305             timeWaited += 0.01;
306         }
307     }
308 
309     /**
310      * Wrapper for ZMQ.Socket.send that may output some debugging information.
311      * @param socket ZMQ.Socket; the socket to send onto
312      * @param message byte[]; the message to transmit
313      */
314     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
315     {
316         // try
317         // {
318         // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
319         // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
320         // }
321         // catch (Sim0MQException | SerializationException e)
322         // {
323         // e.printStackTrace();
324         // }
325         socket.send(message);
326     }
327 
328     /**
329      * Repeatedly try to read all available messages.
330      */
331     static class ReadMessageThread extends Thread
332     {
333         /** The ZContext needed to create the socket. */
334         private final ZContext zContext;
335 
336         /** Storage for the received messages. */
337         private final List<byte[]> storage;
338 
339         /**
340          * Repeatedly read all available messages.
341          * @param zContext ZContext; the ZContext needed to create the read socket
342          * @param storage List&lt;String&gt;; storage for the received messages
343          */
344         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
345         {
346             this.zContext = zContext;
347             this.storage = storage;
348         }
349 
350         @Override
351         public void run()
352         {
353             System.out.println("Read message thread starting up");
354             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
355             socket.setReceiveTimeOut(100);
356             socket.bind("inproc://publisherOutput");
357             while (!Thread.interrupted())
358             {
359                 byte[][] all = readMessages(socket);
360                 for (byte[] one : all)
361                 {
362                     this.storage.add(one);
363                 }
364             }
365             System.out.println("Read message thread exits due to interrupt");
366         }
367 
368     }
369 
370     /**
371      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
372      * @param socket ZMQ.Socket; the socket
373      * @return byte[][]; the read messages
374      */
375     public static byte[][] readMessages(final ZMQ.Socket socket)
376     {
377         List<byte[]> resultList = new ArrayList<>();
378         while (true)
379         {
380             byte[] message = socket.recv();
381             StringBuilder output = new StringBuilder();
382             if (null != message)
383             {
384                 output.append("Master received " + message.length + " byte message: ");
385                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
386                 try
387                 {
388                     Object[] fields = Sim0MQMessage.decodeToArray(message);
389                     for (Object field : fields)
390                     {
391                         output.append("/" + field);
392                     }
393                     output.append("/");
394                 }
395                 catch (Sim0MQException | SerializationException e)
396                 {
397                     e.printStackTrace();
398                 }
399                 System.out.println(output);
400                 resultList.add(message);
401             }
402             else
403             {
404                 if (resultList.size() > 0)
405                 {
406                     System.out.println(
407                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
408                 }
409                 break;
410             }
411         }
412         return resultList.toArray(new byte[resultList.size()][]);
413     }
414 
415     /**
416      * Thread that runs a PublisherExperiment.
417      */
418     static class PublisherThread extends Thread
419     {
420         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
421         private final ZContext zContext;
422 
423         /**
424          * Construct a new PublisherThread.
425          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
426          */
427         PublisherThread(final ZContext zContext)
428         {
429             this.zContext = zContext;
430         }
431 
432         /**
433          * Construct a new PublisherThread.
434          */
435         PublisherThread()
436         {
437             this.zContext = new ZContext(5);
438         }
439 
440         @Override
441         public void run()
442         {
443             new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
444             System.out.println("Publisher thread exits");
445         }
446 
447     }
448 
449     /** The test network. */
450     private static String networkXML;
451 
452 }