View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.io.IOException;
4   import java.nio.file.Files;
5   import java.nio.file.Paths;
6   import java.util.ArrayList;
7   import java.util.Collections;
8   import java.util.List;
9   
10  import javax.naming.NamingException;
11  
12  import org.djunits.unit.DurationUnit;
13  import org.djunits.unit.TimeUnit;
14  import org.djunits.value.vdouble.scalar.Duration;
15  import org.djunits.value.vdouble.scalar.Time;
16  import org.djutils.serialization.SerializationException;
17  import org.opentrafficsim.draw.core.OTSDrawingException;
18  import org.sim0mq.Sim0MQException;
19  import org.sim0mq.message.Sim0MQMessage;
20  import org.zeromq.SocketType;
21  import org.zeromq.ZContext;
22  import org.zeromq.ZMQ;
23  
24  import nl.tudelft.simulation.dsol.SimRuntimeException;
25  import nl.tudelft.simulation.language.DSOLException;
26  
27  /**
28   * Experiment with the Sim0MQPublisher.
29   * <p>
30   * Copyright (c) 2020-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
31   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
32   * <p>
33   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
34   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
35   */
36  public final class PublisherDemo
37  {
38      /** Do not instantiate. */
39      private PublisherDemo()
40      {
41          // Do not instantiate
42      }
43  
44      /**
45       * Test code.
46       * @param args String[]; the command line arguments (not used)
47       * @throws IOException ...
48       * @throws NamingException ...
49       * @throws SimRuntimeException ...
50       * @throws DSOLException ...
51       * @throws OTSDrawingException ...
52       * @throws SerializationException ...
53       * @throws Sim0MQException ...
54       * @throws InterruptedException ...
55       */
56      public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DSOLException,
57              OTSDrawingException, Sim0MQException, SerializationException, InterruptedException
58      {
59          ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
60  
61          List<byte[]> receivedMessages = new ArrayList<>();
62          List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63          ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64          readMessageThread.start();
65  
66          PublisherThread publisherThread = new PublisherThread(zContext);
67          publisherThread.start();
68  
69          ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70          publisherControlSocket.connect("inproc://publisherControl");
71  
72          int conversationId = 100; // Number the commands starting with something that is very different from 0
73          String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74          sendCommand(publisherControlSocket,
75                  Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
76          for (int attempt = 0; attempt < 100; attempt++)
77          {
78              if (receivedMessages.size() > 0)
79              {
80                  break;
81              }
82              Thread.sleep(100);
83          }
84          if (receivedMessages.size() == 0)
85          {
86              System.err.println("publisher does not respond");
87          }
88          else
89          {
90              Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
91              if (!objects[5].equals(badCommand))
92              {
93                  System.err.println("publisher return unexpected response");
94              }
95              System.out.println("Got expected response to unsupported command");
96          }
97          
98          // FIXME: This is of course not the intention... 
99          // FIXME: make the network available as a resource...
100         String xml = new String(Files
101                 .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
102         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
103                 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
104         sendCommand(publisherControlSocket,
105                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
106         
107         
108         
109         sendCommand(publisherControlSocket,
110                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
111         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
112                 conversationId++, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
113         sendCommand(publisherControlSocket,
114                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
115         int conversationIdForSubscribeToAdd = conversationId++; // We need that to unsubscribe later
116         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
117                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
118         sendCommand(publisherControlSocket,
119                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
120         int conversationIdForGTU2Move = conversationId++;
121         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
122                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
123         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
124                 conversationId++, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
125         sendCommand(publisherControlSocket,
126                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
127         // unsubscribe from GTU ADD events using saved conversationId
128         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
129                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
130         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
131                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
132         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
133                 conversationId++, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
134         sendCommand(publisherControlSocket,
135                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
136         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
137                 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
138         sendCommand(publisherControlSocket,
139                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
140         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
141         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
142         System.out.println("Master joining publisher thread (this should block until publisher has died)");
143         publisherThread.join();
144         System.out.println("Master has joined publisher thread");
145         System.out.println("Master interrupts read message thread");
146         readMessageThread.interrupt();
147         System.out.println("Master has interrupted read message thread; joining ...");
148         readMessageThread.join();
149         System.out.println("Master has joined read message thread");
150         System.out.println("Master exits");
151     }
152 
153     /**
154      * Wrapper for ZMQ.Socket.send that may output some debugging information.
155      * @param socket ZMQ.Socket; the socket to send onto
156      * @param message byte[]; the message to transmit
157      */
158     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
159     {
160         try
161         {
162             Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
163             System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
164         }
165         catch (Sim0MQException | SerializationException e)
166         {
167             e.printStackTrace();
168         }
169         socket.send(message);
170     }
171 
172     /**
173      * Repeatedly try to read all available messages.
174      */
175     static class ReadMessageThread extends Thread
176     {
177         /** The ZContext needed to create the socket. */
178         private final ZContext zContext;
179         
180         /** Storage for the received messages. */
181         private final List<byte[]> storage;
182 
183         /**
184          * Repeatedly read all available messages.
185          * @param zContext ZContext; the ZContext needed to create the read socket
186          * @param storage List&lt;String&gt;; storage for the received messages
187          */
188         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
189         {
190             this.zContext = zContext;
191             this.storage = storage;
192         }
193 
194         @Override
195         public void run()
196         {
197             System.out.println("Read message thread starting up");
198             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
199             socket.setReceiveTimeOut(100);
200             socket.bind("inproc://publisherOutput");
201             while (!Thread.interrupted())
202             {
203                 byte[][] all = readMessages(socket);
204                 for (byte[] one : all)
205                 {
206                     this.storage.add(one);
207                 }
208             }
209             System.out.println("Read message thread exits due to interrupt");
210         }
211 
212     }
213 
214     /**
215      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
216      * @param socket ZMQ.Socket; the socket
217      * @return byte[][]; the read messages
218      */
219     public static byte[][] readMessages(final ZMQ.Socket socket)
220     {
221         List<byte[]> resultList = new ArrayList<>();
222         while (true)
223         {
224             byte[] message = socket.recv();
225             StringBuilder output = new StringBuilder();
226             if (null != message)
227             {
228                 output.append("Master received " + message.length + " byte message: ");
229                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
230                 try
231                 {
232                     Object[] fields = Sim0MQMessage.decodeToArray(message);
233                     for (Object field : fields)
234                     {
235                         output.append("|" + field);
236                     }
237                     output.append("|");
238                 }
239                 catch (Sim0MQException | SerializationException e)
240                 {
241                     e.printStackTrace();
242                 }
243                 System.out.println(output);
244                 resultList.add(message);
245             }
246             else
247             {
248                 if (resultList.size() > 0)
249                 {
250                     System.out.println(
251                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
252                 }
253                 break;
254             }
255         }
256         return resultList.toArray(new byte[resultList.size()][]);
257     }
258 
259     /**
260      * Thread that runs a PublisherExperiment.
261      */
262     static class PublisherThread extends Thread
263     {
264         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
265         private final ZContext zContext;
266 
267         /**
268          * Construct a new PublisherThread.
269          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
270          */
271         PublisherThread(final ZContext zContext)
272         {
273             this.zContext = zContext;
274         }
275 
276         /**
277          * Construct a new PublisherThread.
278          */
279         PublisherThread()
280         {
281             this.zContext = new ZContext(5);
282         }
283 
284         @Override
285         public void run()
286         {
287             try
288             {
289                 new Sim0MQPublisher(this.zContext, "publisherControl", "publisherOutput");
290             }
291             catch (SimRuntimeException e)
292             {
293                 e.printStackTrace();
294             }
295             System.out.println("Publisher thread exits");
296         }
297 
298     }
299 }