View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import static org.junit.Assert.assertEquals;
4   import static org.junit.Assert.assertTrue;
5   import static org.junit.Assert.fail;
6   
7   import java.io.IOException;
8   import java.net.URISyntaxException;
9   import java.nio.file.Files;
10  import java.nio.file.Paths;
11  import java.util.ArrayList;
12  import java.util.Collections;
13  import java.util.List;
14  
15  import javax.naming.NamingException;
16  
17  import org.djunits.unit.DurationUnit;
18  import org.djunits.unit.TimeUnit;
19  import org.djunits.value.vdouble.scalar.Duration;
20  import org.djunits.value.vdouble.scalar.Time;
21  import org.djutils.io.URLResource;
22  import org.djutils.serialization.SerializationException;
23  import org.junit.Test;
24  import org.opentrafficsim.draw.core.OTSDrawingException;
25  import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
26  import org.sim0mq.Sim0MQException;
27  import org.sim0mq.message.Sim0MQMessage;
28  import org.zeromq.SocketType;
29  import org.zeromq.ZContext;
30  import org.zeromq.ZMQ;
31  
32  import nl.tudelft.simulation.dsol.SimRuntimeException;
33  import nl.tudelft.simulation.language.DSOLException;
34  
35  /**
36   * Unit tests. This requires half of OTS in the imports because it sets up a simulation and runs that for a couple of seconds.
37   * <p>
38   * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
39   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
40   * <p>
41   * $LastChangedDate: 2020-02-13 11:08:16 +0100 (Thu, 13 Feb 2020) $, @version $Revision: 6383 $, by $Author: pknoppers $,
42   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
43   */
44  public class Sim0MQPublisherTest
45  {
46  
47      /**
48       * Verify an ACK or a NACK message.
49       * @param got byte[]; the not-yet-decoded message that is expected to decode into an ACK or a NACK
50       * @param field5 String; expected content for the message type id field
51       * @param field6 int; expected content for the message id field
52       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
53       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
54       * @throws Sim0MQException when that happens, this test has failed
55       * @throws SerializationException when that happens this test has failed
56       */
57      public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
58              final String expectedDescription) throws Sim0MQException, SerializationException
59      {
60          Object[] objects = Sim0MQMessage.decodeToArray(got);
61          assertEquals("Field 5 of message echos the command", field5, objects[5]);
62          assertEquals("conversation id (field 6) matches", field6, objects[6]);
63          assertEquals("Response has 2 field payload", 10, objects.length);
64          assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
65          assertEquals("First payload field has the expected value", expectedValue, objects[8]);
66          assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
67          if (!((String) objects[9]).startsWith(expectedDescription))
68          {
69              fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
70                      + objects[9] + "\"");
71          }
72      }
73  
74      /**
75       * Wait for an incoming message and verify that it is an ACK or a NACK.
76       * @param receivedMessages List&lt;byte[]&gt;; the list where incoming messages should appear
77       * @param maximumSeconds double; maximum time to wait
78       * @param field5 String; expected content for the message type id field
79       * @param field6 int; expected content for the message id field
80       * @param expectedValue Boolean; expected Boolean value for the first payload field (field 8)
81       * @param expectedDescription String; expected String value for the second and last payload field (field 9)
82       * @throws Sim0MQException when that happens, this test has failed
83       * @throws SerializationException when that happens this test has failed
84       * @throws InterruptedException when that happens this test has failed
85       */
86      public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
87              final int field6, final Boolean expectedValue, final String expectedDescription)
88              throws Sim0MQException, SerializationException, InterruptedException
89      {
90          waitForReceivedMessages(receivedMessages, maximumSeconds);
91          assertEquals("Should have received one message", 1, receivedMessages.size());
92          verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
93          receivedMessages.clear();
94      }
95  
96      /**
97       * Test code.
98       * @throws IOException if that happens uncaught; this test has failed
99       * @throws NamingException if that happens uncaught; this test has failed
100      * @throws SimRuntimeException if that happens uncaught; this test has failed
101      * @throws DSOLException if that happens uncaught; this test has failed
102      * @throws OTSDrawingException if that happens uncaught; this test has failed
103      * @throws SerializationException if that happens uncaught; this test has failed
104      * @throws Sim0MQException if that happens uncaught; this test has failed
105      * @throws InterruptedException if that happens uncaught; this test has failed
106      * @throws URISyntaxException if network.xml file not found
107      */
108     // FIXME: The test has null pointer exceptions... @Test
109     public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
110             OTSDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
111     {
112         ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
113         networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
114 
115         List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
116         List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
117         ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
118         readMessageThread.start();
119 
120         PublisherThread publisherThread = new PublisherThread(zContext);
121         publisherThread.start();
122 
123         ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
124         publisherControlSocket.connect("inproc://publisherControl");
125 
126         int conversationId = 100; // Number the commands starting with something that is very different from 0
127         String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
128         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
129         waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
130 
131         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
132                 ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
133         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
134 
135         badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
136         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
137         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
138                 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
139 
140         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
141                 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
142         waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
143 
144         // Discover what services and commands are available
145         sendCommand(publisherControlSocket,
146                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
147         waitForReceivedMessages(receivedMessages, 1);
148         assertEquals("Should have received one message", 1, receivedMessages.size());
149         Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
150         assertTrue("message decodes into more than 8 fields", commands.length > 8);
151         for (int index = 8; index < commands.length; index++)
152         {
153             receivedMessages.clear();
154             assertTrue("A service is identified by a String", commands[index] instanceof String);
155             String service = (String) commands[index];
156             System.out.println("Service " + service);
157             sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
158                     service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
159             waitForReceivedMessages(receivedMessages, 1.0);
160             if (receivedMessages.size() > 0)
161             {
162                 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
163                 assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
164                 for (int i = 8; i < result.length; i++)
165                 {
166                     String command = (String) result[i];
167                     receivedMessages.clear();
168                     // System.out.println("trying command " + service + "|" + command);
169                     sendCommand(publisherControlSocket,
170                             Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
171                     waitForReceivedMessages(receivedMessages, 1.0);
172                     if (receivedMessages.size() > 0)
173                     {
174                         for (int ii = 8; ii < receivedMessages.size(); ii++)
175                         {
176                             System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
177                         }
178                     }
179                     else
180                     {
181                         System.out.println("Received no reply");
182                     }
183                     System.out.print(""); // Good for a breakpoint
184                 }
185             }
186             else
187             {
188                 System.out.println("Received no reply to GET_COMMANDS request");
189             }
190         }
191         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
192                 ++conversationId, "2", "BAD")); // Too many fields
193         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
194         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
195         sendCommand(publisherControlSocket,
196                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
197         // Too few fields
198         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
199                 "Bad address: Address for GTU Id has wrong length");
200         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
201                 ++conversationId, "NON EXISTING GTU ID")); // GTU id is not (currently) in use
202         waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
203         sendCommand(publisherControlSocket,
204                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
205         sendCommand(publisherControlSocket,
206                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
207         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
208                 ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
209         sendCommand(publisherControlSocket,
210                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
211         int conversationIdForSubscribeToAdd = ++conversationId; // We need that to unsubscribe later
212         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
213                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
214         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
215         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
216                 "Subscription created");
217         int conversationIdForGTU2Move = ++conversationId;
218         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
219                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
220         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
221                 ++conversationId, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
222         sendCommand(publisherControlSocket,
223                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
224         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
225         // unsubscribe from GTU ADD events using the previously saved conversationId
226         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
227                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
228         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
229         waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
230                 "Subscription removed");
231         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
232                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
233         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
234                 ++conversationId, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
235         sendCommand(publisherControlSocket,
236                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
237         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
238                 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
239         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240                 ++conversationId, new Object[] { new Time(60, TimeUnit.BASE_SECOND) }));
241         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
242                 ++conversationId, new Object[] { new Time(70, TimeUnit.BASE_SECOND) }));
243         waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
244         waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
245                 "Simulation is already at end of simulation time");
246 
247         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
248         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
249         System.out.println("Master joining publisher thread (this should block until publisher has died)");
250         publisherThread.join();
251         System.out.println("Master has joined publisher thread");
252         System.out.println("Master interrupts read message thread");
253         readMessageThread.interrupt();
254         System.out.println("Master has interrupted read message thread; joining ...");
255         readMessageThread.join();
256         System.out.println("Master has joined read message thread");
257         System.out.println("Master exits");
258     }
259 
260     /**
261      * Wait for incoming messages up to one that has a specified conversation id, or until 1000 times time out.
262      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
263      * @param maximumSeconds double; how long to wait (in seconds)
264      * @param conversationId int; the conversation id to wait for
265      * @throws Sim0MQException when that happens, this test has failed
266      * @throws SerializationException when that happens, this test has failed
267      * @throws InterruptedException when that happens, this test has failed
268      */
269     public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
270             final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
271     {
272         for (int attempt = 0; attempt < 1000; attempt++)
273         {
274             waitForReceivedMessages(receivedMessages, 1.0);
275             // System.out.println("attempt = " + attempt + " received " + receivedMessages.size() + " message(s)");
276             while (receivedMessages.size() > 1)
277             {
278                 receivedMessages.remove(0);
279             }
280             if (receivedMessages.size() == 1)
281             {
282                 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
283                 if (objects[6].equals(conversationId))
284                 {
285                     break;
286                 }
287                 receivedMessages.remove(0);
288             }
289         }
290 
291     }
292 
293     /**
294      * Sleep up to 1 second waiting for at least one message to be received.
295      * @param receivedMessages List&lt;byte[]&gt;; the list to monitor
296      * @param maximumSeconds double; how long to wait (in seconds)
297      * @throws InterruptedException when that happens uncaught; this test has failed
298      */
299     static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
300             throws InterruptedException
301     {
302         double timeWaited = 0;
303         while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
304         {
305             Thread.sleep(10);
306             timeWaited += 0.01;
307         }
308     }
309 
310     /**
311      * Wrapper for ZMQ.Socket.send that may output some debugging information.
312      * @param socket ZMQ.Socket; the socket to send onto
313      * @param message byte[]; the message to transmit
314      */
315     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
316     {
317         // try
318         // {
319         // Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
320         // System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
321         // }
322         // catch (Sim0MQException | SerializationException e)
323         // {
324         // e.printStackTrace();
325         // }
326         socket.send(message);
327     }
328 
329     /**
330      * Repeatedly try to read all available messages.
331      */
332     static class ReadMessageThread extends Thread
333     {
334         /** The ZContext needed to create the socket. */
335         private final ZContext zContext;
336 
337         /** Storage for the received messages. */
338         private final List<byte[]> storage;
339 
340         /**
341          * Repeatedly read all available messages.
342          * @param zContext ZContext; the ZContext needed to create the read socket
343          * @param storage List&lt;String&gt;; storage for the received messages
344          */
345         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
346         {
347             this.zContext = zContext;
348             this.storage = storage;
349         }
350 
351         @Override
352         public void run()
353         {
354             System.out.println("Read message thread starting up");
355             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
356             socket.setReceiveTimeOut(100);
357             socket.bind("inproc://publisherOutput");
358             while (!Thread.interrupted())
359             {
360                 byte[][] all = readMessages(socket);
361                 for (byte[] one : all)
362                 {
363                     this.storage.add(one);
364                 }
365             }
366             System.out.println("Read message thread exits due to interrupt");
367         }
368 
369     }
370 
371     /**
372      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
373      * @param socket ZMQ.Socket; the socket
374      * @return byte[][]; the read messages
375      */
376     public static byte[][] readMessages(final ZMQ.Socket socket)
377     {
378         List<byte[]> resultList = new ArrayList<>();
379         while (true)
380         {
381             byte[] message = socket.recv();
382             StringBuilder output = new StringBuilder();
383             if (null != message)
384             {
385                 output.append("Master received " + message.length + " byte message: ");
386                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
387                 try
388                 {
389                     Object[] fields = Sim0MQMessage.decodeToArray(message);
390                     for (Object field : fields)
391                     {
392                         output.append("/" + field);
393                     }
394                     output.append("/");
395                 }
396                 catch (Sim0MQException | SerializationException e)
397                 {
398                     e.printStackTrace();
399                 }
400                 System.out.println(output);
401                 resultList.add(message);
402             }
403             else
404             {
405                 if (resultList.size() > 0)
406                 {
407                     System.out.println(
408                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
409                 }
410                 break;
411             }
412         }
413         return resultList.toArray(new byte[resultList.size()][]);
414     }
415 
416     /**
417      * Thread that runs a PublisherExperiment.
418      */
419     static class PublisherThread extends Thread
420     {
421         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
422         private final ZContext zContext;
423 
424         /**
425          * Construct a new PublisherThread.
426          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
427          */
428         PublisherThread(final ZContext zContext)
429         {
430             this.zContext = zContext;
431         }
432 
433         /**
434          * Construct a new PublisherThread.
435          */
436         PublisherThread()
437         {
438             this.zContext = new ZContext(5);
439         }
440 
441         @Override
442         public void run()
443         {
444             new Sim0MQPublisher(this.zContext, "publisherControl", "publisherOutput");
445             System.out.println("Publisher thread exits");
446         }
447 
448     }
449 
450     /** The test network. */
451     private static String networkXML;
452     
453 }