View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import static org.junit.jupiter.api.Assertions.assertEquals;
4   import static org.junit.jupiter.api.Assertions.assertTrue;
5   import static org.junit.jupiter.api.Assertions.fail;
6   
7   import java.io.IOException;
8   import java.net.URISyntaxException;
9   import java.nio.file.Files;
10  import java.nio.file.Paths;
11  import java.util.ArrayList;
12  import java.util.Collections;
13  import java.util.List;
14  
15  import javax.naming.NamingException;
16  
17  import org.djunits.unit.DurationUnit;
18  import org.djunits.value.vdouble.scalar.Duration;
19  import org.djutils.io.ResourceResolver;
20  import org.djutils.serialization.SerializationException;
21  import org.junit.jupiter.api.Test;
22  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
23  import org.sim0mq.Sim0MQException;
24  import org.sim0mq.message.Sim0MQMessage;
25  import org.zeromq.SocketType;
26  import org.zeromq.ZContext;
27  import org.zeromq.ZMQ;
28  
29  import nl.tudelft.simulation.dsol.SimRuntimeException;
30  import nl.tudelft.simulation.language.DsolException;
31  
32  /**
33   * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
34   * <p>
35   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
36   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
37   * </p>
38   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
39   */
40  public final class Sim0MQPublisherTest
41  {
42  
    /**
     * Private constructor with an empty body. No code in this class calls it.
     */
    private Sim0MQPublisherTest()
    {
        // do not instantiate test class
    }
48  
    /**
     * Placeholder test with an empty body. This test exists such that the project has a test, and no errors occur during
     * build.
     */
    @Test
    public void dummyTest()
    {
        // intentionally empty
    }
57  
58      /**
59       * Verify an ACK or a NACK message.
60       * @param got the not-yet-decoded message that is expected to decode into an ACK or a NACK
61       * @param field5 expected content for the message type id field
62       * @param field6 expected content for the message id field
63       * @param expectedValue expected Boolean value for the first payload field (field 8)
64       * @param expectedDescription expected String value for the second and last payload field (field 9)
65       * @throws Sim0MQException when that happens, this test has failed
66       * @throws SerializationException when that happens this test has failed
67       */
68      public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
69              final String expectedDescription) throws Sim0MQException, SerializationException
70      {
71          Object[] objects = Sim0MQMessage.decodeToArray(got);
72          assertEquals(field5, objects[5], "Field 5 of message echos the command");
73          assertEquals(field6, objects[6], "conversation id (field 6) matches");
74          assertEquals(10, objects.length, "Response has 2 field payload");
75          assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
76          assertEquals(expectedValue, objects[8], "First payload field has the expected value");
77          assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
78          if (!((String) objects[9]).startsWith(expectedDescription))
79          {
80              fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
81                      + objects[9] + "\"");
82          }
83      }
84  
85      /**
86       * Wait for an incoming message and verify that it is an ACK or a NACK.
87       * @param receivedMessages the list where incoming messages should appear
88       * @param maximumSeconds maximum time to wait
89       * @param field5 expected content for the message type id field
90       * @param field6 expected content for the message id field
91       * @param expectedValue expected Boolean value for the first payload field (field 8)
92       * @param expectedDescription expected String value for the second and last payload field (field 9)
93       * @throws Sim0MQException when that happens, this test has failed
94       * @throws SerializationException when that happens this test has failed
95       * @throws InterruptedException when that happens this test has failed
96       */
97      public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
98              final int field6, final Boolean expectedValue, final String expectedDescription)
99              throws Sim0MQException, SerializationException, InterruptedException
100     {
101         waitForReceivedMessages(receivedMessages, maximumSeconds);
102         assertEquals(1, receivedMessages.size(), "Should have received one message");
103         verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
104         receivedMessages.clear();
105     }
106 
107     /**
108      * Test code.
109      * @throws IOException if that happens uncaught; this test has failed
110      * @throws NamingException if that happens uncaught; this test has failed
111      * @throws SimRuntimeException if that happens uncaught; this test has failed
112      * @throws DsolException if that happens uncaught; this test has failed
113      * @throws SerializationException if that happens uncaught; this test has failed
114      * @throws Sim0MQException if that happens uncaught; this test has failed
115      * @throws InterruptedException if that happens uncaught; this test has failed
116      * @throws URISyntaxException if network.xml file not found
117      */
118     // FIXME: The test has null pointer exceptions... @Test
119     public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException, Sim0MQException,
120             SerializationException, InterruptedException, URISyntaxException
121     {
122         ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
123         networkXML = new String(Files.readAllBytes(Paths.get(ResourceResolver.resolve("/resources/network.xml").asUri())));
124 
125         List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
126         List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
127         ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
128         readMessageThread.start();
129 
130         PublisherThread publisherThread = new PublisherThread(zContext);
131         publisherThread.start();
132 
133         ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
134         publisherControlSocket.connect("inproc://publisherControl");
135 
136         int conversationId = 100; // Number the commands starting with something that is very different from 0
137         String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
138         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
139         waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
140 
141         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
142                 ++conversationId, new Object[] {Duration.ofSI(10.0)}));
143         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
144 
145         badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
146         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
147         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
148                 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
149 
150         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
151                 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
152         waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
153 
154         // Discover what services and commands are available
155         sendCommand(publisherControlSocket,
156                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
157         waitForReceivedMessages(receivedMessages, 1);
158         assertEquals(1, receivedMessages.size(), "Should have received one message");
159         Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
160         assertTrue(commands.length > 8, "message decodes into more than 8 fields");
161         for (int index = 8; index < commands.length; index++)
162         {
163             receivedMessages.clear();
164             assertTrue(commands[index] instanceof String, "A service is identified by a String");
165             String service = (String) commands[index];
166             System.out.println("Service " + service);
167             sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
168                     service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
169             waitForReceivedMessages(receivedMessages, 1.0);
170             if (receivedMessages.size() > 0)
171             {
172                 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
173                 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
174                 for (int i = 8; i < result.length; i++)
175                 {
176                     String command = (String) result[i];
177                     receivedMessages.clear();
178                     // System.out.println("trying command " + service + "|" + command);
179                     sendCommand(publisherControlSocket,
180                             Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
181                     waitForReceivedMessages(receivedMessages, 1.0);
182                     if (receivedMessages.size() > 0)
183                     {
184                         for (int ii = 8; ii < receivedMessages.size(); ii++)
185                         {
186                             System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
187                         }
188                     }
189                     else
190                     {
191                         System.out.println("Received no reply");
192                     }
193                     System.out.print(""); // Good for a breakpoint
194                 }
195             }
196             else
197             {
198                 System.out.println("Received no reply to GET_COMMANDS request");
199             }
200         }
201         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
202                 ++conversationId, "2", "BAD")); // Too many fields
203         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
204         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
205         sendCommand(publisherControlSocket,
206                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
207         // Too few fields
208         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
209                 "Bad address: Address for GTU Id has wrong length");
210         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
211                 ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
212         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
213         sendCommand(publisherControlSocket,
214                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
215         sendCommand(publisherControlSocket,
216                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
217         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
218                 ++conversationId, new Object[] {Duration.ofSI(10.0)}));
219         sendCommand(publisherControlSocket,
220                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
221         int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
222         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
223                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
224         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
225         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
226                 "Subscription created");
227         int conversationIdForGTU2Move = ++conversationId;
228         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
229                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
230         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
231                 ++conversationId, new Object[] {Duration.ofSI(20.0)}));
232         sendCommand(publisherControlSocket,
233                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
234         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
235         // unsubscribe from GTU ADD events using the previously saved conversationId
236         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
237                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
238         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
239         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
240                 "Subscription removed");
241         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
242                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
243         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
244                 ++conversationId, new Object[] {Duration.ofSI(30.0)}));
245         sendCommand(publisherControlSocket,
246                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
247         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
248                 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
249         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
250                 ++conversationId, new Object[] {Duration.ofSI(60.0)}));
251         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
252                 ++conversationId, new Object[] {Duration.ofSI(70.0)}));
253         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
254         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
255                 "Simulation is already at end of simulation time");
256 
257         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
258         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
259         System.out.println("Master joining publisher thread (this should block until publisher has died)");
260         publisherThread.join();
261         System.out.println("Master has joined publisher thread");
262         System.out.println("Master interrupts read message thread");
263         readMessageThread.interrupt();
264         System.out.println("Master has interrupted read message thread; joining ...");
265         readMessageThread.join();
266         System.out.println("Master has joined read message thread");
267         System.out.println("Master exits");
268     }
269 
270     /**
271      * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
272      * @param receivedMessages the list to monitor
273      * @param maximumSeconds how long to wait (in seconds)
274      * @param conversationId the conversation id to wait for
275      * @throws Sim0MQException when that happens, this test has failed
276      * @throws SerializationException when that happens, this test has failed
277      * @throws InterruptedException when that happens, this test has failed
278      */
279     public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
280             final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
281     {
282         for (int attempt = 0; attempt < 1000; attempt++)
283         {
284             waitForReceivedMessages(receivedMessages, 1.0);
285             // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
286             while (receivedMessages.size() > 1)
287             {
288                 receivedMessages.remove(0);
289             }
290             if (receivedMessages.size() == 1)
291             {
292                 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
293                 if (objects[6].equals(conversationId))
294                 {
295                     break;
296                 }
297                 receivedMessages.remove(0);
298             }
299         }
300 
301     }
302 
303     /**
304      * Sleep up to 1 second waiting for at least one message to be received.
305      * @param receivedMessages the list to monitor
306      * @param maximumSeconds how long to wait (in seconds)
307      * @throws InterruptedException when that happens uncaught; this test has failed
308      */
309     static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
310             throws InterruptedException
311     {
312         double timeWaited = 0;
313         while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
314         {
315             Thread.sleep(10);
316             timeWaited += 0.01;
317         }
318     }
319 
320     /**
321      * Wrapper for ZMQ.Socket.send that may output some debugging information.
322      * @param socket ZMQ.Socket; the socket to send onto
323      * @param message the message to transmit
324      */
325     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
326     {
327         // try
328         // {
329         // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
330         // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
331         // }
332         // catch (Sim0MQException | SerializationException e)
333         // {
334         // e.printStackTrace();
335         // }
336         socket.send(message);
337     }
338 
339     /**
340      * Repeatedly try to read all available messages.
341      */
342     static class ReadMessageThread extends Thread
343     {
344         /** The ZContext needed to create the socket. */
345         private final ZContext zContext;
346 
347         /** Storage for the received messages. */
348         private final List<byte[]> storage;
349 
350         /**
351          * Repeatedly read all available messages.
352          * @param zContext the ZContext needed to create the read socket
353          * @param storage storage for the received messages
354          */
355         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
356         {
357             this.zContext = zContext;
358             this.storage = storage;
359         }
360 
361         @Override
362         public void run()
363         {
364             System.out.println("Read message thread starting up");
365             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
366             socket.setReceiveTimeOut(100);
367             socket.bind("inproc://publisherOutput");
368             while (!Thread.interrupted())
369             {
370                 byte[][] all = readMessages(socket);
371                 for (byte[] one : all)
372                 {
373                     this.storage.add(one);
374                 }
375             }
376             System.out.println("Read message thread exits due to interrupt");
377         }
378 
379     }
380 
381     /**
382      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
383      * @param socket ZMQ.Socket; the socket
384      * @return the read messages
385      */
386     public static byte[][] readMessages(final ZMQ.Socket socket)
387     {
388         List<byte[]> resultList = new ArrayList<>();
389         while (true)
390         {
391             byte[] message = socket.recv();
392             StringBuilder output = new StringBuilder();
393             if (null != message)
394             {
395                 output.append("Master received " + message.length + " byte message: ");
396                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
397                 try
398                 {
399                     Object[] fields = Sim0MQMessage.decodeToArray(message);
400                     for (Object field : fields)
401                     {
402                         output.append("/" + field);
403                     }
404                     output.append("/");
405                 }
406                 catch (Sim0MQException | SerializationException e)
407                 {
408                     e.printStackTrace();
409                 }
410                 System.out.println(output);
411                 resultList.add(message);
412             }
413             else
414             {
415                 if (resultList.size() > 0)
416                 {
417                     System.out.println(
418                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
419                 }
420                 break;
421             }
422         }
423         return resultList.toArray(new byte[resultList.size()][]);
424     }
425 
426     /**
427      * Thread that runs a PublisherExperiment.
428      */
429     static class PublisherThread extends Thread
430     {
431         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
432         private final ZContext zContext;
433 
434         /**
435          * Construct a new PublisherThread.
436          * @param zContext needed to construct the PublisherExperimentUsingSockets
437          */
438         PublisherThread(final ZContext zContext)
439         {
440             this.zContext = zContext;
441         }
442 
443         /**
444          * Construct a new PublisherThread.
445          */
446         PublisherThread()
447         {
448             this.zContext = new ZContext(5);
449         }
450 
451         @Override
452         public void run()
453         {
454             new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
455             System.out.println("Publisher thread exits");
456         }
457 
458     }
459 
    /** The test network; the contents of the network.xml resource, assigned at the start of testSim0MQPublisher. */
    private static String networkXML;
462 
463 }