1 package org.opentrafficsim.sim0mq.swing;
2
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.naming.NamingException;

import org.djunits.unit.DurationUnit;
import org.djunits.unit.TimeUnit;
import org.djunits.value.vdouble.scalar.Duration;
import org.djunits.value.vdouble.scalar.Time;
import org.djutils.io.URLResource;
import org.djutils.serialization.SerializationException;
import org.opentrafficsim.draw.OtsDrawingException;
import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import nl.tudelft.simulation.dsol.SimRuntimeException;
import nl.tudelft.simulation.language.DsolException;
33
34
35
36
37
38
39
40
41
42 public class Sim0MQPublisherTest
43 {
44
45
46
47
48
49
50
51
52
53
54
55 public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
56 final String expectedDescription) throws Sim0MQException, SerializationException
57 {
58 Object[] objects = Sim0MQMessage.decodeToArray(got);
59 assertEquals(field5, objects[5], "Field 5 of message echos the command");
60 assertEquals(field6, objects[6], "conversation id (field 6) matches");
61 assertEquals(10, objects.length, "Response has 2 field payload");
62 assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
63 assertEquals(expectedValue, objects[8], "First payload field has the expected value");
64 assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
65 if (!((String) objects[9]).startsWith(expectedDescription))
66 {
67 fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
68 + objects[9] + "\"");
69 }
70 }
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
85 final int field6, final Boolean expectedValue, final String expectedDescription)
86 throws Sim0MQException, SerializationException, InterruptedException
87 {
88 waitForReceivedMessages(receivedMessages, maximumSeconds);
89 assertEquals(1, receivedMessages.size(), "Should have received one message");
90 verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
91 receivedMessages.clear();
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException,
108 OtsDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
109 {
110 ZContext zContext = new ZContext(5);
111 networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
112
113 List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
114 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
115 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
116 readMessageThread.start();
117
118 PublisherThread publisherThread = new PublisherThread(zContext);
119 publisherThread.start();
120
121 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
122 publisherControlSocket.connect("inproc://publisherControl");
123
124 int conversationId = 100;
125 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
126 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
127 waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
128
129 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
130 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
131 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
132
133 badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
134 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
135 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
136 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
137
138 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
139 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
140 waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
141
142
143 sendCommand(publisherControlSocket,
144 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
145 waitForReceivedMessages(receivedMessages, 1);
146 assertEquals(1, receivedMessages.size(), "Should have received one message");
147 Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
148 assertTrue(commands.length > 8, "message decodes into more than 8 fields");
149 for (int index = 8; index < commands.length; index++)
150 {
151 receivedMessages.clear();
152 assertTrue(commands[index] instanceof String, "A service is identified by a String");
153 String service = (String) commands[index];
154 System.out.println("Service " + service);
155 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
156 service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
157 waitForReceivedMessages(receivedMessages, 1.0);
158 if (receivedMessages.size() > 0)
159 {
160 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
161 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
162 for (int i = 8; i < result.length; i++)
163 {
164 String command = (String) result[i];
165 receivedMessages.clear();
166
167 sendCommand(publisherControlSocket,
168 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
169 waitForReceivedMessages(receivedMessages, 1.0);
170 if (receivedMessages.size() > 0)
171 {
172 for (int ii = 8; ii < receivedMessages.size(); ii++)
173 {
174 System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
175 }
176 }
177 else
178 {
179 System.out.println("Received no reply");
180 }
181 System.out.print("");
182 }
183 }
184 else
185 {
186 System.out.println("Received no reply to GET_COMMANDS request");
187 }
188 }
189 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
190 ++conversationId, "2", "BAD"));
191 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
192 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
193 sendCommand(publisherControlSocket,
194 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
195
196 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
197 "Bad address: Address for GTU Id has wrong length");
198 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
199 ++conversationId, "NON EXISTING GTU ID"));
200 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
201 sendCommand(publisherControlSocket,
202 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
203 sendCommand(publisherControlSocket,
204 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
205 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
206 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
207 sendCommand(publisherControlSocket,
208 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
209 int conversationIdForSubscribeToAdd = ++conversationId;
210 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
211 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
212 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
213 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
214 "Subscription created");
215 int conversationIdForGTU2Move = ++conversationId;
216 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
217 conversationIdForGTU2Move, "2"));
218 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
219 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
220 sendCommand(publisherControlSocket,
221 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
222 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
223
224 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
225 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
226 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
227 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
228 "Subscription removed");
229 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
230 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
231 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
232 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
233 sendCommand(publisherControlSocket,
234 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
235 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
236 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
237 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
238 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
239 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
241 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
242 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
243 "Simulation is already at end of simulation time");
244
245 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
246 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
247 System.out.println("Master joining publisher thread (this should block until publisher has died)");
248 publisherThread.join();
249 System.out.println("Master has joined publisher thread");
250 System.out.println("Master interrupts read message thread");
251 readMessageThread.interrupt();
252 System.out.println("Master has interrupted read message thread; joining ...");
253 readMessageThread.join();
254 System.out.println("Master has joined read message thread");
255 System.out.println("Master exits");
256 }
257
258
259
260
261
262
263
264
265
266
267 public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
268 final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
269 {
270 for (int attempt = 0; attempt < 1000; attempt++)
271 {
272 waitForReceivedMessages(receivedMessages, 1.0);
273
274 while (receivedMessages.size() > 1)
275 {
276 receivedMessages.remove(0);
277 }
278 if (receivedMessages.size() == 1)
279 {
280 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
281 if (objects[6].equals(conversationId))
282 {
283 break;
284 }
285 receivedMessages.remove(0);
286 }
287 }
288
289 }
290
291
292
293
294
295
296
297 static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
298 throws InterruptedException
299 {
300 double timeWaited = 0;
301 while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
302 {
303 Thread.sleep(10);
304 timeWaited += 0.01;
305 }
306 }
307
308
309
310
311
312
313 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
314 {
315
316
317
318
319
320
321
322
323
324 socket.send(message);
325 }
326
327
328
329
330 static class ReadMessageThread extends Thread
331 {
332
333 private final ZContext zContext;
334
335
336 private final List<byte[]> storage;
337
338
339
340
341
342
343 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
344 {
345 this.zContext = zContext;
346 this.storage = storage;
347 }
348
349 @Override
350 public void run()
351 {
352 System.out.println("Read message thread starting up");
353 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
354 socket.setReceiveTimeOut(100);
355 socket.bind("inproc://publisherOutput");
356 while (!Thread.interrupted())
357 {
358 byte[][] all = readMessages(socket);
359 for (byte[] one : all)
360 {
361 this.storage.add(one);
362 }
363 }
364 System.out.println("Read message thread exits due to interrupt");
365 }
366
367 }
368
369
370
371
372
373
374 public static byte[][] readMessages(final ZMQ.Socket socket)
375 {
376 List<byte[]> resultList = new ArrayList<>();
377 while (true)
378 {
379 byte[] message = socket.recv();
380 StringBuilder output = new StringBuilder();
381 if (null != message)
382 {
383 output.append("Master received " + message.length + " byte message: ");
384
385 try
386 {
387 Object[] fields = Sim0MQMessage.decodeToArray(message);
388 for (Object field : fields)
389 {
390 output.append("/" + field);
391 }
392 output.append("/");
393 }
394 catch (Sim0MQException | SerializationException e)
395 {
396 e.printStackTrace();
397 }
398 System.out.println(output);
399 resultList.add(message);
400 }
401 else
402 {
403 if (resultList.size() > 0)
404 {
405 System.out.println(
406 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
407 }
408 break;
409 }
410 }
411 return resultList.toArray(new byte[resultList.size()][]);
412 }
413
414
415
416
417 static class PublisherThread extends Thread
418 {
419
420 private final ZContext zContext;
421
422
423
424
425
426 PublisherThread(final ZContext zContext)
427 {
428 this.zContext = zContext;
429 }
430
431
432
433
434 PublisherThread()
435 {
436 this.zContext = new ZContext(5);
437 }
438
439 @Override
440 public void run()
441 {
442 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
443 System.out.println("Publisher thread exits");
444 }
445
446 }
447
448
/** Text of the <code>/resources/network.xml</code> resource; assigned in testSim0MQPublisher and sent with NEWSIMULATION. */
private static String networkXML;
450
451 }