1 package org.opentrafficsim.sim0mq.swing;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertTrue;
5 import static org.junit.Assert.fail;
6
7 import java.io.IOException;
8 import java.net.URISyntaxException;
9 import java.nio.file.Files;
10 import java.nio.file.Paths;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14
15 import javax.naming.NamingException;
16
17 import org.djunits.unit.DurationUnit;
18 import org.djunits.unit.TimeUnit;
19 import org.djunits.value.vdouble.scalar.Duration;
20 import org.djunits.value.vdouble.scalar.Time;
21 import org.djutils.io.URLResource;
22 import org.djutils.serialization.SerializationException;
23 import org.junit.Test;
24 import org.opentrafficsim.draw.core.OtsDrawingException;
25 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
26 import org.sim0mq.Sim0MQException;
27 import org.sim0mq.message.Sim0MQMessage;
28 import org.zeromq.SocketType;
29 import org.zeromq.ZContext;
30 import org.zeromq.ZMQ;
31
32 import nl.tudelft.simulation.dsol.SimRuntimeException;
33 import nl.tudelft.simulation.language.DSOLException;
34
35
36
37
38
39
40
41
42
43 public class Sim0MQPublisherTest
44 {
45
46
47
48
49
50
51
52
53
54
55
56 public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
57 final String expectedDescription) throws Sim0MQException, SerializationException
58 {
59 Object[] objects = Sim0MQMessage.decodeToArray(got);
60 assertEquals("Field 5 of message echos the command", field5, objects[5]);
61 assertEquals("conversation id (field 6) matches", field6, objects[6]);
62 assertEquals("Response has 2 field payload", 10, objects.length);
63 assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
64 assertEquals("First payload field has the expected value", expectedValue, objects[8]);
65 assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
66 if (!((String) objects[9]).startsWith(expectedDescription))
67 {
68 fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
69 + objects[9] + "\"");
70 }
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
86 final int field6, final Boolean expectedValue, final String expectedDescription)
87 throws Sim0MQException, SerializationException, InterruptedException
88 {
89 waitForReceivedMessages(receivedMessages, maximumSeconds);
90 assertEquals("Should have received one message", 1, receivedMessages.size());
91 verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
92 receivedMessages.clear();
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
109 OtsDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
110 {
111 ZContext zContext = new ZContext(5);
112 networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
113
114 List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
115 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
116 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
117 readMessageThread.start();
118
119 PublisherThread publisherThread = new PublisherThread(zContext);
120 publisherThread.start();
121
122 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
123 publisherControlSocket.connect("inproc://publisherControl");
124
125 int conversationId = 100;
126 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
127 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
128 waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
129
130 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
131 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
132 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
133
134 badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
135 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
136 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
137 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
138
139 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
140 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
141 waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
142
143
144 sendCommand(publisherControlSocket,
145 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
146 waitForReceivedMessages(receivedMessages, 1);
147 assertEquals("Should have received one message", 1, receivedMessages.size());
148 Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
149 assertTrue("message decodes into more than 8 fields", commands.length > 8);
150 for (int index = 8; index < commands.length; index++)
151 {
152 receivedMessages.clear();
153 assertTrue("A service is identified by a String", commands[index] instanceof String);
154 String service = (String) commands[index];
155 System.out.println("Service " + service);
156 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
157 service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
158 waitForReceivedMessages(receivedMessages, 1.0);
159 if (receivedMessages.size() > 0)
160 {
161 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
162 assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
163 for (int i = 8; i < result.length; i++)
164 {
165 String command = (String) result[i];
166 receivedMessages.clear();
167
168 sendCommand(publisherControlSocket,
169 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
170 waitForReceivedMessages(receivedMessages, 1.0);
171 if (receivedMessages.size() > 0)
172 {
173 for (int ii = 8; ii < receivedMessages.size(); ii++)
174 {
175 System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
176 }
177 }
178 else
179 {
180 System.out.println("Received no reply");
181 }
182 System.out.print("");
183 }
184 }
185 else
186 {
187 System.out.println("Received no reply to GET_COMMANDS request");
188 }
189 }
190 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
191 ++conversationId, "2", "BAD"));
192 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
193 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
194 sendCommand(publisherControlSocket,
195 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
196
197 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
198 "Bad address: Address for GTU Id has wrong length");
199 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
200 ++conversationId, "NON EXISTING GTU ID"));
201 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
202 sendCommand(publisherControlSocket,
203 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
204 sendCommand(publisherControlSocket,
205 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
206 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
207 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
208 sendCommand(publisherControlSocket,
209 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
210 int conversationIdForSubscribeToAdd = ++conversationId;
211 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
212 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
213 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
214 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
215 "Subscription created");
216 int conversationIdForGTU2Move = ++conversationId;
217 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
218 conversationIdForGTU2Move, "2"));
219 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
220 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
221 sendCommand(publisherControlSocket,
222 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
223 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
224
225 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
226 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
227 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
228 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
229 "Subscription removed");
230 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
231 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
232 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
233 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
234 sendCommand(publisherControlSocket,
235 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
236 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
237 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
238 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
239 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
240 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
241 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
242 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
243 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
244 "Simulation is already at end of simulation time");
245
246 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
247 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
248 System.out.println("Master joining publisher thread (this should block until publisher has died)");
249 publisherThread.join();
250 System.out.println("Master has joined publisher thread");
251 System.out.println("Master interrupts read message thread");
252 readMessageThread.interrupt();
253 System.out.println("Master has interrupted read message thread; joining ...");
254 readMessageThread.join();
255 System.out.println("Master has joined read message thread");
256 System.out.println("Master exits");
257 }
258
259
260
261
262
263
264
265
266
267
268 public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
269 final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
270 {
271 for (int attempt = 0; attempt < 1000; attempt++)
272 {
273 waitForReceivedMessages(receivedMessages, 1.0);
274
275 while (receivedMessages.size() > 1)
276 {
277 receivedMessages.remove(0);
278 }
279 if (receivedMessages.size() == 1)
280 {
281 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
282 if (objects[6].equals(conversationId))
283 {
284 break;
285 }
286 receivedMessages.remove(0);
287 }
288 }
289
290 }
291
292
293
294
295
296
297
298 static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
299 throws InterruptedException
300 {
301 double timeWaited = 0;
302 while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
303 {
304 Thread.sleep(10);
305 timeWaited += 0.01;
306 }
307 }
308
309
310
311
312
313
314 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
315 {
316
317
318
319
320
321
322
323
324
325 socket.send(message);
326 }
327
328
329
330
331 static class ReadMessageThread extends Thread
332 {
333
334 private final ZContext zContext;
335
336
337 private final List<byte[]> storage;
338
339
340
341
342
343
344 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
345 {
346 this.zContext = zContext;
347 this.storage = storage;
348 }
349
350 @Override
351 public void run()
352 {
353 System.out.println("Read message thread starting up");
354 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
355 socket.setReceiveTimeOut(100);
356 socket.bind("inproc://publisherOutput");
357 while (!Thread.interrupted())
358 {
359 byte[][] all = readMessages(socket);
360 for (byte[] one : all)
361 {
362 this.storage.add(one);
363 }
364 }
365 System.out.println("Read message thread exits due to interrupt");
366 }
367
368 }
369
370
371
372
373
374
375 public static byte[][] readMessages(final ZMQ.Socket socket)
376 {
377 List<byte[]> resultList = new ArrayList<>();
378 while (true)
379 {
380 byte[] message = socket.recv();
381 StringBuilder output = new StringBuilder();
382 if (null != message)
383 {
384 output.append("Master received " + message.length + " byte message: ");
385
386 try
387 {
388 Object[] fields = Sim0MQMessage.decodeToArray(message);
389 for (Object field : fields)
390 {
391 output.append("/" + field);
392 }
393 output.append("/");
394 }
395 catch (Sim0MQException | SerializationException e)
396 {
397 e.printStackTrace();
398 }
399 System.out.println(output);
400 resultList.add(message);
401 }
402 else
403 {
404 if (resultList.size() > 0)
405 {
406 System.out.println(
407 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
408 }
409 break;
410 }
411 }
412 return resultList.toArray(new byte[resultList.size()][]);
413 }
414
415
416
417
418 static class PublisherThread extends Thread
419 {
420
421 private final ZContext zContext;
422
423
424
425
426
427 PublisherThread(final ZContext zContext)
428 {
429 this.zContext = zContext;
430 }
431
432
433
434
435 PublisherThread()
436 {
437 this.zContext = new ZContext(5);
438 }
439
440 @Override
441 public void run()
442 {
443 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
444 System.out.println("Publisher thread exits");
445 }
446
447 }
448
449
450 private static String networkXML;
451
452 }