1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.io.IOException;
4 import java.nio.file.Files;
5 import java.nio.file.Paths;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.List;
9
10 import javax.naming.NamingException;
11
12 import org.djunits.unit.DurationUnit;
13 import org.djunits.unit.TimeUnit;
14 import org.djunits.value.vdouble.scalar.Duration;
15 import org.djunits.value.vdouble.scalar.Time;
16 import org.djutils.serialization.SerializationException;
17 import org.sim0mq.Sim0MQException;
18 import org.sim0mq.message.Sim0MQMessage;
19 import org.zeromq.SocketType;
20 import org.zeromq.ZContext;
21 import org.zeromq.ZMQ;
22
23 import nl.tudelft.simulation.dsol.SimRuntimeException;
24 import nl.tudelft.simulation.language.DsolException;
25
26
27
28
29
30
31
32
33
34
35 public final class PublisherDemo
36 {
37
38 private PublisherDemo()
39 {
40
41 }
42
43
44
45
46
47
48
49
50
51
52
53
54 public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
55 Sim0MQException, SerializationException, InterruptedException
56 {
57 ZContext zContext = new ZContext(5);
58
59 List<byte[]> receivedMessages = new ArrayList<>();
60 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
61 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
62 readMessageThread.start();
63
64 PublisherThread publisherThread = new PublisherThread(zContext);
65 publisherThread.start();
66
67 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
68 publisherControlSocket.connect("inproc://publisherControl");
69
70 int conversationId = 100;
71 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
72 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
73 for (int attempt = 0; attempt < 100; attempt++)
74 {
75 if (receivedMessages.size() > 0)
76 {
77 break;
78 }
79 Thread.sleep(100);
80 }
81 if (receivedMessages.size() == 0)
82 {
83 System.err.println("publisher does not respond");
84 }
85 else
86 {
87 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
88 if (!objects[5].equals(badCommand))
89 {
90 System.err.println("publisher return unexpected response");
91 }
92 System.out.println("Got expected response to unsupported command");
93 }
94
95
96
97 String xml = new String(Files
98 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
99 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
100 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
101 sendCommand(publisherControlSocket,
102 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
103
104 sendCommand(publisherControlSocket,
105 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
106 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
107 conversationId++, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
108 sendCommand(publisherControlSocket,
109 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
110 int conversationIdForSubscribeToAdd = conversationId++;
111 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
112 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
113 sendCommand(publisherControlSocket,
114 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
115 int conversationIdForGTU2Move = conversationId++;
116 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
117 conversationIdForGTU2Move, "2"));
118 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
119 conversationId++, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
120 sendCommand(publisherControlSocket,
121 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
122
123 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
124 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
125 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
126 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
127 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
128 conversationId++, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
129 sendCommand(publisherControlSocket,
130 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
131 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
132 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
133 sendCommand(publisherControlSocket,
134 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
135 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
136 System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
137 System.out.println("Master joining publisher thread (this should block until publisher has died)");
138 publisherThread.join();
139 System.out.println("Master has joined publisher thread");
140 System.out.println("Master interrupts read message thread");
141 readMessageThread.interrupt();
142 System.out.println("Master has interrupted read message thread; joining ...");
143 readMessageThread.join();
144 System.out.println("Master has joined read message thread");
145 System.out.println("Master exits");
146 }
147
148
149
150
151
152
153 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
154 {
155 try
156 {
157 Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
158 System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
159 }
160 catch (Sim0MQException | SerializationException e)
161 {
162 e.printStackTrace();
163 }
164 socket.send(message);
165 }
166
167
168
169
170 static class ReadMessageThread extends Thread
171 {
172
173 private final ZContext zContext;
174
175
176 private final List<byte[]> storage;
177
178
179
180
181
182
183 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
184 {
185 this.zContext = zContext;
186 this.storage = storage;
187 }
188
189 @Override
190 public void run()
191 {
192 System.out.println("Read message thread starting up");
193 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
194 socket.setReceiveTimeOut(100);
195 socket.bind("inproc://publisherOutput");
196 while (!Thread.interrupted())
197 {
198 byte[][] all = readMessages(socket);
199 for (byte[] one : all)
200 {
201 this.storage.add(one);
202 }
203 }
204 System.out.println("Read message thread exits due to interrupt");
205 }
206
207 }
208
209
210
211
212
213
214 public static byte[][] readMessages(final ZMQ.Socket socket)
215 {
216 List<byte[]> resultList = new ArrayList<>();
217 while (true)
218 {
219 byte[] message = socket.recv();
220 StringBuilder output = new StringBuilder();
221 if (null != message)
222 {
223 output.append("Master received " + message.length + " byte message: ");
224
225 try
226 {
227 Object[] fields = Sim0MQMessage.decodeToArray(message);
228 for (Object field : fields)
229 {
230 output.append("|" + field);
231 }
232 output.append("|");
233 }
234 catch (Sim0MQException | SerializationException e)
235 {
236 e.printStackTrace();
237 }
238 System.out.println(output);
239 resultList.add(message);
240 }
241 else
242 {
243 if (resultList.size() > 0)
244 {
245 System.out.println(
246 "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
247 }
248 break;
249 }
250 }
251 return resultList.toArray(new byte[resultList.size()][]);
252 }
253
254
255
256
257 static class PublisherThread extends Thread
258 {
259
260 private final ZContext zContext;
261
262
263
264
265
266 PublisherThread(final ZContext zContext)
267 {
268 this.zContext = zContext;
269 }
270
271
272
273
274 PublisherThread()
275 {
276 this.zContext = new ZContext(5);
277 }
278
279 @Override
280 public void run()
281 {
282 try
283 {
284 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
285 }
286 catch (SimRuntimeException e)
287 {
288 e.printStackTrace();
289 }
290 System.out.println("Publisher thread exits");
291 }
292
293 }
294 }