1 package org.opentrafficsim.sim0mq.swing;
2
3 import static org.junit.jupiter.api.Assertions.assertEquals;
4 import static org.junit.jupiter.api.Assertions.assertTrue;
5 import static org.junit.jupiter.api.Assertions.fail;
6
7 import java.io.IOException;
8 import java.net.URISyntaxException;
9 import java.nio.file.Files;
10 import java.nio.file.Paths;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14
15 import javax.naming.NamingException;
16
17 import org.djunits.unit.DurationUnit;
18 import org.djunits.unit.TimeUnit;
19 import org.djunits.value.vdouble.scalar.Duration;
20 import org.djunits.value.vdouble.scalar.Time;
21 import org.djutils.io.URLResource;
22 import org.djutils.serialization.SerializationException;
23 import org.junit.jupiter.api.Test;
24 import org.opentrafficsim.sim0mq.publisher.SubscriptionHandler;
25 import org.sim0mq.Sim0MQException;
26 import org.sim0mq.message.Sim0MQMessage;
27 import org.zeromq.SocketType;
28 import org.zeromq.ZContext;
29 import org.zeromq.ZMQ;
30
31 import nl.tudelft.simulation.dsol.SimRuntimeException;
32 import nl.tudelft.simulation.language.DsolException;
33
34
35
36
37
38
39
40
41
42 public class Sim0MQPublisherTest
43 {
44
45
46
47
48 @Test
49 public void dummyTest()
50 {
51
52 }
53
54
55
56
57
58
59
60
61
62
63
64 public void verifyAckNack(final byte[] got, final String field5, final int field6, final Boolean expectedValue,
65 final String expectedDescription) throws Sim0MQException, SerializationException
66 {
67 Object[] objects = Sim0MQMessage.decodeToArray(got);
68 assertEquals(field5, objects[5], "Field 5 of message echos the command");
69 assertEquals(field6, objects[6], "conversation id (field 6) matches");
70 assertEquals(10, objects.length, "Response has 2 field payload");
71 assertTrue(objects[8] instanceof Boolean, "First payload field is a boolean");
72 assertEquals(expectedValue, objects[8], "First payload field has the expected value");
73 assertTrue(objects[9] instanceof String, "Second (and last) payload field is a String");
74 if (!((String) objects[9]).startsWith(expectedDescription))
75 {
76 fail("Description of ACK/NACK does not start with \"" + expectedDescription + "\" instead it contains \""
77 + objects[9] + "\"");
78 }
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public void waitAndVerifyAckNack(final List<byte[]> receivedMessages, final double maximumSeconds, final String field5,
94 final int field6, final Boolean expectedValue, final String expectedDescription)
95 throws Sim0MQException, SerializationException, InterruptedException
96 {
97 waitForReceivedMessages(receivedMessages, maximumSeconds);
98 assertEquals(1, receivedMessages.size(), "Should have received one message");
99 verifyAckNack(receivedMessages.get(0), field5, field6, expectedValue, expectedDescription);
100 receivedMessages.clear();
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public void testSim0MQPublisher() throws IOException, SimRuntimeException, NamingException, DsolException, Sim0MQException,
116 SerializationException, InterruptedException, URISyntaxException
117 {
118 ZContext zContext = new ZContext(5);
119 networkXML = new String(Files.readAllBytes(Paths.get(URLResource.getResource("/resources/network.xml").toURI())));
120
121 List<byte[]> receivedMessages = Collections.synchronizedList(new ArrayList<>());
122 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
123 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
124 readMessageThread.start();
125
126 PublisherThread publisherThread = new PublisherThread(zContext);
127 publisherThread.start();
128
129 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
130 publisherControlSocket.connect("inproc://publisherControl");
131
132 int conversationId = 100;
133 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
134 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
135 waitAndVerifyAckNack(receivedMessages, 1.0, badCommand, conversationId, false, "Don't know how to handle message:");
136
137 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
138 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
139 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false, "No network loaded");
140
141 badCommand = "GTUs in network|SUBSCRIBE_TO_ADD";
142 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, ++conversationId));
143 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationId, false,
144 "No simulation loaded; cannot execute command GTUs in network|SUBSCRIBE_TO_ADD");
145
146 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
147 ++conversationId, networkXML, new Duration(60, DurationUnit.SECOND), Duration.ZERO, 123456L));
148 waitAndVerifyAckNack(receivedMessages, 10.0, "NEWSIMULATION", conversationId, true, "OK");
149
150
151 sendCommand(publisherControlSocket,
152 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_LIST", ++conversationId));
153 waitForReceivedMessages(receivedMessages, 1);
154 assertEquals(1, receivedMessages.size(), "Should have received one message");
155 Object[] commands = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
156 assertTrue(commands.length > 8, "message decodes into more than 8 fields");
157 for (int index = 8; index < commands.length; index++)
158 {
159 receivedMessages.clear();
160 assertTrue(commands[index] instanceof String, "A service is identified by a String");
161 String service = (String) commands[index];
162 System.out.println("Service " + service);
163 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
164 service + "|" + SubscriptionHandler.Command.GET_COMMANDS, ++conversationId));
165 waitForReceivedMessages(receivedMessages, 1.0);
166 if (receivedMessages.size() > 0)
167 {
168 Object[] result = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
169 assertTrue(result.length >= 8, "result of GET_COMMANDS should be at least 8 long");
170 for (int i = 8; i < result.length; i++)
171 {
172 String command = (String) result[i];
173 receivedMessages.clear();
174
175 sendCommand(publisherControlSocket,
176 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", service + "|" + command, ++conversationId));
177 waitForReceivedMessages(receivedMessages, 1.0);
178 if (receivedMessages.size() > 0)
179 {
180 for (int ii = 8; ii < receivedMessages.size(); ii++)
181 {
182 System.out.println(Sim0MQMessage.print(Sim0MQMessage.decodeToArray(receivedMessages.get(ii))));
183 }
184 }
185 else
186 {
187 System.out.println("Received no reply");
188 }
189 System.out.print("");
190 }
191 }
192 else
193 {
194 System.out.println("Received no reply to GET_COMMANDS request");
195 }
196 }
197 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
198 ++conversationId, "2", "BAD"));
199 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
200 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "Bad address");
201 sendCommand(publisherControlSocket,
202 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE", ++conversationId));
203
204 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false,
205 "Bad address: Address for GTU Id has wrong length");
206 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
207 ++conversationId, "NON EXISTING GTU ID"));
208 waitAndVerifyAckNack(receivedMessages, 1.0, "GTU move", conversationId, false, "No GTU with id");
209 sendCommand(publisherControlSocket,
210 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
211 sendCommand(publisherControlSocket,
212 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|SUBSCRIBE_TO_ADD", ++conversationId));
213 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
214 ++conversationId, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
215 sendCommand(publisherControlSocket,
216 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
217 int conversationIdForSubscribeToAdd = ++conversationId;
218 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
219 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
220 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
221 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
222 "Subscription created");
223 int conversationIdForGTU2Move = ++conversationId;
224 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
225 conversationIdForGTU2Move, "2"));
226 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
227 ++conversationId, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
228 sendCommand(publisherControlSocket,
229 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
230 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
231
232 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
233 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
234 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationIdForSubscribeToAdd);
235 waitAndVerifyAckNack(receivedMessages, 1.0, "GTUs in network", conversationIdForSubscribeToAdd, true,
236 "Subscription removed");
237 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
238 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
239 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
240 ++conversationId, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
241 sendCommand(publisherControlSocket,
242 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", ++conversationId));
243 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
244 "GTUs in network|GET_ADDRESS_META_DATA", ++conversationId));
245 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
246 ++conversationId, new Object[] {new Time(60, TimeUnit.BASE_SECOND)}));
247 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
248 ++conversationId, new Object[] {new Time(70, TimeUnit.BASE_SECOND)}));
249 waitAndEatMessagesUntilConversationId(receivedMessages, 1.0, conversationId);
250 waitAndVerifyAckNack(receivedMessages, 1.0, "SIMULATEUNTIL", conversationId, false,
251 "Simulation is already at end of simulation time");
252
253 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", ++conversationId));
254 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
255 System.out.println("Master joining publisher thread (this should block until publisher has died)");
256 publisherThread.join();
257 System.out.println("Master has joined publisher thread");
258 System.out.println("Master interrupts read message thread");
259 readMessageThread.interrupt();
260 System.out.println("Master has interrupted read message thread; joining ...");
261 readMessageThread.join();
262 System.out.println("Master has joined read message thread");
263 System.out.println("Master exits");
264 }
265
266
267
268
269
270
271
272
273
274
275 public void waitAndEatMessagesUntilConversationId(final List<byte[]> receivedMessages, final double maximumSeconds,
276 final int conversationId) throws Sim0MQException, SerializationException, InterruptedException
277 {
278 for (int attempt = 0; attempt < 1000; attempt++)
279 {
280 waitForReceivedMessages(receivedMessages, 1.0);
281
282 while (receivedMessages.size() > 1)
283 {
284 receivedMessages.remove(0);
285 }
286 if (receivedMessages.size() == 1)
287 {
288 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
289 if (objects[6].equals(conversationId))
290 {
291 break;
292 }
293 receivedMessages.remove(0);
294 }
295 }
296
297 }
298
299
300
301
302
303
304
305 static void waitForReceivedMessages(final List<byte[]> receivedMessages, final double maximumSeconds)
306 throws InterruptedException
307 {
308 double timeWaited = 0;
309 while (receivedMessages.size() == 0 && timeWaited < maximumSeconds)
310 {
311 Thread.sleep(10);
312 timeWaited += 0.01;
313 }
314 }
315
316
317
318
319
320
321 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
322 {
323
324
325
326
327
328
329
330
331
332 socket.send(message);
333 }
334
335
336
337
338 static class ReadMessageThread extends Thread
339 {
340
341 private final ZContext zContext;
342
343
344 private final List<byte[]> storage;
345
346
347
348
349
350
351 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
352 {
353 this.zContext = zContext;
354 this.storage = storage;
355 }
356
357 @Override
358 public void run()
359 {
360 System.out.println("Read message thread starting up");
361 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
362 socket.setReceiveTimeOut(100);
363 socket.bind("inproc://publisherOutput");
364 while (!Thread.interrupted())
365 {
366 byte[][] all = readMessages(socket);
367 for (byte[] one : all)
368 {
369 this.storage.add(one);
370 }
371 }
372 System.out.println("Read message thread exits due to interrupt");
373 }
374
375 }
376
377
378
379
380
381
382 public static byte[][] readMessages(final ZMQ.Socket socket)
383 {
384 List<byte[]> resultList = new ArrayList<>();
385 while (true)
386 {
387 byte[] message = socket.recv();
388 StringBuilder output = new StringBuilder();
389 if (null != message)
390 {
391 output.append("Master received " + message.length + " byte message: ");
392
393 try
394 {
395 Object[] fields = Sim0MQMessage.decodeToArray(message);
396 for (Object field : fields)
397 {
398 output.append("/" + field);
399 }
400 output.append("/");
401 }
402 catch (Sim0MQException | SerializationException e)
403 {
404 e.printStackTrace();
405 }
406 System.out.println(output);
407 resultList.add(message);
408 }
409 else
410 {
411 if (resultList.size() > 0)
412 {
413 System.out.println(
414 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
415 }
416 break;
417 }
418 }
419 return resultList.toArray(new byte[resultList.size()][]);
420 }
421
422
423
424
425 static class PublisherThread extends Thread
426 {
427
428 private final ZContext zContext;
429
430
431
432
433
434 PublisherThread(final ZContext zContext)
435 {
436 this.zContext = zContext;
437 }
438
439
440
441
442 PublisherThread()
443 {
444 this.zContext = new ZContext(5);
445 }
446
447 @Override
448 public void run()
449 {
450 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
451 System.out.println("Publisher thread exits");
452 }
453
454 }
455
456
457 private static String networkXML;
458
459 }