View Javadoc
1   package org.opentrafficsim.sim0mq.swing;
2   
3   import java.io.IOException;
4   import java.nio.file.Files;
5   import java.nio.file.Paths;
6   import java.util.ArrayList;
7   import java.util.Collections;
8   import java.util.List;
9   
10  import javax.naming.NamingException;
11  
12  import org.djunits.unit.DurationUnit;
13  import org.djunits.unit.TimeUnit;
14  import org.djunits.value.vdouble.scalar.Duration;
15  import org.djunits.value.vdouble.scalar.Time;
16  import org.djutils.serialization.SerializationException;
17  import org.sim0mq.Sim0MQException;
18  import org.sim0mq.message.Sim0MQMessage;
19  import org.zeromq.SocketType;
20  import org.zeromq.ZContext;
21  import org.zeromq.ZMQ;
22  
23  import nl.tudelft.simulation.dsol.SimRuntimeException;
24  import nl.tudelft.simulation.language.DsolException;
25  
26  /**
27   * Experiment with the Sim0MQPublisher.
28   * <p>
29   * Copyright (c) 2020-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
30   * BSD-style license. See <a href="https://opentrafficsim.org/docs/license.html">OpenTrafficSim License</a>.
31   * </p>
32   * @author <a href="https://github.com/averbraeck">Alexander Verbraeck</a>
33   * @author <a href="https://github.com/peter-knoppers">Peter Knoppers</a>
34   */
35  public final class PublisherDemo
36  {
37      /** Do not instantiate. */
38      private PublisherDemo()
39      {
40          // Do not instantiate
41      }
42  
43      /**
44       * Test code.
45       * @param args the command line arguments (not used)
46       * @throws IOException ...
47       * @throws NamingException ...
48       * @throws SimRuntimeException ...
49       * @throws DsolException ...
50       * @throws SerializationException ...
51       * @throws Sim0MQException ...
52       * @throws InterruptedException ...
53       */
54      public static void main(final String[] args) throws IOException, SimRuntimeException, NamingException, DsolException,
55              Sim0MQException, SerializationException, InterruptedException
56      {
57          ZContext zContext = new ZContext(5); // 5 IO threads - how many is reasonable? It actually works with 1 IO thread.
58  
59          List<byte[]> receivedMessages = new ArrayList<>();
60          List<byte[]> synchronizedReceivedMessages = Collections.synchronizedList(receivedMessages);
61          ReadMessageThread readMessageThread = new ReadMessageThread(zContext, synchronizedReceivedMessages);
62          readMessageThread.start();
63  
64          PublisherThread publisherThread = new PublisherThread(zContext);
65          publisherThread.start();
66  
67          ZMQ.Socket publisherControlSocket = zContext.createSocket(SocketType.PUSH);
68          publisherControlSocket.connect("inproc://publisherControl");
69  
70          int conversationId = 100; // Number the commands starting with something that is very different from 0
71          String badCommand = "THIS_IS_NOT_A_SUPPORTED_COMMAND";
72          sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", badCommand, conversationId++));
73          for (int attempt = 0; attempt < 100; attempt++)
74          {
75              if (receivedMessages.size() > 0)
76              {
77                  break;
78              }
79              Thread.sleep(100);
80          }
81          if (receivedMessages.size() == 0)
82          {
83              System.err.println("publisher does not respond");
84          }
85          else
86          {
87              Object[] objects = Sim0MQMessage.decodeToArray(receivedMessages.get(0));
88              if (!objects[5].equals(badCommand))
89              {
90                  System.err.println("publisher return unexpected response");
91              }
92              System.out.println("Got expected response to unsupported command");
93          }
94  
95          // FIXME: This is of course not the intention...
96          // FIXME: make the network available as a resource...
97          String xml = new String(Files
98                  .readAllBytes(Paths.get("C:/Users/pknoppers/Java/ots-demo/src/main/resources/TrafCODDemo2/TrafCODDemo2.xml")));
99          sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "NEWSIMULATION",
100                 conversationId++, xml, new Duration(3600, DurationUnit.SECOND), Duration.ZERO, 123456L));
101         sendCommand(publisherControlSocket,
102                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "|GET_CURRENT", conversationId++));
103 
104         sendCommand(publisherControlSocket,
105                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
106         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
107                 conversationId++, new Object[] {new Time(10, TimeUnit.BASE_SECOND)}));
108         sendCommand(publisherControlSocket,
109                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
110         int conversationIdForSubscribeToAdd = conversationId++; // We need that to unsubscribe later
111         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
112                 "GTUs in network|SUBSCRIBE_TO_ADD", conversationIdForSubscribeToAdd));
113         sendCommand(publisherControlSocket,
114                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|GET_RESULT_META_DATA", conversationId++));
115         int conversationIdForGTU2Move = conversationId++;
116         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTU move|SUBSCRIBE_TO_CHANGE",
117                 conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
118         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
119                 conversationId++, new Object[] {new Time(20, TimeUnit.BASE_SECOND)}));
120         sendCommand(publisherControlSocket,
121                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
122         // unsubscribe from GTU ADD events using saved conversationId
123         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
124                 "GTUs in network|UNSUBSCRIBE_FROM_ADD", conversationIdForSubscribeToAdd));
125         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
126                 "GTU move|UNSUBSCRIBE_FROM_CHANGE", conversationIdForGTU2Move, "2")); // Subscribe to move events of GTU 2
127         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "SIMULATEUNTIL",
128                 conversationId++, new Object[] {new Time(30, TimeUnit.BASE_SECOND)}));
129         sendCommand(publisherControlSocket,
130                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_CURRENT", conversationId++));
131         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave",
132                 "GTUs in network|GET_ADDRESS_META_DATA", conversationId++));
133         sendCommand(publisherControlSocket,
134                 Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "GTUs in network|GET_RESULT_META_DATA", conversationId++));
135         sendCommand(publisherControlSocket, Sim0MQMessage.encodeUTF8(true, 0, "Master", "Slave", "DIE", conversationId++));
136         System.out.println("Master has sent last command; Publisher should be busy for a while and then die");
137         System.out.println("Master joining publisher thread (this should block until publisher has died)");
138         publisherThread.join();
139         System.out.println("Master has joined publisher thread");
140         System.out.println("Master interrupts read message thread");
141         readMessageThread.interrupt();
142         System.out.println("Master has interrupted read message thread; joining ...");
143         readMessageThread.join();
144         System.out.println("Master has joined read message thread");
145         System.out.println("Master exits");
146     }
147 
148     /**
149      * Wrapper for ZMQ.Socket.send that may output some debugging information.
150      * @param socket ZMQ.Socket; the socket to send onto
151      * @param message the message to transmit
152      */
153     static void sendCommand(final ZMQ.Socket socket, final byte[] message)
154     {
155         try
156         {
157             Object[] unpackedMessage = Sim0MQMessage.decodeToArray(message);
158             System.out.println("Master sending command " + unpackedMessage[5] + " conversation id " + unpackedMessage[6]);
159         }
160         catch (Sim0MQException | SerializationException e)
161         {
162             e.printStackTrace();
163         }
164         socket.send(message);
165     }
166 
167     /**
168      * Repeatedly try to read all available messages.
169      */
170     static class ReadMessageThread extends Thread
171     {
172         /** The ZContext needed to create the socket. */
173         private final ZContext zContext;
174 
175         /** Storage for the received messages. */
176         private final List<byte[]> storage;
177 
178         /**
179          * Repeatedly read all available messages.
180          * @param zContext the ZContext needed to create the read socket
181          * @param storage storage for the received messages
182          */
183         ReadMessageThread(final ZContext zContext, final List<byte[]> storage)
184         {
185             this.zContext = zContext;
186             this.storage = storage;
187         }
188 
189         @Override
190         public void run()
191         {
192             System.out.println("Read message thread starting up");
193             ZMQ.Socket socket = this.zContext.createSocket(SocketType.PULL);
194             socket.setReceiveTimeOut(100);
195             socket.bind("inproc://publisherOutput");
196             while (!Thread.interrupted())
197             {
198                 byte[][] all = readMessages(socket);
199                 for (byte[] one : all)
200                 {
201                     this.storage.add(one);
202                 }
203             }
204             System.out.println("Read message thread exits due to interrupt");
205         }
206 
207     }
208 
209     /**
210      * Read as many messages from a ZMQ socket as are available. Do NOT block when there are no (more) messages to read.
211      * @param socket ZMQ.Socket; the socket
212      * @return the read messages
213      */
214     public static byte[][] readMessages(final ZMQ.Socket socket)
215     {
216         List<byte[]> resultList = new ArrayList<>();
217         while (true)
218         {
219             byte[] message = socket.recv();
220             StringBuilder output = new StringBuilder();
221             if (null != message)
222             {
223                 output.append("Master received " + message.length + " byte message: ");
224                 // System.out.println(SerialDataDumper.serialDataDumper(EndianUtil.BIG_ENDIAN, message));
225                 try
226                 {
227                     Object[] fields = Sim0MQMessage.decodeToArray(message);
228                     for (Object field : fields)
229                     {
230                         output.append("|" + field);
231                     }
232                     output.append("|");
233                 }
234                 catch (Sim0MQException | SerializationException e)
235                 {
236                     e.printStackTrace();
237                 }
238                 System.out.println(output);
239                 resultList.add(message);
240             }
241             else
242             {
243                 if (resultList.size() > 0)
244                 {
245                     System.out.println(
246                             "Master picked up " + resultList.size() + " message" + (resultList.size() == 1 ? "" : "s"));
247                 }
248                 break;
249             }
250         }
251         return resultList.toArray(new byte[resultList.size()][]);
252     }
253 
254     /**
255      * Thread that runs a PublisherExperiment.
256      */
257     static class PublisherThread extends Thread
258     {
259         /** Passed onto the constructor of PublisherExperimentUsingSockets. */
260         private final ZContext zContext;
261 
262         /**
263          * Construct a new PublisherThread.
264          * @param zContext needed to construct the PublisherExperimentUsingSockets
265          */
266         PublisherThread(final ZContext zContext)
267         {
268             this.zContext = zContext;
269         }
270 
271         /**
272          * Construct a new PublisherThread.
273          */
274         PublisherThread()
275         {
276             this.zContext = new ZContext(5);
277         }
278 
279         @Override
280         public void run()
281         {
282             try
283             {
284                 new Sim0mqPublisher(this.zContext, "publisherControl", "publisherOutput");
285             }
286             catch (SimRuntimeException e)
287             {
288                 e.printStackTrace();
289             }
290             System.out.println("Publisher thread exits");
291         }
292 
293     }
294 }