1 package org.opentrafficsim.sim0mq.swing;
2
3 import java.io.IOException;
4 import java.nio.file.Files;
5 import java.nio.file.Paths;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.List;
9
10 import javax.naming.NamingException;
11
12 import org.djunits.unit.DurationUnit;
13 import org.djunits.value.vdouble.scalar.Duration;
14 import org.djutils.serialization.Endianness;
15 import org.djutils.serialization.SerializationException;
16 import org.djutils.serialization.util.SerialDataDumper;
17 import org.opentrafficsim.base.logger.Logger;
18 import org.sim0mq.Sim0MQException;
19 import org.sim0mq.message.Sim0MQMessage;
20 import org.zeromq.SocketType;
21 import org.zeromq.ZContext;
22 import org.zeromq.ZMQ;
23
24 import nl.tudelft.simulation.dsol.SimRuntimeException;
25 import nl.tudelft.simulation.language.DsolException;
26
27
28
29
30
31
32
33
34
35
36 public final class PublisherDemo
37 {
38
/**
 * Private constructor; this class only offers static members and is not meant to be instantiated.
 */
private PublisherDemo()
{
    // Do not instantiate.
}
43
44
45
46
47
48
49
50
51
52
53
54
55 public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
56 Sim0MQException, SerializationException, InterruptedException
57 {
58 ZContext zContext = new ZContext(5);
59
60 List<byte[]> receivedMessages = new ArrayList<>();
61 List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
62 ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
63 readMessageThread.start();
64
65 PublisherThread publisherThread = new PublisherThread(zContext);
66 publisherThread.start();
67
68 ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
69 publisherControlSocket.connect("inproc://publisherControl");
70
71 int conversationId = 100;
72 String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
73 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
74 for (int attempt = 0; attempt < 100; attempt++)
75 {
76 if (receivedMessages.size() > 0)
77 {
78 break;
79 }
80 Thread.sleep(100);
81 }
82 if (receivedMessages.size() == 0)
83 {
84 Logger.ots().error("publisher does not respond");
85 }
86 else
87 {
88 Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
89 if (!objects[5].equals(badCommand))
90 {
91 Logger.ots().error("publisher return unexpected response");
92 }
93 Logger.ots().info("Got expected response to unsupported command");
94 }
95
96
97
98 String xml = new String(Files
99 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
100 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
101 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
102 sendCommand(publisherControlSocket,
103 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
104
105 sendCommand(publisherControlSocket,
106 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
107 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
108 conversationId++, new Object[] {Duration.ofSI(10.0)}));
109 sendCommand(publisherControlSocket,
110 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
111 int conversationIdForSubscribeToAdd = conversationId++;
112 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
113 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
114 sendCommand(publisherControlSocket,
115 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
116 int conversationIdForGTU2Move = conversationId++;
117 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
118 conversationIdForGTU2Move, "2"));
119 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
120 conversationId++, new Object[] {Duration.ofSI(20.0)}));
121 sendCommand(publisherControlSocket,
122 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
123
124 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
125 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
126 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
127 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2"));
128 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
129 conversationId++, new Object[] {Duration.ofSI(30.0)}));
130 sendCommand(publisherControlSocket,
131 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
132 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
133 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
134 sendCommand(publisherControlSocket,
135 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
136 sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
137 Logger.ots().info("Master has sent last command; Publisher should be busy for a while and then die");
138 Logger.ots().info("Master joining publisher thread (this should block until publisher has died)");
139 publisherThread.join();
140 Logger.ots().info("Master has joined publisher thread");
141 Logger.ots().info("Master interrupts read message thread");
142 readMessageThread.interrupt();
143 Logger.ots().info("Master has interrupted read message thread; joining ...");
144 readMessageThread.join();
145 Logger.ots().info("Master has joined read message thread");
146 Logger.ots().info("Master exits");
147 }
148
149
150
151
152
153
154 static void sendCommand(final ZMQ.Socket socket, final byte[] message)
155 {
156 try
157 {
158 Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
159 Logger.ots().info("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
160 }
161 catch (Sim0MQException | SerializationException e)
162 {
163 e.printStackTrace();
164 }
165 socket.send(message);
166 }
167
168
169
170
171 static class ReadMessageThread extends Thread
172 {
173
174 private final ZContext zContext;
175
176
177 private final List<byte[]> storage;
178
179
180
181
182
183
184 ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
185 {
186 this.zContext = zContext;
187 this.storage = storage;
188 }
189
190 @Override
191 public void run()
192 {
193 Logger.ots().info("Read message thread starting up");
194 ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
195 socket.setReceiveTimeOut(100);
196 socket.bind("inproc://publisherOutput");
197 while (!Thread.interrupted())
198 {
199 byte[][] all = readMessages(socket);
200 for (byte[] one : all)
201 {
202 this.storage.add(one);
203 }
204 }
205 Logger.ots().info("Read message thread exits due to interrupt");
206 }
207
208 }
209
210
211
212
213
214
215 public static byte[][] readMessages(final ZMQ.Socket socket)
216 {
217 List<byte[]> resultList = new ArrayList<>();
218 while (true)
219 {
220 byte[] message = socket.recv();
221 StringBuilder output = new StringBuilder();
222 if (null != message)
223 {
224 output.append("Master received " + message.length + " byte message: ");
225 Logger.ots().trace(SerialDataDumper.serialDataDumper(Endianness.BIG_ENDIAN, message));
226 try
227 {
228 Object[] fields = Sim0MQMessage.decodeToArray(message);
229 for (Object field : fields)
230 {
231 output.append("|" + field);
232 }
233 output.append("|");
234 }
235 catch (Sim0MQException | SerializationException e)
236 {
237 e.printStackTrace();
238 }
239 Logger.ots().info(output);
240 resultList.add(message);
241 }
242 else
243 {
244 if (resultList.size() > 0)
245 {
246 Logger.ots()
247 .info("Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
248 }
249 break;
250 }
251 }
252 return resultList.toArray(new byte[resultList.size()][]);
253 }
254
255
256
257
258 static class PublisherThread extends Thread
259 {
260
261 private final ZContext zContext;
262
263
264
265
266
267 PublisherThread(final ZContext zContext)
268 {
269 this.zContext = zContext;
270 }
271
272
273
274
275 PublisherThread()
276 {
277 this.zContext = new ZContext(5);
278 }
279
280 @Override
281 public void run()
282 {
283 try
284 {
285 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
286 }
287 catch (SimRuntimeException e)
288 {
289 e.printStackTrace();
290 }
291 Logger.ots().info("Publisher thread exits");
292 }
293
294 }
295 }