View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.naming.NamingException;

import org.djunits.unit.DurationUnit;
import org.djunits.unit.TimeUnit;
import org.djunits.value.vdouble.scalar.Duration;
import org.djunits.value.vdouble.scalar.Time;
import org.djutils.io.URLResource;
import org.djutils.serialization.SerializationException;
import org.opentrafficsim.draw.OtsDrawingException;
import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.language.DsolException;
33  
34  /**
35   * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
36   * <p>
37   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
38   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
39   * </p>
40   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
41   */
42  public class Sim0MQPublisherTest
43  {
44  
45      /**
46       * Verify an ACK or a NACK message.
47       * @param got byte[]; the not-yet-decoded message that is expected to decode into an ACK or a NACK
48       * @param field5 String; expected content for the message type id field
49       * @param field6 int; expected content for the message id field
50       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
51       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
52       * @throws Sim0MQException when that happens, this test has failed
53       * @throws SerializationException when that happens this test has failed
54       */
55      public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
56              final String expectedDescription) throws Sim0MQException, SerializationException
57      {
58          Object[] objects = Sim0MQMessage.decodeToArray(got);
59          assertEquals(field5, objects[5], "Field 5 of message echos the command");
60          assertEquals(field6, objects[6], "conversation id (field 6) matches");
61          assertEquals(10, objects.length, "Response has 2 field payload");
62          assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
63          assertEquals(expectedValue, objects[8], "First payload field has the expected value");
64          assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
65          if (!((String) objects[9]).startsWith(expectedDescription))
66          {
67              fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
68                      + objects[9] + "\"");
69          }
70      }
71  
72      /**
73       * Wait for an incoming message and verify that it is an ACK or a NACK.
74       * @param receivedMessages List&lt;byte[]&gt;; the list where incoming messages should appear
75       * @param maximumSeconds double; maximum time to wait
76       * @param field5 String; expected content for the message type id field
77       * @param field6 int; expected content for the message id field
78       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
79       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
80       * @throws Sim0MQException when that happens, this test has failed
81       * @throws SerializationException when that happens this test has failed
82       * @throws InterruptedException when that happens this test has failed
83       */
84      public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
85              final int field6, final Boolean expectedValue, final String expectedDescription)
86              throws Sim0MQException, SerializationException, InterruptedException
87      {
88          waitForReceivedMessages(receivedMessages, maximumSeconds);
89          assertEquals(1, receivedMessages.size(), "Should have received one message");
90          verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
91          receivedMessages.clear();
92      }
93  
94      /**
95       * Test code.
96       * @throws IOException if that happens uncaught; this test has failed
97       * @throws NamingException if that happens uncaught; this test has failed
98       * @throws SimRuntimeException if that happens uncaught; this test has failed
99       * @throws DsolException if that happens uncaught; this test has failed
100      * @throws OtsDrawingException if that happens uncaught; this test has failed
101      * @throws SerializationException if that happens uncaught; this test has failed
102      * @throws Sim0MQException if that happens uncaught; this test has failed
103      * @throws InterruptedException if that happens uncaught; this test has failed
104      * @throws URISyntaxException if network.xml file not found
105      */
106     // FIXME: The test has null pointer exceptions... @Test
107     public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException,
108             OtsDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
109     {
110         ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
111         networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
112 
113         List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
114         List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
115         ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
116         readMessageThread.start();
117 
118         PublisherThread publisherThread = new PublisherThread(zContext);
119         publisherThread.start();
120 
121         ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
122         publisherControlSocket.connect("inproc://publisherControl");
123 
124         int conversationId = 100; // Number the commands starting with something that is very different from 0
125         String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
126         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
127         waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
128 
129         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
130                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
131         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
132 
133         badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
134         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
135         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
136                 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
137 
138         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
139                 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
140         waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
141 
142         // Discover what services and commands are available
143         sendCommand(publisherControlSocket,
144                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
145         waitForReceivedMessages(receivedMessages, 1);
146         assertEquals(1, receivedMessages.size(), "Should have received one message");
147         Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
148         assertTrue(commands.length > 8, "message decodes into more than 8 fields");
149         for (int index = 8; index < commands.length; index++)
150         {
151             receivedMessages.clear();
152             assertTrue(commands[index] instanceof String, "A service is identified by a String");
153             String service = (String) commands[index];
154             System.out.println("Service " + service);
155             sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
156                     service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
157             waitForReceivedMessages(receivedMessages, 1.0);
158             if (receivedMessages.size() > 0)
159             {
160                 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
161                 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
162                 for (int i = 8; i < result.length; i++)
163                 {
164                     String command = (String) result[i];
165                     receivedMessages.clear();
166                     // System.out.println("trying command " + service + "|" + command);
167                     sendCommand(publisherControlSocket,
168                             Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
169                     waitForReceivedMessages(receivedMessages, 1.0);
170                     if (receivedMessages.size() > 0)
171                     {
172                         for (int ii = 8; ii < receivedMessages.size(); ii++)
173                         {
174                             System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
175                         }
176                     }
177                     else
178                     {
179                         System.out.println("Received no reply");
180                     }
181                     System.out.print(""); // Good for a breakpoint
182                 }
183             }
184             else
185             {
186                 System.out.println("Received no reply to GET_COMMANDS request");
187             }
188         }
189         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
190                 ++conversationId, "2", "BAD")); // Too many fields
191         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
192         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
193         sendCommand(publisherControlSocket,
194                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
195         // Too few fields
196         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
197                 "Bad address: Address for GTU Id has wrong length");
198         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
199                 ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
200         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
201         sendCommand(publisherControlSocket,
202                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
203         sendCommand(publisherControlSocket,
204                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
205         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
206                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
207         sendCommand(publisherControlSocket,
208                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
209         int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
210         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
211                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
212         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
213         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
214                 "Subscription created");
215         int conversationIdForGTU2Move = ++conversationId;
216         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
217                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
218         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
219                 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
220         sendCommand(publisherControlSocket,
221                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
222         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
223         // unsubscribe from GTU ADD events using the previously saved conversationId
224         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
225                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
226         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
227         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
228                 "Subscription removed");
229         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
230                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
231         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
232                 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
233         sendCommand(publisherControlSocket,
234                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
235         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
236                 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
237         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
238                 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
239         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240                 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
241         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
242         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
243                 "Simulation is already at end of simulation time");
244 
245         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
246         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
247         System.out.println("Master joining publisher thread (this should block until publisher has died)");
248         publisherThread.join();
249         System.out.println("Master has joined publisher thread");
250         System.out.println("Master interrupts read message thread");
251         readMessageThread.interrupt();
252         System.out.println("Master has interrupted read message thread; joining ...");
253         readMessageThread.join();
254         System.out.println("Master has joined read message thread");
255         System.out.println("Master exits");
256     }
257 
258     /**
259      * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
260      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
261      * @param maximumSeconds double; how long to wait (in seconds)
262      * @param conversationId int; the conversation id to wait for
263      * @throws Sim0MQException when that happens, this test has failed
264      * @throws SerializationException when that happens, this test has failed
265      * @throws InterruptedException when that happens, this test has failed
266      */
267     public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
268             final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
269     {
270         for (int attempt = 0; attempt < 1000; attempt++)
271         {
272             waitForReceivedMessages(receivedMessages, 1.0);
273             // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
274             while (receivedMessages.size() > 1)
275             {
276                 receivedMessages.remove(0);
277             }
278             if (receivedMessages.size() == 1)
279             {
280                 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
281                 if (objects[6].equals(conversationId))
282                 {
283                     break;
284                 }
285                 receivedMessages.remove(0);
286             }
287         }
288 
289     }
290 
291     /**
292      * Sleep up to 1 second waiting for at least one message to be received.
293      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
294      * @param maximumSeconds double; how long to wait (in seconds)
295      * @throws InterruptedException when that happens uncaught; this test has failed
296      */
297     static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
298             throws InterruptedException
299     {
300         double timeWaited = 0;
301         while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
302         {
303             Thread.sleep(10);
304             timeWaited += 0.01;
305         }
306     }
307 
308     /**
309      * Wrapper for ZMQ.Socket.send that may output some debugging information.
310      * @param socket ZMQ.Socket; the socket to send onto
311      * @param message byte[]; the message to transmit
312      */
313     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
314     {
315         // try
316         // {
317         // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
318         // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
319         // }
320         // catch (Sim0MQException | SerializationException e)
321         // {
322         // e.printStackTrace();
323         // }
324         socket.send(message);
325     }
326 
327     /**
328      * Repeatedly try to read all available messages.
329      */
330     static class ReadMessageThread extends Thread
331     {
332         /** The ZContext needed to create the socket. */
333         private final ZContext zContext;
334 
335         /** Storage for the received messages. */
336         private final List<byte[]> storage;
337 
338         /**
339          * Repeatedly read all available messages.
340          * @param zContext ZContext; the ZContext needed to create the read socket
341          * @param storage List&lt;String&gt;; storage for the received messages
342          */
343         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
344         {
345             this.zContext = zContext;
346             this.storage = storage;
347         }
348 
349         @Override
350         public void run()
351         {
352             System.out.println("Read message thread starting up");
353             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
354             socket.setReceiveTimeOut(100);
355             socket.bind("inproc://publisherOutput");
356             while (!Thread.interrupted())
357             {
358                 byte[][] all = readMessages(socket);
359                 for (byte[] one : all)
360                 {
361                     this.storage.add(one);
362                 }
363             }
364             System.out.println("Read message thread exits due to interrupt");
365         }
366 
367     }
368 
369     /**
370      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
371      * @param socket ZMQ.Socket; the socket
372      * @return byte[][]; the read messages
373      */
374     public static byte[][] readMessages(final ZMQ.Socket socket)
375     {
376         List<byte[]> resultList = new ArrayList<>();
377         while (true)
378         {
379             byte[] message = socket.recv();
380             StringBuilder output = new StringBuilder();
381             if (null != message)
382             {
383                 output.append("Master received " + message.length + " byte message: ");
384                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
385                 try
386                 {
387                     Object[] fields = Sim0MQMessage.decodeToArray(message);
388                     for (Object field : fields)
389                     {
390                         output.append("/" + field);
391                     }
392                     output.append("/");
393                 }
394                 catch (Sim0MQException | SerializationException e)
395                 {
396                     e.printStackTrace();
397                 }
398                 System.out.println(output);
399                 resultList.add(message);
400             }
401             else
402             {
403                 if (resultList.size() > 0)
404                 {
405                     System.out.println(
406                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
407                 }
408                 break;
409             }
410         }
411         return resultList.toArray(new byte[resultList.size()][]);
412     }
413 
414     /**
415      * Thread that runs a PublisherExperiment.
416      */
417     static class PublisherThread extends Thread
418     {
419         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
420         private final ZContext zContext;
421 
422         /**
423          * Construct a new PublisherThread.
424          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
425          */
426         PublisherThread(final ZContext zContext)
427         {
428             this.zContext = zContext;
429         }
430 
431         /**
432          * Construct a new PublisherThread.
433          */
434         PublisherThread()
435         {
436             this.zContext = new ZContext(5);
437         }
438 
439         @Override
440         public void run()
441         {
442             new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
443             System.out.println("Publisher thread exits");
444         }
445 
446     }
447 
448     /** The test network. */
449     private static String networkXML;
450 
451 }