1 package org.opentrafficsim.sim0mq.swing;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertTrue;
5 import static org.junit.Assert.fail;
6
7 import java.io.IOException;
8 import java.net.URISyntaxException;
9 import java.nio.file.Files;
10 import java.nio.file.Paths;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14
15 import javax.naming.NamingException;
16
17 import org.djunits.unit.DurationUnit;
18 import org.djunits.unit.TimeUnit;
19 import org.djunits.value.vdouble.scalar.Duration;
20 import org.djunits.value.vdouble.scalar.Time;
21 import org.djutils.io.URLResource;
22 import org.djutils.serialization.SerializationException;
23 import org.junit.Test;
24 import org.opentrafficsim.draw.core.OTSDrawingException;
25 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
26 import org.sim0mq.Sim0MQException;
27 import org.sim0mq.message.Sim0MQMessage;
28 import org.zeromq.SocketType;
29 import org.zeromq.ZContext;
30 import org.zeromq.ZMQ;
31
32 import nl.tudelft.simulation.dsol.SimRuntimeException;
33 import nl.tudelft.simulation.language.DSOLException;
34
35
36
37
38
39
40
41
42
43
44 public class Sim0MQPublisherTest
45 {
46
47
48
49
50
51
52
53
54
55
56
57 public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
58 final String expectedDescription) throws Sim0MQException, SerializationException
59 {
60 Object[] objects = Sim0MQMessage.decodeToArray(got);
61 assertEquals("Field 5 of message echos the command", field5, objects[5]);
62 assertEquals("conversation id (field 6) matches", field6, objects[6]);
63 assertEquals("Response has 2 field payload", 10, objects.length);
64 assertTrue("First payload field is a boolean", objects[8] instanceof Boolean);
65 assertEquals("First payload field has the expected value", expectedValue, objects[8]);
66 assertTrue("Second (and last) payload field is a String", objects[9] instanceof String);
67 if (!((String) objects[9]).startsWith(expectedDescription))
68 {
69 fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
70 + objects[9] + "\"");
71 }
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
87 final int field6, final Boolean expectedValue, final String expectedDescription)
88 throws Sim0MQException, SerializationException, InterruptedException
89 {
90 waitForReceivedMessages(receivedMessages, maximumSeconds);
91 assertEquals("Should have received one message", 1, receivedMessages.size());
92 verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
93 receivedMessages.clear();
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DSOLException,
110 OTSDrawingException, Sim0MQException, SerializationException, InterruptedException, URISyntaxException
111 {
112 ZContext zContext = new ZContext(5);
113 networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
114
115 List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
116 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
117 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
118 readMessageThread.start();
119
120 PublisherThread publisherThread = new PublisherThread(zContext);
121 publisherThread.start();
122
123 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
124 publisherControlSocket.connect("inproc://publisherControl");
125
126 int conversationId = 100;
127 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
128 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
129 waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
130
131 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
132 ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
133 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
134
135 badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
136 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
137 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
138 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
139
140 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
141 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
142 waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
143
144
145 sendCommand(publisherControlSocket,
146 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
147 waitForReceivedMessages(receivedMessages, 1);
148 assertEquals("Should have received one message", 1, receivedMessages.size());
149 Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
150 assertTrue("message decodes into more than 8 fields", commands.length > 8);
151 for (int index = 8; index < commands.length; index++)
152 {
153 receivedMessages.clear();
154 assertTrue("A service is identified by a String", commands[index] instanceof String);
155 String service = (String) commands[index];
156 System.out.println("Service " + service);
157 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
158 service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
159 waitForReceivedMessages(receivedMessages, 1.0);
160 if (receivedMessages.size() > 0)
161 {
162 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
163 assertTrue("result of GET_COMMANDS should be at least 8 long", result.length >= 8);
164 for (int i = 8; i < result.length; i++)
165 {
166 String command = (String) result[i];
167 receivedMessages.clear();
168
169 sendCommand(publisherControlSocket,
170 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
171 waitForReceivedMessages(receivedMessages, 1.0);
172 if (receivedMessages.size() > 0)
173 {
174 for (int ii = 8; ii < receivedMessages.size(); ii++)
175 {
176 System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
177 }
178 }
179 else
180 {
181 System.out.println("Received no reply");
182 }
183 System.out.print("");
184 }
185 }
186 else
187 {
188 System.out.println("Received no reply to GET_COMMANDS request");
189 }
190 }
191 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
192 ++conversationId, "2", "BAD"));
193 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
194 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
195 sendCommand(publisherControlSocket,
196 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
197
198 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
199 "Bad address: Address for GTU Id has wrong length");
200 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
201 ++conversationId, "NON EXISTING GTU ID"));
202 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
203 sendCommand(publisherControlSocket,
204 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
205 sendCommand(publisherControlSocket,
206 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
207 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
208 ++conversationId, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
209 sendCommand(publisherControlSocket,
210 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
211 int conversationIdForSubscribeToAdd = ++conversationId;
212 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
213 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
214 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
215 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
216 "Subscription created");
217 int conversationIdForGTU2Move = ++conversationId;
218 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
219 conversationIdForGTU2Move, "2"));
220 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
221 ++conversationId, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
222 sendCommand(publisherControlSocket,
223 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
224 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
225
226 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
227 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
228 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
229 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
230 "Subscription removed");
231 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
232 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
233 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
234 ++conversationId, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
235 sendCommand(publisherControlSocket,
236 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
237 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
238 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
239 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240 ++conversationId, new Object[] { new Time(60, TimeUnit.BASE_SECOND) }));
241 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
242 ++conversationId, new Object[] { new Time(70, TimeUnit.BASE_SECOND) }));
243 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
244 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
245 "Simulation is already at end of simulation time");
246
247 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
248 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
249 System.out.println("Master joining publisher thread (this should block until publisher has died)");
250 publisherThread.join();
251 System.out.println("Master has joined publisher thread");
252 System.out.println("Master interrupts read message thread");
253 readMessageThread.interrupt();
254 System.out.println("Master has interrupted read message thread; joining ...");
255 readMessageThread.join();
256 System.out.println("Master has joined read message thread");
257 System.out.println("Master exits");
258 }
259
260
261
262
263
264
265
266
267
268
269 public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
270 final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
271 {
272 for (int attempt = 0; attempt < 1000; attempt++)
273 {
274 waitForReceivedMessages(receivedMessages, 1.0);
275
276 while (receivedMessages.size() > 1)
277 {
278 receivedMessages.remove(0);
279 }
280 if (receivedMessages.size() == 1)
281 {
282 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
283 if (objects[6].equals(conversationId))
284 {
285 break;
286 }
287 receivedMessages.remove(0);
288 }
289 }
290
291 }
292
293
294
295
296
297
298
299 static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
300 throws InterruptedException
301 {
302 double timeWaited = 0;
303 while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
304 {
305 Thread.sleep(10);
306 timeWaited += 0.01;
307 }
308 }
309
310
311
312
313
314
315 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
316 {
317
318
319
320
321
322
323
324
325
326 socket.send(message);
327 }
328
329
330
331
332 static class ReadMessageThread extends Thread
333 {
334
335 private final ZContext zContext;
336
337
338 private final List<byte[]> storage;
339
340
341
342
343
344
345 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
346 {
347 this.zContext = zContext;
348 this.storage = storage;
349 }
350
351 @Override
352 public void run()
353 {
354 System.out.println("Read message thread starting up");
355 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
356 socket.setReceiveTimeOut(100);
357 socket.bind("inproc://publisherOutput");
358 while (!Thread.interrupted())
359 {
360 byte[][] all = readMessages(socket);
361 for (byte[] one : all)
362 {
363 this.storage.add(one);
364 }
365 }
366 System.out.println("Read message thread exits due to interrupt");
367 }
368
369 }
370
371
372
373
374
375
376 public static byte[][] readMessages(final ZMQ.Socket socket)
377 {
378 List<byte[]> resultList = new ArrayList<>();
379 while (true)
380 {
381 byte[] message = socket.recv();
382 StringBuilder output = new StringBuilder();
383 if (null != message)
384 {
385 output.append("Master received " + message.length + " byte message: ");
386
387 try
388 {
389 Object[] fields = Sim0MQMessage.decodeToArray(message);
390 for (Object field : fields)
391 {
392 output.append("/" + field);
393 }
394 output.append("/");
395 }
396 catch (Sim0MQException | SerializationException e)
397 {
398 e.printStackTrace();
399 }
400 System.out.println(output);
401 resultList.add(message);
402 }
403 else
404 {
405 if (resultList.size() > 0)
406 {
407 System.out.println(
408 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
409 }
410 break;
411 }
412 }
413 return resultList.toArray(new byte[resultList.size()][]);
414 }
415
416
417
418
419 static class PublisherThread extends Thread
420 {
421
422 private final ZContext zContext;
423
424
425
426
427
428 PublisherThread(final ZContext zContext)
429 {
430 this.zContext = zContext;
431 }
432
433
434
435
436 PublisherThread()
437 {
438 this.zContext = new ZContext(5);
439 }
440
441 @Override
442 public void run()
443 {
444 new Sim0MQPublisher(this.zContext, "publisherControl", "publisherOutput");
445 System.out.println("Publisher thread exits");
446 }
447
448 }
449
450
/** Text of the network XML resource; assigned by testSim0MQPublisher and sent as payload of the NEWSIMULATION command. */
private static String networkXML;
452
453 }