1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.io.IOException;
4 import java.nio.file.Files;
5 import java.nio.file.Paths;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.List;
9
10 import javax.naming.NamingException;
11
12 import org.djunits.unit.DurationUnit;
13 import org.djunits.unit.TimeUnit;
14 import org.djunits.value.vdouble.scalar.Duration;
15 import org.djunits.value.vdouble.scalar.Time;
16 import org.djutils.serialization.SerializationException;
17 import org.opentrafficsim.draw.OtsDrawingException;
18 import org.sim0mq.Sim0MQException;
19 import org.sim0mq.message.Sim0MQMessage;
20 import org.zeromq.SocketType;
21 import org.zeromq.ZContext;
22 import org.zeromq.ZMQ;
23
24 import nl.tudelft.simulation.dsol.SimRuntimeException;
25 import nl.tudelft.simulation.language.DsolException;
26
27
28
29
30
31
32
33
34
35
36 public final class PublisherDemo
37 {
38
39 private PublisherDemo()
40 {
41
42 }
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
57 OtsDrawingException, Sim0MQException, SerializationException, InterruptedException
58 {
59 ZContext zContext = new ZContext(5);
60
61 List<byte[]> receivedMessages = new ArrayList<>();
62 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64 readMessageThread.start();
65
66 PublisherThread publisherThread = new PublisherThread(zContext);
67 publisherThread.start();
68
69 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70 publisherControlSocket.connect("inproc://publisherControl");
71
72 int conversationId = 100;
73 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
75 for (int attempt = 0; attempt < 100; attempt++)
76 {
77 if (receivedMessages.size() > 0)
78 {
79 break;
80 }
81 Thread.sleep(100);
82 }
83 if (receivedMessages.size() == 0)
84 {
85 System.err.println("publisher does not respond");
86 }
87 else
88 {
89 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
90 if (!objects[5].equals(badCommand))
91 {
92 System.err.println("publisher return unexpected response");
93 }
94 System.out.println("Got expected response to unsupported command");
95 }
96
97
98
99 String xml = new String(Files
100 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
101 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
102 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
103 sendCommand(publisherControlSocket,
104 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
105
106 sendCommand(publisherControlSocket,
107 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
108 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
109 conversationId++, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
110 sendCommand(publisherControlSocket,
111 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
112 int conversationIdForSubscribeToAdd = conversationId++;
113 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
114 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
115 sendCommand(publisherControlSocket,
116 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
117 int conversationIdForGTU2Move = conversationId++;
118 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
119 conversationIdForGTU2Move, "2"));
120 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
121 conversationId++, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
122 sendCommand(publisherControlSocket,
123 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
124
125 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
126 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
127 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
128 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
129 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
130 conversationId++, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
131 sendCommand(publisherControlSocket,
132 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
133 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
134 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
135 sendCommand(publisherControlSocket,
136 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
137 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
138 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
139 System.out.println("Master joining publisher thread (this should block until publisher has died)");
140 publisherThread.join();
141 System.out.println("Master has joined publisher thread");
142 System.out.println("Master interrupts read message thread");
143 readMessageThread.interrupt();
144 System.out.println("Master has interrupted read message thread; joining ...");
145 readMessageThread.join();
146 System.out.println("Master has joined read message thread");
147 System.out.println("Master exits");
148 }
149
150
151
152
153
154
155 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
156 {
157 try
158 {
159 Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
160 System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
161 }
162 catch (Sim0MQException | SerializationException e)
163 {
164 e.printStackTrace();
165 }
166 socket.send(message);
167 }
168
169
170
171
172 static class ReadMessageThread extends Thread
173 {
174
175 private final ZContext zContext;
176
177
178 private final List<byte[]> storage;
179
180
181
182
183
184
185 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
186 {
187 this.zContext = zContext;
188 this.storage = storage;
189 }
190
191 @Override
192 public void run()
193 {
194 System.out.println("Read message thread starting up");
195 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
196 socket.setReceiveTimeOut(100);
197 socket.bind("inproc://publisherOutput");
198 while (!Thread.interrupted())
199 {
200 byte[][] all = readMessages(socket);
201 for (byte[] one : all)
202 {
203 this.storage.add(one);
204 }
205 }
206 System.out.println("Read message thread exits due to interrupt");
207 }
208
209 }
210
211
212
213
214
215
216 public static byte[][] readMessages(final ZMQ.Socket socket)
217 {
218 List<byte[]> resultList = new ArrayList<>();
219 while (true)
220 {
221 byte[] message = socket.recv();
222 StringBuilder output = new StringBuilder();
223 if (null != message)
224 {
225 output.append("Master received " + message.length + " byte message: ");
226
227 try
228 {
229 Object[] fields = Sim0MQMessage.decodeToArray(message);
230 for (Object field : fields)
231 {
232 output.append("|" + field);
233 }
234 output.append("|");
235 }
236 catch (Sim0MQException | SerializationException e)
237 {
238 e.printStackTrace();
239 }
240 System.out.println(output);
241 resultList.add(message);
242 }
243 else
244 {
245 if (resultList.size() > 0)
246 {
247 System.out.println(
248 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
249 }
250 break;
251 }
252 }
253 return resultList.toArray(new byte[resultList.size()][]);
254 }
255
256
257
258
259 static class PublisherThread extends Thread
260 {
261
262 private final ZContext zContext;
263
264
265
266
267
268 PublisherThread(final ZContext zContext)
269 {
270 this.zContext = zContext;
271 }
272
273
274
275
276 PublisherThread()
277 {
278 this.zContext = new ZContext(5);
279 }
280
281 @Override
282 public void run()
283 {
284 try
285 {
286 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
287 }
288 catch (SimRuntimeException e)
289 {
290 e.printStackTrace();
291 }
292 System.out.println("Publisher thread exits");
293 }
294
295 }
296 }