1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.io.IOException;
4 import java.nio.file.Files;
5 import java.nio.file.Paths;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.List;
9
10 import javax.naming.NamingException;
11
12 import org.djunits.unit.DurationUnit;
13 import org.djunits.unit.TimeUnit;
14 import org.djunits.value.vdouble.scalar.Duration;
15 import org.djunits.value.vdouble.scalar.Time;
16 import org.djutils.serialization.SerializationException;
17 import org.opentrafficsim.draw.core.OTSDrawingException;
18 import org.sim0mq.Sim0MQException;
19 import org.sim0mq.message.Sim0MQMessage;
20 import org.zeromq.SocketType;
21 import org.zeromq.ZContext;
22 import org.zeromq.ZMQ;
23
24 import nl.tudelft.simulation.dsol.SimRuntimeException;
25 import nl.tudelft.simulation.language.DSOLException;
26
27
28
29
30
31
32
33
34
35
36 public final class PublisherDemo
37 {
38
39 private PublisherDemo()
40 {
41
42 }
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DSOLException,
57 OTSDrawingException, Sim0MQException, SerializationException, InterruptedException
58 {
59 ZContext zContext = new ZContext(5);
60
61 List<byte[]> receivedMessages = new ArrayList<>();
62 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64 readMessageThread.start();
65
66 PublisherThread publisherThread = new PublisherThread(zContext);
67 publisherThread.start();
68
69 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70 publisherControlSocket.connect("inproc://publisherControl");
71
72 int conversationId = 100;
73 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74 sendCommand(publisherControlSocket,
75 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
76 for (int attempt = 0; attempt < 100; attempt++)
77 {
78 if (receivedMessages.size() > 0)
79 {
80 break;
81 }
82 Thread.sleep(100);
83 }
84 if (receivedMessages.size() == 0)
85 {
86 System.err.println("publisher does not respond");
87 }
88 else
89 {
90 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
91 if (!objects[5].equals(badCommand))
92 {
93 System.err.println("publisher return unexpected response");
94 }
95 System.out.println("Got expected response to unsupported command");
96 }
97
98 String xml = new String(Files
99 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
100 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
101 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
102 sendCommand(publisherControlSocket,
103 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
104
105
106
107 sendCommand(publisherControlSocket,
108 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
109 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
110 conversationId++, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
111 sendCommand(publisherControlSocket,
112 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
113 int conversationIdForSubscribeToAdd = conversationId++;
114 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
115 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
116 sendCommand(publisherControlSocket,
117 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
118 int conversationIdForGTU2Move = conversationId++;
119 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
120 conversationIdForGTU2Move, "2"));
121 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
122 conversationId++, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
123 sendCommand(publisherControlSocket,
124 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
125
126 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
127 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
128 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
129 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
130 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
131 conversationId++, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
132 sendCommand(publisherControlSocket,
133 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
134 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
135 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
136 sendCommand(publisherControlSocket,
137 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
138 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
139 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
140 System.out.println("Master joining publisher thread (this should block until publisher has died)");
141 publisherThread.join();
142 System.out.println("Master has joined publisher thread");
143 System.out.println("Master interrupts read message thread");
144 readMessageThread.interrupt();
145 System.out.println("Master has interrupted read message thread; joining ...");
146 readMessageThread.join();
147 System.out.println("Master has joined read message thread");
148 System.out.println("Master exits");
149 }
150
151
152
153
154
155
156 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
157 {
158 try
159 {
160 Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
161 System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
162 }
163 catch (Sim0MQException | SerializationException e)
164 {
165 e.printStackTrace();
166 }
167 socket.send(message);
168 }
169
170
171
172
173 static class ReadMessageThread extends Thread
174 {
175
176 private final ZContext zContext;
177
178
179 private final List<byte[]> storage;
180
181
182
183
184
185
186 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
187 {
188 this.zContext = zContext;
189 this.storage = storage;
190 }
191
192 @Override
193 public void run()
194 {
195 System.out.println("Read message thread starting up");
196 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
197 socket.setReceiveTimeOut(100);
198 socket.bind("inproc://publisherOutput");
199 while (!Thread.interrupted())
200 {
201 byte[][] all = readMessages(socket);
202 for (byte[] one : all)
203 {
204 this.storage.add(one);
205 }
206 }
207 System.out.println("Read message thread exits due to interrupt");
208 }
209
210 }
211
212
213
214
215
216
217 public static byte[][] readMessages(final ZMQ.Socket socket)
218 {
219 List<byte[]> resultList = new ArrayList<>();
220 while (true)
221 {
222 byte[] message = socket.recv();
223 StringBuilder output = new StringBuilder();
224 if (null != message)
225 {
226 output.append("Master received " + message.length + " byte message: ");
227
228 try
229 {
230 Object[] fields = Sim0MQMessage.decodeToArray(message);
231 for (Object field : fields)
232 {
233 output.append("|" + field);
234 }
235 output.append("|");
236 }
237 catch (Sim0MQException | SerializationException e)
238 {
239 e.printStackTrace();
240 }
241 System.out.println(output);
242 resultList.add(message);
243 }
244 else
245 {
246 if (resultList.size() > 0)
247 {
248 System.out.println(
249 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
250 }
251 break;
252 }
253 }
254 return resultList.toArray(new byte[resultList.size()][]);
255 }
256
257
258
259
260 static class PublisherThread extends Thread
261 {
262
263 private final ZContext zContext;
264
265
266
267
268
269 PublisherThread(final ZContext zContext)
270 {
271 this.zContext = zContext;
272 }
273
274
275
276
277 PublisherThread()
278 {
279 this.zContext = new ZContext(5);
280 }
281
282 @Override
283 public void run()
284 {
285 try
286 {
287 new Sim0MQPublisher(zContext, "publisherControl", "publisherOutput");
288 }
289 catch (SimRuntimeException e)
290 {
291 e.printStackTrace();
292 }
293 System.out.println("Publisher thread exits");
294 }
295
296 }
297 }