1 package org.opentrafficsim.sim0mq.swing;
2
3 import static org.junit.jupiter.api.Assertions.assertEquals;
4 import static org.junit.jupiter.api.Assertions.assertTrue;
5 import static org.junit.jupiter.api.Assertions.fail;
6
7 import java.io.IOException;
8 import java.net.URISyntaxException;
9 import java.nio.file.Files;
10 import java.nio.file.Paths;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14
15 import javax.naming.NamingException;
16
17 import org.djunits.unit.DurationUnit;
18 import org.djunits.value.vdouble.scalar.Duration;
19 import org.djutils.io.ResourceResolver;
20 import org.djutils.serialization.SerializationException;
21 import org.junit.jupiter.api.Test;
22 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
23 import org.sim0mq.Sim0MQException;
24 import org.sim0mq.message.Sim0MQMessage;
25 import org.zeromq.SocketType;
26 import org.zeromq.ZContext;
27 import org.zeromq.ZMQ;
28
29 import nl.tudelft.simulation.dsol.SimRuntimeException;
30 import nl.tudelft.simulation.language.DsolException;
31
32
33
34
35
36
37
38
39
40 public final class Sim0MQPublisherTest
41 {
42
43
44 private Sim0MQPublisherTest()
45 {
46
47 }
48
49
50
51
52 @Test
53 public void dummyTest()
54 {
55
56 }
57
58
59
60
61
62
63
64
65
66
67
68 public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
69 final String expectedDescription) throws Sim0MQException, SerializationException
70 {
71 Object[] objects = Sim0MQMessage.decodeToArray(got);
72 assertEquals(field5, objects[5], "Field 5 of message echos the command");
73 assertEquals(field6, objects[6], "conversation id (field 6) matches");
74 assertEquals(10, objects.length, "Response has 2 field payload");
75 assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
76 assertEquals(expectedValue, objects[8], "First payload field has the expected value");
77 assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
78 if (!((String) objects[9]).startsWith(expectedDescription))
79 {
80 fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
81 + objects[9] + "\"");
82 }
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
98 final int field6, final Boolean expectedValue, final String expectedDescription)
99 throws Sim0MQException, SerializationException, InterruptedException
100 {
101 waitForReceivedMessages(receivedMessages, maximumSeconds);
102 assertEquals(1, receivedMessages.size(), "Should have received one message");
103 verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
104 receivedMessages.clear();
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119 public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException, Sim0MQException,
120 SerializationException, InterruptedException, URISyntaxException
121 {
122 ZContext zContext = new ZContext(5);
123 networkXML = new String(Files.readAllBytes(Paths.get(ResourceResolver.resolve("/resources/network.xml").asUri())));
124
125 List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
126 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
127 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
128 readMessageThread.start();
129
130 PublisherThread publisherThread = new PublisherThread(zContext);
131 publisherThread.start();
132
133 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
134 publisherControlSocket.connect("inproc://publisherControl");
135
136 int conversationId = 100;
137 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
138 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
139 waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
140
141 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
142 ++conversationId, new Object[] {Duration.ofSI(10.0)}));
143 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
144
145 badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
146 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
147 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
148 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
149
150 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
151 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
152 waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
153
154
155 sendCommand(publisherControlSocket,
156 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
157 waitForReceivedMessages(receivedMessages, 1);
158 assertEquals(1, receivedMessages.size(), "Should have received one message");
159 Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
160 assertTrue(commands.length > 8, "message decodes into more than 8 fields");
161 for (int index = 8; index < commands.length; index++)
162 {
163 receivedMessages.clear();
164 assertTrue(commands[index] instanceof String, "A service is identified by a String");
165 String service = (String) commands[index];
166 System.out.println("Service " + service);
167 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
168 service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
169 waitForReceivedMessages(receivedMessages, 1.0);
170 if (receivedMessages.size() > 0)
171 {
172 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
173 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
174 for (int i = 8; i < result.length; i++)
175 {
176 String command = (String) result[i];
177 receivedMessages.clear();
178
179 sendCommand(publisherControlSocket,
180 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
181 waitForReceivedMessages(receivedMessages, 1.0);
182 if (receivedMessages.size() > 0)
183 {
184 for (int ii = 8; ii < receivedMessages.size(); ii++)
185 {
186 System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
187 }
188 }
189 else
190 {
191 System.out.println("Received no reply");
192 }
193 System.out.print("");
194 }
195 }
196 else
197 {
198 System.out.println("Received no reply to GET_COMMANDS request");
199 }
200 }
201 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
202 ++conversationId, "2", "BAD"));
203 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
204 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
205 sendCommand(publisherControlSocket,
206 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
207
208 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
209 "Bad address: Address for GTU Id has wrong length");
210 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
211 ++conversationId, "NON EXISTING GTU ID"));
212 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
213 sendCommand(publisherControlSocket,
214 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
215 sendCommand(publisherControlSocket,
216 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
217 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
218 ++conversationId, new Object[] {Duration.ofSI(10.0)}));
219 sendCommand(publisherControlSocket,
220 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
221 int conversationIdForSubscribeToAdd = ++conversationId;
222 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
223 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
224 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
225 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
226 "Subscription created");
227 int conversationIdForGTU2Move = ++conversationId;
228 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
229 conversationIdForGTU2Move, "2"));
230 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
231 ++conversationId, new Object[] {Duration.ofSI(20.0)}));
232 sendCommand(publisherControlSocket,
233 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
234 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
235
236 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
237 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
238 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
239 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
240 "Subscription removed");
241 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
242 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
243 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
244 ++conversationId, new Object[] {Duration.ofSI(30.0)}));
245 sendCommand(publisherControlSocket,
246 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
247 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
248 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
249 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
250 ++conversationId, new Object[] {Duration.ofSI(60.0)}));
251 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
252 ++conversationId, new Object[] {Duration.ofSI(70.0)}));
253 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
254 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
255 "Simulation is already at end of simulation time");
256
257 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
258 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
259 System.out.println("Master joining publisher thread (this should block until publisher has died)");
260 publisherThread.join();
261 System.out.println("Master has joined publisher thread");
262 System.out.println("Master interrupts read message thread");
263 readMessageThread.interrupt();
264 System.out.println("Master has interrupted read message thread; joining ...");
265 readMessageThread.join();
266 System.out.println("Master has joined read message thread");
267 System.out.println("Master exits");
268 }
269
270
271
272
273
274
275
276
277
278
279 public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
280 final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
281 {
282 for (int attempt = 0; attempt < 1000; attempt++)
283 {
284 waitForReceivedMessages(receivedMessages, 1.0);
285
286 while (receivedMessages.size() > 1)
287 {
288 receivedMessages.remove(0);
289 }
290 if (receivedMessages.size() == 1)
291 {
292 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
293 if (objects[6].equals(conversationId))
294 {
295 break;
296 }
297 receivedMessages.remove(0);
298 }
299 }
300
301 }
302
303
304
305
306
307
308
309 static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
310 throws InterruptedException
311 {
312 double timeWaited = 0;
313 while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
314 {
315 Thread.sleep(10);
316 timeWaited += 0.01;
317 }
318 }
319
320
321
322
323
324
325 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
326 {
327
328
329
330
331
332
333
334
335
336 socket.send(message);
337 }
338
339
340
341
342 static class ReadMessageThread extends Thread
343 {
344
345 private final ZContext zContext;
346
347
348 private final List<byte[]> storage;
349
350
351
352
353
354
355 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
356 {
357 this.zContext = zContext;
358 this.storage = storage;
359 }
360
361 @Override
362 public void run()
363 {
364 System.out.println("Read message thread starting up");
365 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
366 socket.setReceiveTimeOut(100);
367 socket.bind("inproc://publisherOutput");
368 while (!Thread.interrupted())
369 {
370 byte[][] all = readMessages(socket);
371 for (byte[] one : all)
372 {
373 this.storage.add(one);
374 }
375 }
376 System.out.println("Read message thread exits due to interrupt");
377 }
378
379 }
380
381
382
383
384
385
386 public static byte[][] readMessages(final ZMQ.Socket socket)
387 {
388 List<byte[]> resultList = new ArrayList<>();
389 while (true)
390 {
391 byte[] message = socket.recv();
392 StringBuilder output = new StringBuilder();
393 if (null != message)
394 {
395 output.append("Master received " + message.length + " byte message: ");
396
397 try
398 {
399 Object[] fields = Sim0MQMessage.decodeToArray(message);
400 for (Object field : fields)
401 {
402 output.append("/" + field);
403 }
404 output.append("/");
405 }
406 catch (Sim0MQException | SerializationException e)
407 {
408 e.printStackTrace();
409 }
410 System.out.println(output);
411 resultList.add(message);
412 }
413 else
414 {
415 if (resultList.size() > 0)
416 {
417 System.out.println(
418 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
419 }
420 break;
421 }
422 }
423 return resultList.toArray(new byte[resultList.size()][]);
424 }
425
426
427
428
429 static class PublisherThread extends Thread
430 {
431
432 private final ZContext zContext;
433
434
435
436
437
438 PublisherThread(final ZContext zContext)
439 {
440 this.zContext = zContext;
441 }
442
443
444
445
446 PublisherThread()
447 {
448 this.zContext = new ZContext(5);
449 }
450
451 @Override
452 public void run()
453 {
454 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
455 System.out.println("Publisher thread exits");
456 }
457
458 }
459
460
461 private static String networkXML;
462
463 }