View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import static org.junit.jupiter.api.Assertions.assertEquals;
4   import static org.junit.jupiter.api.Assertions.assertTrue;
5   import static org.junit.jupiter.api.Assertions.fail;
6   
7   import java.io.IOException;
8   import java.net.URISyntaxException;
9   import java.nio.file.Files;
10  import java.nio.file.Paths;
11  import java.util.ArrayList;
12  import java.util.Collections;
13  import java.util.List;
14  
15  import javax.naming.NamingException;
16  
17  import org.djunits.unit.DurationUnit;
18  import org.djunits.unit.TimeUnit;
19  import org.djunits.value.vdouble.scalar.Duration;
20  import org.djunits.value.vdouble.scalar.Time;
21  import org.djutils.io.URLResource;
22  import org.djutils.serialization.SerializationException;
23  import org.junit.jupiter.api.Test;
24  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
25  import org.sim0mq.Sim0MQException;
26  import org.sim0mq.message.Sim0MQMessage;
27  import org.zeromq.SocketType;
28  import org.zeromq.ZContext;
29  import org.zeromq.ZMQ;
30  
31  import nl.tudelft.simulation.dsol.SimRuntimeException;
32  import nl.tudelft.simulation.language.DsolException;
33  
34  /**
35   * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
36   * <p>
37   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
38   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
39   * </p>
40   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
41   */
42  public class Sim0MQPublisherTest
43  {
44      
45      /**
46       * This test exists such that the project has a test, and no errors occur during build.
47       */
48      @Test
49      public void dummyTest()
50      {
51          //
52      }
53  
54      /**
55       * Verify an ACK or a NACK message.
56       * @param got the not-yet-decoded message that is expected to decode into an ACK or a NACK
57       * @param field5 expected content for the message type id field
58       * @param field6 expected content for the message id field
59       * @param expectedValue expected Boolean value for the first payload field (field 8)
60       * @param expectedDescription expected String value for the second and last payload field (field 9)
61       * @throws Sim0MQException when that happens, this test has failed
62       * @throws SerializationException when that happens this test has failed
63       */
64      public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
65              final String expectedDescription) throws Sim0MQException, SerializationException
66      {
67          Object[] objects = Sim0MQMessage.decodeToArray(got);
68          assertEquals(field5, objects[5], "Field 5 of message echos the command");
69          assertEquals(field6, objects[6], "conversation id (field 6) matches");
70          assertEquals(10, objects.length, "Response has 2 field payload");
71          assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
72          assertEquals(expectedValue, objects[8], "First payload field has the expected value");
73          assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
74          if (!((String) objects[9]).startsWith(expectedDescription))
75          {
76              fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
77                      + objects[9] + "\"");
78          }
79      }
80  
81      /**
82       * Wait for an incoming message and verify that it is an ACK or a NACK.
83       * @param receivedMessages the list where incoming messages should appear
84       * @param maximumSeconds maximum time to wait
85       * @param field5 expected content for the message type id field
86       * @param field6 expected content for the message id field
87       * @param expectedValue expected Boolean value for the first payload field (field 8)
88       * @param expectedDescription expected String value for the second and last payload field (field 9)
89       * @throws Sim0MQException when that happens, this test has failed
90       * @throws SerializationException when that happens this test has failed
91       * @throws InterruptedException when that happens this test has failed
92       */
93      public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
94              final int field6, final Boolean expectedValue, final String expectedDescription)
95              throws Sim0MQException, SerializationException, InterruptedException
96      {
97          waitForReceivedMessages(receivedMessages, maximumSeconds);
98          assertEquals(1, receivedMessages.size(), "Should have received one message");
99          verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
100         receivedMessages.clear();
101     }
102 
103     /**
104      * Test code.
105      * @throws IOException if that happens uncaught; this test has failed
106      * @throws NamingException if that happens uncaught; this test has failed
107      * @throws SimRuntimeException if that happens uncaught; this test has failed
108      * @throws DsolException if that happens uncaught; this test has failed
109      * @throws SerializationException if that happens uncaught; this test has failed
110      * @throws Sim0MQException if that happens uncaught; this test has failed
111      * @throws InterruptedException if that happens uncaught; this test has failed
112      * @throws URISyntaxException if network.xml file not found
113      */
114     // FIXME: The test has null pointer exceptions... @Test
115     public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException, Sim0MQException,
116             SerializationException, InterruptedException, URISyntaxException
117     {
118         ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
119         networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
120 
121         List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
122         List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
123         ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
124         readMessageThread.start();
125 
126         PublisherThread publisherThread = new PublisherThread(zContext);
127         publisherThread.start();
128 
129         ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
130         publisherControlSocket.connect("inproc://publisherControl");
131 
132         int conversationId = 100; // Number the commands starting with something that is very different from 0
133         String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
134         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
135         waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
136 
137         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
138                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
139         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
140 
141         badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
142         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
143         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
144                 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
145 
146         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
147                 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
148         waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
149 
150         // Discover what services and commands are available
151         sendCommand(publisherControlSocket,
152                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
153         waitForReceivedMessages(receivedMessages, 1);
154         assertEquals(1, receivedMessages.size(), "Should have received one message");
155         Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
156         assertTrue(commands.length > 8, "message decodes into more than 8 fields");
157         for (int index = 8; index < commands.length; index++)
158         {
159             receivedMessages.clear();
160             assertTrue(commands[index] instanceof String, "A service is identified by a String");
161             String service = (String) commands[index];
162             System.out.println("Service " + service);
163             sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
164                     service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
165             waitForReceivedMessages(receivedMessages, 1.0);
166             if (receivedMessages.size() > 0)
167             {
168                 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
169                 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
170                 for (int i = 8; i < result.length; i++)
171                 {
172                     String command = (String) result[i];
173                     receivedMessages.clear();
174                     // System.out.println("trying command " + service + "|" + command);
175                     sendCommand(publisherControlSocket,
176                             Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
177                     waitForReceivedMessages(receivedMessages, 1.0);
178                     if (receivedMessages.size() > 0)
179                     {
180                         for (int ii = 8; ii < receivedMessages.size(); ii++)
181                         {
182                             System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
183                         }
184                     }
185                     else
186                     {
187                         System.out.println("Received no reply");
188                     }
189                     System.out.print(""); // Good for a breakpoint
190                 }
191             }
192             else
193             {
194                 System.out.println("Received no reply to GET_COMMANDS request");
195             }
196         }
197         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
198                 ++conversationId, "2", "BAD")); // Too many fields
199         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
200         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
201         sendCommand(publisherControlSocket,
202                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
203         // Too few fields
204         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
205                 "Bad address: Address for GTU Id has wrong length");
206         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
207                 ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
208         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
209         sendCommand(publisherControlSocket,
210                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
211         sendCommand(publisherControlSocket,
212                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
213         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
214                 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
215         sendCommand(publisherControlSocket,
216                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
217         int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
218         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
219                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
220         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
221         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
222                 "Subscription created");
223         int conversationIdForGTU2Move = ++conversationId;
224         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
225                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
226         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
227                 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
228         sendCommand(publisherControlSocket,
229                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
230         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
231         // unsubscribe from GTU ADD events using the previously saved conversationId
232         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
233                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
234         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
235         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
236                 "Subscription removed");
237         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
238                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
239         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240                 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
241         sendCommand(publisherControlSocket,
242                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
243         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
244                 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
245         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
246                 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
247         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
248                 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
249         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
250         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
251                 "Simulation is already at end of simulation time");
252 
253         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
254         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
255         System.out.println("Master joining publisher thread (this should block until publisher has died)");
256         publisherThread.join();
257         System.out.println("Master has joined publisher thread");
258         System.out.println("Master interrupts read message thread");
259         readMessageThread.interrupt();
260         System.out.println("Master has interrupted read message thread; joining ...");
261         readMessageThread.join();
262         System.out.println("Master has joined read message thread");
263         System.out.println("Master exits");
264     }
265 
266     /**
267      * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
268      * @param receivedMessages the list to monitor
269      * @param maximumSeconds how long to wait (in seconds)
270      * @param conversationId the conversation id to wait for
271      * @throws Sim0MQException when that happens, this test has failed
272      * @throws SerializationException when that happens, this test has failed
273      * @throws InterruptedException when that happens, this test has failed
274      */
275     public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
276             final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
277     {
278         for (int attempt = 0; attempt < 1000; attempt++)
279         {
280             waitForReceivedMessages(receivedMessages, 1.0);
281             // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
282             while (receivedMessages.size() > 1)
283             {
284                 receivedMessages.remove(0);
285             }
286             if (receivedMessages.size() == 1)
287             {
288                 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
289                 if (objects[6].equals(conversationId))
290                 {
291                     break;
292                 }
293                 receivedMessages.remove(0);
294             }
295         }
296 
297     }
298 
299     /**
300      * Sleep up to 1 second waiting for at least one message to be received.
301      * @param receivedMessages the list to monitor
302      * @param maximumSeconds how long to wait (in seconds)
303      * @throws InterruptedException when that happens uncaught; this test has failed
304      */
305     static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
306             throws InterruptedException
307     {
308         double timeWaited = 0;
309         while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
310         {
311             Thread.sleep(10);
312             timeWaited += 0.01;
313         }
314     }
315 
316     /**
317      * Wrapper for ZMQ.Socket.send that may output some debugging information.
318      * @param socket ZMQ.Socket; the socket to send onto
319      * @param message the message to transmit
320      */
321     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
322     {
323         // try
324         // {
325         // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
326         // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
327         // }
328         // catch (Sim0MQException | SerializationException e)
329         // {
330         // e.printStackTrace();
331         // }
332         socket.send(message);
333     }
334 
335     /**
336      * Repeatedly try to read all available messages.
337      */
338     static class ReadMessageThread extends Thread
339     {
340         /** The ZContext needed to create the socket. */
341         private final ZContext zContext;
342 
343         /** Storage for the received messages. */
344         private final List<byte[]> storage;
345 
346         /**
347          * Repeatedly read all available messages.
348          * @param zContext the ZContext needed to create the read socket
349          * @param storage storage for the received messages
350          */
351         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
352         {
353             this.zContext = zContext;
354             this.storage = storage;
355         }
356 
357         @Override
358         public void run()
359         {
360             System.out.println("Read message thread starting up");
361             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
362             socket.setReceiveTimeOut(100);
363             socket.bind("inproc://publisherOutput");
364             while (!Thread.interrupted())
365             {
366                 byte[][] all = readMessages(socket);
367                 for (byte[] one : all)
368                 {
369                     this.storage.add(one);
370                 }
371             }
372             System.out.println("Read message thread exits due to interrupt");
373         }
374 
375     }
376 
377     /**
378      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
379      * @param socket ZMQ.Socket; the socket
380      * @return the read messages
381      */
382     public static byte[][] readMessages(final ZMQ.Socket socket)
383     {
384         List<byte[]> resultList = new ArrayList<>();
385         while (true)
386         {
387             byte[] message = socket.recv();
388             StringBuilder output = new StringBuilder();
389             if (null != message)
390             {
391                 output.append("Master received " + message.length + " byte message: ");
392                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
393                 try
394                 {
395                     Object[] fields = Sim0MQMessage.decodeToArray(message);
396                     for (Object field : fields)
397                     {
398                         output.append("/" + field);
399                     }
400                     output.append("/");
401                 }
402                 catch (Sim0MQException | SerializationException e)
403                 {
404                     e.printStackTrace();
405                 }
406                 System.out.println(output);
407                 resultList.add(message);
408             }
409             else
410             {
411                 if (resultList.size() > 0)
412                 {
413                     System.out.println(
414                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
415                 }
416                 break;
417             }
418         }
419         return resultList.toArray(new byte[resultList.size()][]);
420     }
421 
422     /**
423      * Thread that runs a PublisherExperiment.
424      */
425     static class PublisherThread extends Thread
426     {
427         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
428         private final ZContext zContext;
429 
430         /**
431          * Construct a new PublisherThread.
432          * @param zContext needed to construct the PublisherExperimentUsingSockets
433          */
434         PublisherThread(final ZContext zContext)
435         {
436             this.zContext = zContext;
437         }
438 
439         /**
440          * Construct a new PublisherThread.
441          */
442         PublisherThread()
443         {
444             this.zContext = new ZContext(5);
445         }
446 
447         @Override
448         public void run()
449         {
450             new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
451             System.out.println("Publisher thread exits");
452         }
453 
454     }
455 
456     /** The test network. */
457     private static String networkXML;
458 
459 }