1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.io.IOException;
4 import java.nio.file.Files;
5 import java.nio.file.Paths;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.List;
9
10 import javax.naming.NamingException;
11
12 import org.djunits.unit.DurationUnit;
13 import org.djunits.unit.TimeUnit;
14 import org.djunits.value.vdouble.scalar.Duration;
15 import org.djunits.value.vdouble.scalar.Time;
16 import org.djutils.serialization.SerializationException;
17 import org.opentrafficsim.draw.core.OTSDrawingException;
18 import org.sim0mq.Sim0MQException;
19 import org.sim0mq.message.Sim0MQMessage;
20 import org.zeromq.SocketType;
21 import org.zeromq.ZContext;
22 import org.zeromq.ZMQ;
23
24 import nl.tudelft.simulation.dsol.SimRuntimeException;
25 import nl.tudelft.simulation.language.DSOLException;
26
27
28
29
30
31
32
33
34
35
36 public final class PublisherDemo
37 {
38
/**
 * Private constructor; this class only hosts the static demo entry point and its static helpers.
 */
private PublisherDemo()
{
    // Do not instantiate.
}
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DSOLException,
57 OTSDrawingException, Sim0MQException, SerializationException, InterruptedException
58 {
59 ZContext zContext = new ZContext(5);
60
61 List<byte[]> receivedMessages = new ArrayList<>();
62 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64 readMessageThread.start();
65
66 PublisherThread publisherThread = new PublisherThread(zContext);
67 publisherThread.start();
68
69 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70 publisherControlSocket.connect("inproc://publisherControl");
71
72 int conversationId = 100;
73 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74 sendCommand(publisherControlSocket,
75 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
76 for (int attempt = 0; attempt < 100; attempt++)
77 {
78 if (receivedMessages.size() > 0)
79 {
80 break;
81 }
82 Thread.sleep(100);
83 }
84 if (receivedMessages.size() == 0)
85 {
86 System.err.println("publisher does not respond");
87 }
88 else
89 {
90 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
91 if (!objects[5].equals(badCommand))
92 {
93 System.err.println("publisher return unexpected response");
94 }
95 System.out.println("Got expected response to unsupported command");
96 }
97
98
99
100 String xml = new String(Files
101 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
102 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
103 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
104 sendCommand(publisherControlSocket,
105 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
106
107
108
109 sendCommand(publisherControlSocket,
110 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
111 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
112 conversationId++, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
113 sendCommand(publisherControlSocket,
114 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
115 int conversationIdForSubscribeToAdd = conversationId++;
116 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
117 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
118 sendCommand(publisherControlSocket,
119 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
120 int conversationIdForGTU2Move = conversationId++;
121 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
122 conversationIdForGTU2Move, "2"));
123 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
124 conversationId++, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
125 sendCommand(publisherControlSocket,
126 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
127
128 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
129 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
130 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
131 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
132 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
133 conversationId++, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
134 sendCommand(publisherControlSocket,
135 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
136 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
137 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
138 sendCommand(publisherControlSocket,
139 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
140 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
141 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
142 System.out.println("Master joining publisher thread (this should block until publisher has died)");
143 publisherThread.join();
144 System.out.println("Master has joined publisher thread");
145 System.out.println("Master interrupts read message thread");
146 readMessageThread.interrupt();
147 System.out.println("Master has interrupted read message thread; joining ...");
148 readMessageThread.join();
149 System.out.println("Master has joined read message thread");
150 System.out.println("Master exits");
151 }
152
153
154
155
156
157
158 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
159 {
160 try
161 {
162 Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
163 System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
164 }
165 catch (Sim0MQException | SerializationException e)
166 {
167 e.printStackTrace();
168 }
169 socket.send(message);
170 }
171
172
173
174
175 static class ReadMessageThread extends Thread
176 {
177
178 private final ZContext zContext;
179
180
181 private final List<byte[]> storage;
182
183
184
185
186
187
188 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
189 {
190 this.zContext = zContext;
191 this.storage = storage;
192 }
193
194 @Override
195 public void run()
196 {
197 System.out.println("Read message thread starting up");
198 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
199 socket.setReceiveTimeOut(100);
200 socket.bind("inproc://publisherOutput");
201 while (!Thread.interrupted())
202 {
203 byte[][] all = readMessages(socket);
204 for (byte[] one : all)
205 {
206 this.storage.add(one);
207 }
208 }
209 System.out.println("Read message thread exits due to interrupt");
210 }
211
212 }
213
214
215
216
217
218
219 public static byte[][] readMessages(final ZMQ.Socket socket)
220 {
221 List<byte[]> resultList = new ArrayList<>();
222 while (true)
223 {
224 byte[] message = socket.recv();
225 StringBuilder output = new StringBuilder();
226 if (null != message)
227 {
228 output.append("Master received " + message.length + " byte message: ");
229
230 try
231 {
232 Object[] fields = Sim0MQMessage.decodeToArray(message);
233 for (Object field : fields)
234 {
235 output.append("|" + field);
236 }
237 output.append("|");
238 }
239 catch (Sim0MQException | SerializationException e)
240 {
241 e.printStackTrace();
242 }
243 System.out.println(output);
244 resultList.add(message);
245 }
246 else
247 {
248 if (resultList.size() > 0)
249 {
250 System.out.println(
251 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
252 }
253 break;
254 }
255 }
256 return resultList.toArray(new byte[resultList.size()][]);
257 }
258
259
260
261
262 static class PublisherThread extends Thread
263 {
264
265 private final ZContext zContext;
266
267
268
269
270
271 PublisherThread(final ZContext zContext)
272 {
273 this.zContext = zContext;
274 }
275
276
277
278
279 PublisherThread()
280 {
281 this.zContext = new ZContext(5);
282 }
283
284 @Override
285 public void run()
286 {
287 try
288 {
289 new Sim0MQPublisher(this.zContext, "publisherControl", "publisherOutput");
290 }
291 catch (SimRuntimeException e)
292 {
293 e.printStackTrace();
294 }
295 System.out.println("Publisher thread exits");
296 }
297
298 }
299 }