View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.io.IOException;
4   import java.nio.file.Files;
5   import java.nio.file.Paths;
6   import java.util.ArrayList;
7   import java.util.Collections;
8   import java.util.List;
9   
10  import javax.naming.NamingException;
11  
12  import org.djunits.unit.DurationUnit;
13  import org.djunits.unit.TimeUnit;
14  import org.djunits.value.vdouble.scalar.Duration;
15  import org.djunits.value.vdouble.scalar.Time;
16  import org.djutils.serialization.SerializationException;
17  import org.opentrafficsim.draw.OtsDrawingException;
18  import org.sim0mq.Sim0MQException;
19  import org.sim0mq.message.Sim0MQMessage;
20  import org.zeromq.SocketType;
21  import org.zeromq.ZContext;
22  import org.zeromq.ZMQ;
23  
24  import nl.tudelft.simulation.dsol.SimRuntimeException;
25  import nl.tudelft.simulation.language.DsolException;
26  
27  /**
28   * Experiment with the Sim0MQPublisher.
29   * <p>
30   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
31   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
32   * </p>
33   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
34   * @author <a href="https://tudelft.nl/staff/p.knoppers-1">Peter Knoppers</a>
35   */
36  public final class PublisherDemo
37  {
38      /** Do not instantiate. */
39      private PublisherDemo()
40      {
41          // Do not instantiate
42      }
43  
44      /**
45       * Test code.
46       * @param args String[]; the command line arguments (not used)
47       * @throws IOException ...
48       * @throws NamingException ...
49       * @throws SimRuntimeException ...
50       * @throws DsolException ...
51       * @throws OtsDrawingException ...
52       * @throws SerializationException ...
53       * @throws Sim0MQException ...
54       * @throws InterruptedException ...
55       */
56      public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
57              OtsDrawingException, Sim0MQException, SerializationException, InterruptedException
58      {
59          ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
60  
61          List<byte[]> receivedMessages = new ArrayList<>();
62          List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63          ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64          readMessageThread.start();
65  
66          PublisherThread publisherThread = new PublisherThread(zContext);
67          publisherThread.start();
68  
69          ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70          publisherControlSocket.connect("inproc://publisherControl");
71  
72          int conversationId = 100; // Number the commands starting with something that is very different from 0
73          String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74          sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
75          for (int attempt = 0; attempt < 100; attempt++)
76          {
77              if (receivedMessages.size() > 0)
78              {
79                  break;
80              }
81              Thread.sleep(100);
82          }
83          if (receivedMessages.size() == 0)
84          {
85              System.err.println("publisher does not respond");
86          }
87          else
88          {
89              Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
90              if (!objects[5].equals(badCommand))
91              {
92                  System.err.println("publisher return unexpected response");
93              }
94              System.out.println("Got expected response to unsupported command");
95          }
96  
97          // FIXME: This is of course not the intention...
98          // FIXME: make the network available as a resource...
99          String xml = new String(Files
100                 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
101         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
102                 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
103         sendCommand(publisherControlSocket,
104                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
105 
106         sendCommand(publisherControlSocket,
107                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
108         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
109                 conversationId++, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
110         sendCommand(publisherControlSocket,
111                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
112         int conversationIdForSubscribeToAdd = conversationId++; // We need that to unsubscribe later
113         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
114                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
115         sendCommand(publisherControlSocket,
116                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
117         int conversationIdForGTU2Move = conversationId++;
118         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
119                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
120         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
121                 conversationId++, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
122         sendCommand(publisherControlSocket,
123                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
124         // unsubscribe from GTU ADD events using saved conversationId
125         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
126                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
127         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
128                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
129         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
130                 conversationId++, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
131         sendCommand(publisherControlSocket,
132                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
133         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
134                 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
135         sendCommand(publisherControlSocket,
136                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
137         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
138         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
139         System.out.println("Master joining publisher thread (this should block until publisher has died)");
140         publisherThread.join();
141         System.out.println("Master has joined publisher thread");
142         System.out.println("Master interrupts read message thread");
143         readMessageThread.interrupt();
144         System.out.println("Master has interrupted read message thread; joining ...");
145         readMessageThread.join();
146         System.out.println("Master has joined read message thread");
147         System.out.println("Master exits");
148     }
149 
150     /**
151      * Wrapper for ZMQ.Socket.send that may output some debugging information.
152      * @param socket ZMQ.Socket; the socket to send onto
153      * @param message byte[]; the message to transmit
154      */
155     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
156     {
157         try
158         {
159             Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
160             System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
161         }
162         catch (Sim0MQException | SerializationException e)
163         {
164             e.printStackTrace();
165         }
166         socket.send(message);
167     }
168 
169     /**
170      * Repeatedly try to read all available messages.
171      */
172     static class ReadMessageThread extends Thread
173     {
174         /** The ZContext needed to create the socket. */
175         private final ZContext zContext;
176 
177         /** Storage for the received messages. */
178         private final List<byte[]> storage;
179 
180         /**
181          * Repeatedly read all available messages.
182          * @param zContext ZContext; the ZContext needed to create the read socket
183          * @param storage List&lt;String&gt;; storage for the received messages
184          */
185         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
186         {
187             this.zContext = zContext;
188             this.storage = storage;
189         }
190 
191         @Override
192         public void run()
193         {
194             System.out.println("Read message thread starting up");
195             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
196             socket.setReceiveTimeOut(100);
197             socket.bind("inproc://publisherOutput");
198             while (!Thread.interrupted())
199             {
200                 byte[][] all = readMessages(socket);
201                 for (byte[] one : all)
202                 {
203                     this.storage.add(one);
204                 }
205             }
206             System.out.println("Read message thread exits due to interrupt");
207         }
208 
209     }
210 
211     /**
212      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
213      * @param socket ZMQ.Socket; the socket
214      * @return byte[][]; the read messages
215      */
216     public static byte[][] readMessages(final ZMQ.Socket socket)
217     {
218         List<byte[]> resultList = new ArrayList<>();
219         while (true)
220         {
221             byte[] message = socket.recv();
222             StringBuilder output = new StringBuilder();
223             if (null != message)
224             {
225                 output.append("Master received " + message.length + " byte message: ");
226                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
227                 try
228                 {
229                     Object[] fields = Sim0MQMessage.decodeToArray(message);
230                     for (Object field : fields)
231                     {
232                         output.append("|" + field);
233                     }
234                     output.append("|");
235                 }
236                 catch (Sim0MQException | SerializationException e)
237                 {
238                     e.printStackTrace();
239                 }
240                 System.out.println(output);
241                 resultList.add(message);
242             }
243             else
244             {
245                 if (resultList.size() > 0)
246                 {
247                     System.out.println(
248                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
249                 }
250                 break;
251             }
252         }
253         return resultList.toArray(new byte[resultList.size()][]);
254     }
255 
256     /**
257      * Thread that runs a PublisherExperiment.
258      */
259     static class PublisherThread extends Thread
260     {
261         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
262         private final ZContext zContext;
263 
264         /**
265          * Construct a new PublisherThread.
266          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
267          */
268         PublisherThread(final ZContext zContext)
269         {
270             this.zContext = zContext;
271         }
272 
273         /**
274          * Construct a new PublisherThread.
275          */
276         PublisherThread()
277         {
278             this.zContext = new ZContext(5);
279         }
280 
281         @Override
282         public void run()
283         {
284             try
285             {
286                 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
287             }
288             catch (SimRuntimeException e)
289             {
290                 e.printStackTrace();
291             }
292             System.out.println("Publisher thread exits");
293         }
294 
295     }
296 }