View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.io.IOException;
4   import java.nio.file.Files;
5   import java.nio.file.Paths;
6   import java.util.ArrayList;
7   import java.util.Collections;
8   import java.util.List;
9   
10  import javax.naming.NamingException;
11  
12  import org.djunits.unit.DurationUnit;
13  import org.djunits.value.vdouble.scalar.Duration;
14  import org.djutils.serialization.Endianness;
15  import org.djutils.serialization.SerializationException;
16  import org.djutils.serialization.util.SerialDataDumper;
17  import org.opentrafficsim.base.logger.Logger;
18  import org.sim0mq.Sim0MQException;
19  import org.sim0mq.message.Sim0MQMessage;
20  import org.zeromq.SocketType;
21  import org.zeromq.ZContext;
22  import org.zeromq.ZMQ;
23  
24  import nl.tudelft.simulation.dsol.SimRuntimeException;
25  import nl.tudelft.simulation.language.DsolException;
26  
27  /**
28   * Experiment with the Sim0MQPublisher.
29   * <p>
30   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
31   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
32   * </p>
33   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
34   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
35   */
36  public final class PublisherDemo
37  {
38      /** Do not instantiate. */
39      private PublisherDemo()
40      {
41          // Do not instantiate
42      }
43  
44      /**
45       * Test code.
46       * @param args the command line arguments (not used)
47       * @throws IOException ...
48       * @throws NamingException ...
49       * @throws SimRuntimeException ...
50       * @throws DsolException ...
51       * @throws SerializationException ...
52       * @throws Sim0MQException ...
53       * @throws InterruptedException ...
54       */
55      public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
56              Sim0MQException, SerializationException, InterruptedException
57      {
58          ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
59  
60          List<byte[]> receivedMessages = new ArrayList<>();
61          List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
62          ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
63          readMessageThread.start();
64  
65          PublisherThread publisherThread = new PublisherThread(zContext);
66          publisherThread.start();
67  
68          ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
69          publisherControlSocket.connect("inproc://publisherControl");
70  
71          int conversationId = 100; // Number the commands starting with something that is very different from 0
72          String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
73          sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
74          for (int attempt = 0; attempt < 100; attempt++)
75          {
76              if (receivedMessages.size() > 0)
77              {
78                  break;
79              }
80              Thread.sleep(100);
81          }
82          if (receivedMessages.size() == 0)
83          {
84              Logger.ots().error("publisher does not respond");
85          }
86          else
87          {
88              Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
89              if (!objects[5].equals(badCommand))
90              {
91                  Logger.ots().error("publisher return unexpected response");
92              }
93              Logger.ots().info("Got expected response to unsupported command");
94          }
95  
96          // FIXME: This is of course not the intention...
97          // FIXME: make the network available as a resource...
98          String xml = new String(Files
99                  .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
100         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
101                 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
102         sendCommand(publisherControlSocket,
103                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
104 
105         sendCommand(publisherControlSocket,
106                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
107         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
108                 conversationId++, new Object[] {Duration.ofSI(10.0)}));
109         sendCommand(publisherControlSocket,
110                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
111         int conversationIdForSubscribeToAdd = conversationId++; // We need that to unsubscribe later
112         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
113                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
114         sendCommand(publisherControlSocket,
115                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
116         int conversationIdForGTU2Move = conversationId++;
117         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
118                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
119         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
120                 conversationId++, new Object[] {Duration.ofSI(20.0)}));
121         sendCommand(publisherControlSocket,
122                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
123         // unsubscribe from GTU ADD events using saved conversationId
124         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
125                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
126         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
127                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
128         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
129                 conversationId++, new Object[] {Duration.ofSI(30.0)}));
130         sendCommand(publisherControlSocket,
131                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
132         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
133                 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
134         sendCommand(publisherControlSocket,
135                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
136         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
137         Logger.ots().info("Master has sent last command; Publisher should be busy for a while and then die");
138         Logger.ots().info("Master joining publisher thread (this should block until publisher has died)");
139         publisherThread.join();
140         Logger.ots().info("Master has joined publisher thread");
141         Logger.ots().info("Master interrupts read message thread");
142         readMessageThread.interrupt();
143         Logger.ots().info("Master has interrupted read message thread; joining ...");
144         readMessageThread.join();
145         Logger.ots().info("Master has joined read message thread");
146         Logger.ots().info("Master exits");
147     }
148 
149     /**
150      * Wrapper for ZMQ.Socket.send that may output some debugging information.
151      * @param socket ZMQ.Socket; the socket to send onto
152      * @param message the message to transmit
153      */
154     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
155     {
156         try
157         {
158             Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
159             Logger.ots().info("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
160         }
161         catch (Sim0MQException | SerializationException e)
162         {
163             e.printStackTrace();
164         }
165         socket.send(message);
166     }
167 
168     /**
169      * Repeatedly try to read all available messages.
170      */
171     static class ReadMessageThread extends Thread
172     {
173         /** The ZContext needed to create the socket. */
174         private final ZContext zContext;
175 
176         /** Storage for the received messages. */
177         private final List<byte[]> storage;
178 
179         /**
180          * Repeatedly read all available messages.
181          * @param zContext the ZContext needed to create the read socket
182          * @param storage storage for the received messages
183          */
184         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
185         {
186             this.zContext = zContext;
187             this.storage = storage;
188         }
189 
190         @Override
191         public void run()
192         {
193             Logger.ots().info("Read message thread starting up");
194             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
195             socket.setReceiveTimeOut(100);
196             socket.bind("inproc://publisherOutput");
197             while (!Thread.interrupted())
198             {
199                 byte[][] all = readMessages(socket);
200                 for (byte[] one : all)
201                 {
202                     this.storage.add(one);
203                 }
204             }
205             Logger.ots().info("Read message thread exits due to interrupt");
206         }
207 
208     }
209 
210     /**
211      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
212      * @param socket ZMQ.Socket; the socket
213      * @return the read messages
214      */
215     public static byte[][] readMessages(final ZMQ.Socket socket)
216     {
217         List<byte[]> resultList = new ArrayList<>();
218         while (true)
219         {
220             byte[] message = socket.recv();
221             StringBuilder output = new StringBuilder();
222             if (null != message)
223             {
224                 output.append("Master received " + message.length + " byte message: ");
225                 Logger.ots().trace(SerialDataDumper.serialDataDumper(Endianness.BIG_ENDIAN, message));
226                 try
227                 {
228                     Object[] fields = Sim0MQMessage.decodeToArray(message);
229                     for (Object field : fields)
230                     {
231                         output.append("|" + field);
232                     }
233                     output.append("|");
234                 }
235                 catch (Sim0MQException | SerializationException e)
236                 {
237                     e.printStackTrace();
238                 }
239                 Logger.ots().info(output);
240                 resultList.add(message);
241             }
242             else
243             {
244                 if (resultList.size() > 0)
245                 {
246                     Logger.ots()
247                             .info("Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
248                 }
249                 break;
250             }
251         }
252         return resultList.toArray(new byte[resultList.size()][]);
253     }
254 
255     /**
256      * Thread that runs a PublisherExperiment.
257      */
258     static class PublisherThread extends Thread
259     {
260         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
261         private final ZContext zContext;
262 
263         /**
264          * Construct a new PublisherThread.
265          * @param zContext needed to construct the PublisherExperimentUsingSockets
266          */
267         PublisherThread(final ZContext zContext)
268         {
269             this.zContext = zContext;
270         }
271 
272         /**
273          * Construct a new PublisherThread.
274          */
275         PublisherThread()
276         {
277             this.zContext = new ZContext(5);
278         }
279 
280         @Override
281         public void run()
282         {
283             try
284             {
285                 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
286             }
287             catch (SimRuntimeException e)
288             {
289                 e.printStackTrace();
290             }
291             Logger.ots().info("Publisher thread exits");
292         }
293 
294     }
295 }