View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.io.IOException;
4   import java.nio.file.Files;
5   import java.nio.file.Paths;
6   import java.util.ArrayList;
7   import java.util.Collections;
8   import java.util.List;
9   
10  import javax.naming.NamingException;
11  
12  import org.djunits.unit.DurationUnit;
13  import org.djunits.unit.TimeUnit;
14  import org.djunits.value.vdouble.scalar.Duration;
15  import org.djunits.value.vdouble.scalar.Time;
16  import org.djutils.serialization.SerializationException;
17  import org.opentrafficsim.draw.core.OTSDrawingException;
18  import org.sim0mq.Sim0MQException;
19  import org.sim0mq.message.Sim0MQMessage;
20  import org.zeromq.SocketType;
21  import org.zeromq.ZContext;
22  import org.zeromq.ZMQ;
23  
24  import nl.tudelft.simulation.dsol.SimRuntimeException;
25  import nl.tudelft.simulation.language.DSOLException;
26  
27  /**
28   * Experiment with the Sim0MQPublisher.
29   * <p>
30   * Copyright (c) 2020-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
31   * BSD-style license. See <a href="http://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
32   * <p>
33   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
34   * @author <a href="http://www.tudelft.nl/pknoppers">Peter Knoppers</a>
35   */
36  public final class PublisherDemo
37  {
38      /** Do not instantiate. */
39      private PublisherDemo()
40      {
41          // Do not instantiate
42      }
43  
44      /**
45       * Test code.
46       * @param args String[]; the command line arguments (not used)
47       * @throws IOException ...
48       * @throws NamingException ...
49       * @throws SimRuntimeException ...
50       * @throws DSOLException ...
51       * @throws OTSDrawingException ...
52       * @throws SerializationException ...
53       * @throws Sim0MQException ...
54       * @throws InterruptedException ...
55       */
56      public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DSOLException,
57              OTSDrawingException, Sim0MQException, SerializationException, InterruptedException
58      {
59          ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
60  
61          List<byte[]> receivedMessages = new ArrayList<>();
62          List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
63          ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
64          readMessageThread.start();
65  
66          PublisherThread publisherThread = new PublisherThread(zContext);
67          publisherThread.start();
68  
69          ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
70          publisherControlSocket.connect("inproc://publisherControl");
71  
72          int conversationId = 100; // Number the commands starting with something that is very different from 0
73          String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
74          sendCommand(publisherControlSocket,
75                  Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
76          for (int attempt = 0; attempt < 100; attempt++)
77          {
78              if (receivedMessages.size() > 0)
79              {
80                  break;
81              }
82              Thread.sleep(100);
83          }
84          if (receivedMessages.size() == 0)
85          {
86              System.err.println("publisher does not respond");
87          }
88          else
89          {
90              Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
91              if (!objects[5].equals(badCommand))
92              {
93                  System.err.println("publisher return unexpected response");
94              }
95              System.out.println("Got expected response to unsupported command");
96          }
97          
98          String xml = new String(Files
99                  .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
100         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
101                 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
102         sendCommand(publisherControlSocket,
103                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
104         
105         
106         
107         sendCommand(publisherControlSocket,
108                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
109         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
110                 conversationId++, new Object[] { new Time(10, TimeUnit.BASE_SECOND) }));
111         sendCommand(publisherControlSocket,
112                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
113         int conversationIdForSubscribeToAdd = conversationId++; // We need that to unsubscribe later
114         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
115                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
116         sendCommand(publisherControlSocket,
117                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
118         int conversationIdForGTU2Move = conversationId++;
119         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
120                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
121         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
122                 conversationId++, new Object[] { new Time(20, TimeUnit.BASE_SECOND) }));
123         sendCommand(publisherControlSocket,
124                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
125         // unsubscribe from GTU ADD events using saved conversationId
126         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
127                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
128         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
129                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
130         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
131                 conversationId++, new Object[] { new Time(30, TimeUnit.BASE_SECOND) }));
132         sendCommand(publisherControlSocket,
133                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
134         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
135                 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
136         sendCommand(publisherControlSocket,
137                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
138         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
139         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
140         System.out.println("Master joining publisher thread (this should block until publisher has died)");
141         publisherThread.join();
142         System.out.println("Master has joined publisher thread");
143         System.out.println("Master interrupts read message thread");
144         readMessageThread.interrupt();
145         System.out.println("Master has interrupted read message thread; joining ...");
146         readMessageThread.join();
147         System.out.println("Master has joined read message thread");
148         System.out.println("Master exits");
149     }
150 
151     /**
152      * Wrapper for ZMQ.Socket.send that may output some debugging information.
153      * @param socket ZMQ.Socket; the socket to send onto
154      * @param message byte[]; the message to transmit
155      */
156     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
157     {
158         try
159         {
160             Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
161             System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
162         }
163         catch (Sim0MQException | SerializationException e)
164         {
165             e.printStackTrace();
166         }
167         socket.send(message);
168     }
169 
170     /**
171      * Repeatedly try to read all available messages.
172      */
173     static class ReadMessageThread extends Thread
174     {
175         /** The ZContext needed to create the socket. */
176         private final ZContext zContext;
177         
178         /** Storage for the received messages. */
179         private final List<byte[]> storage;
180 
181         /**
182          * Repeatedly read all available messages.
183          * @param zContext ZContext; the ZContext needed to create the read socket
184          * @param storage List&lt;String&gt;; storage for the received messages
185          */
186         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
187         {
188             this.zContext = zContext;
189             this.storage = storage;
190         }
191 
192         @Override
193         public void run()
194         {
195             System.out.println("Read message thread starting up");
196             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
197             socket.setReceiveTimeOut(100);
198             socket.bind("inproc://publisherOutput");
199             while (!Thread.interrupted())
200             {
201                 byte[][] all = readMessages(socket);
202                 for (byte[] one : all)
203                 {
204                     this.storage.add(one);
205                 }
206             }
207             System.out.println("Read message thread exits due to interrupt");
208         }
209 
210     }
211 
212     /**
213      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
214      * @param socket ZMQ.Socket; the socket
215      * @return byte[][]; the read messages
216      */
217     public static byte[][] readMessages(final ZMQ.Socket socket)
218     {
219         List<byte[]> resultList = new ArrayList<>();
220         while (true)
221         {
222             byte[] message = socket.recv();
223             StringBuilder output = new StringBuilder();
224             if (null != message)
225             {
226                 output.append("Master received " + message.length + " byte message: ");
227                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
228                 try
229                 {
230                     Object[] fields = Sim0MQMessage.decodeToArray(message);
231                     for (Object field : fields)
232                     {
233                         output.append("|" + field);
234                     }
235                     output.append("|");
236                 }
237                 catch (Sim0MQException | SerializationException e)
238                 {
239                     e.printStackTrace();
240                 }
241                 System.out.println(output);
242                 resultList.add(message);
243             }
244             else
245             {
246                 if (resultList.size() > 0)
247                 {
248                     System.out.println(
249                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
250                 }
251                 break;
252             }
253         }
254         return resultList.toArray(new byte[resultList.size()][]);
255     }
256 
257     /**
258      * Thread that runs a PublisherExperiment.
259      */
260     static class PublisherThread extends Thread
261     {
262         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
263         private final ZContext zContext;
264 
265         /**
266          * Construct a new PublisherThread.
267          * @param zContext ZContext; needed to construct the PublisherExperimentUsingSockets
268          */
269         PublisherThread(final ZContext zContext)
270         {
271             this.zContext = zContext;
272         }
273 
274         /**
275          * Construct a new PublisherThread.
276          */
277         PublisherThread()
278         {
279             this.zContext = new ZContext(5);
280         }
281 
282         @Override
283         public void run()
284         {
285             try
286             {
287                 new Sim0MQPublisher(zContext, "publisherControl", "publisherOutput");
288             }
289             catch (SimRuntimeException e)
290             {
291                 e.printStackTrace();
292             }
293             System.out.println("Publisher thread exits");
294         }
295 
296     }
297 }