View Javadoc
1   package org.opentrafficsim.demo.web;
2   
3   import org.zeromq.ZContext;
4   import org.zeromq.ZMQ;
5   
6   /**
7    * The LoadBalancer starts listening on the given port for messages. It is started with a map of network nodes where
8    * FederateStarters can start models, and a capacity of each node.<br>
9    * <br>
10   * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined:
11   * <ul>
12   * <li><b>port:</b> the port on which the LoadBalancer listens</li>
13   * <li><b>nodeFile:</b> the file that contains the node information</li>
14   * </ul>
15   * The nodeFile is expected to be tab-delimited with the following columns (and an example):
16   * 
17   * <pre>
18   * nodeAddress  fsPort   maxLoad   priority
19   * 10.0.0.121   5500     8         10
20   * 10.0.0.122   5500     8         10
21   * 10.0.0.123   5500     8         10
22   * 127.0.0.1    5500     8         5
23   * </pre>
24   * 
25   * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be
26   * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which
27   * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example
28   * above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running,
29   * localhost will be used.
30   * <p>
31   * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
32   * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>.
33   * </p>
34   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
35   */
36  public class SimpleLoadBalancer
37  {
38  //    /** the port number to listen on. */
39  //    private final int lbPort;
40  
41  //    /** the nodes with federate starters that can be used. */
42  //    private final Set<FederateStarterNode> federateStarterNodes;
43  
44      /** The 0MQ socket of the load balancer; never assigned while the constructor below stays commented out. */
45      private ZMQ.Socket lbSocket;
46  
47      /** The 0MQ context; the disabled code below creates its sockets from it, the active code never assigns it. */
48      private ZContext lbContext;
49  
50      /** Message counter; the disabled code below pre-increments it to number each outgoing message. */
51      private long messageCount = 0;
52  
53  //    /**
54  //     * @param lbPort the port number to listen on
55  //     * @param federateStarterNodes the nodes with federate starters that can be used
56  //     * @throws Sim0MQException on error
57  //     * @throws SerializationException on error
58  //     */
59  //    public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes)
60  //            throws Sim0MQException, SerializationException
61  //    {
62  //        super();
63  //        this.lbPort = lbPort;
64  //        this.federateStarterNodes = federateStarterNodes;
65  //
66  //        this.lbContext = new ZContext(1);
67  //
68  //        this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER);
69  //        this.lbSocket.bind("tcp://*:" + this.lbPort);
70  //
71  //        while (!Thread.currentThread().isInterrupted())
72  //        {
73  //            // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0)
74  //            String identity = this.lbSocket.recvStr();
75  //            this.lbSocket.recvStr();
76  //
77  //            byte[] request = this.lbSocket.recv(0);
78  //            Object[] fields = SimulationMessage.decode(request);
79  //            String messageTypeId = fields[4].toString();
80  //            String receiverId = fields[3].toString();
81  //
82  //            System.out.println("Received " + SimulationMessage.print(fields));
83  //
84  //            if (receiverId.equals("LB"))
85  //            {
86  //                switch (messageTypeId)
87  //                {
88  //                    case "LB.1":
89  //                        processStartModel(identity, fields);
90  //                        break;
91  //
92  //                    case "LB.2":
93  //                        processModelKilled(identity, fields);
94  //                        break;
95  //
96  //                    default:
97  //                        // wrong message
98  //                        System.err.println("Received unknown message -- not processed: " + messageTypeId);
99  //                }
100 //            }
101 //            else
102 //            {
103 //                // wrong receiver
104 //                System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: ");
105 //            }
106 //        }
107 //        this.lbSocket.close();
108 //        this.lbContext.destroy();
109 //    }
110 //
111 //    /**
112 //     * Process MC.X message and send LB.Y message back.
113 //     * @param identity reply id for REQ-ROUTER pattern
114 //     * @param fields the message
115 //     * @throws Sim0MQException on error
116 //     * @throws SerializationException on error
117 //     */
118 //    private void processStartFederateStarter(final String identity, final Object[] fields)
119 //            throws Sim0MQException, SerializationException
120 //    {
121 //        StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB");
122 //        String error = "";
123 //
124 //        int modelPort = findFreePortNumber();
125 //
126 //        if (modelPort == -1)
127 //        {
128 //            error = "No free port number";
129 //        }
130 //
131 //        else
132 //
133 //        {
134 //            try
135 //            {
136 //                ProcessBuilder pb = new ProcessBuilder();
137 //
138 //                Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
139 //                pb.directory(workingPath.toFile());
140 //
141 //                String softwareCode = "";
142 //                if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
143 //                {
144 //                    System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
145 //                            + " in software properties file");
146 //                }
147 //                else
148 //                {
149 //                    softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
150 //
151 //                    List<String> pbArgs = new ArrayList<>();
152 //                    pbArgs.add(softwareCode);
153 //                    pbArgs.add(startFederateMessage.getArgsBefore());
154 //                    pbArgs.add(startFederateMessage.getModelPath());
155 //                    pbArgs.addAll(Arrays.asList(
156 //                            startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
157 //                    pb.command(pbArgs);
158 //
159 //                    String stdIn = startFederateMessage.getRedirectStdin();
160 //                    String stdOut = startFederateMessage.getRedirectStdout();
161 //                    String stdErr = startFederateMessage.getRedirectStderr();
162 //
163 //                    if (stdIn.length() > 0)
164 //                    {
165 //                        // TODO working dir path if not absolute?
166 //                        File stdInFile = new File(stdIn);
167 //                        pb.redirectInput(stdInFile);
168 //                    }
169 //
170 //                    if (stdOut.length() > 0)
171 //                    {
172 //                        // TODO working dir path if not absolute?
173 //                        File stdOutFile = new File(stdOut);
174 //                        pb.redirectOutput(stdOutFile);
175 //                    }
176 //
177 //                    if (stdErr.length() > 0)
178 //                    {
179 //                        // TODO working dir path if not absolute?
180 //                        File stdErrFile = new File(stdErr);
181 //                        pb.redirectError(stdErrFile);
182 //                    }
183 //
184 //                    new Thread()
185 //                    {
186 //                        /** {@inheritDoc} */
187 //                        @Override
188 //                        public void run()
189 //                        {
190 //                            try
191 //                            {
192 //                                Process process = pb.start();
193 //                                SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
194 //                                System.err.println("Process started:" + process.isAlive());
195 //                            }
196 //                            catch (IOException exception)
197 //                            {
198 //                                exception.printStackTrace();
199 //                            }
200 //                        }
201 //                    }.start();
202 //
203 //                    this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
204 //                    this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
205 //
206 //                    // Thread.sleep(1000);
207 //
208 //                    // wait till the model is ready...
209 //                    error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
210 //                            modelPort);
211 //                }
212 //            }
213 //            catch (IOException exception)
214 //            {
215 //                exception.printStackTrace();
216 //                error = exception.getMessage();
217 //            }
218 //        }
219 //
220 //        System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
221 //
222 //        // Send reply back to client
223 //        this.lbSocket.sendMore(identity);
224 //        this.lbSocket.sendMore("");
225 //        //@formatter:off
226 //        byte[] fs2Message = new FederateStartedMessage.Builder()
227 //                .setSimulationRunId(startFederateMessage.getSimulationRunId())
228 //                .setInstanceId(startFederateMessage.getInstanceId())
229 //                .setSenderId("FS")
230 //                .setReceiverId(startFederateMessage.getSenderId())
231 //                .setMessageId(++this.messageCount)
232 //                .setStatus(error.isEmpty() ? "started" : "error")
233 //                .setError(error)
234 //                .setModelPort(modelPort)
235 //                .build()
236 //                .createByteArray();
237 //        this.lbSocket.send(fs2Message, 0);
238 //        //@formatter:on
239 //    }
240 //
241 //    /**
242 //     * Find a free port for the model.
243 //     * @return the first free port number in the range startPort - endPort, inclusive, or -1 when none is free
244 //     */
245 //    private int findFreePortNumber()
246 //    {
247 //        for (int port = this.startPort; port <= this.endPort; port++)
248 //        {
249 //            if (!this.modelPortMap.containsValue(port))
250 //            {
251 //                // try if the port is really free
252 //                ZMQ.Socket testSocket = null;
253 //                try
254 //                {
255 //                    testSocket = this.lbContext.createSocket(SocketType.REP);
256 //                    testSocket.bind("tcp://127.0.0.1:" + port);
257 //                    testSocket.unbind("tcp://127.0.0.1:" + port);
258 //                    testSocket.close();
259 //                    return port;
260 //                }
261 //                catch (Exception exception)
262 //                {
263 //                    // port was not free
264 //                    if (testSocket != null)
265 //                    {
266 //                        try
267 //                        {
268 //                            testSocket.close();
269 //                        }
270 //                        catch (Exception e)
271 //                        {
272 //                            // ignore.
273 //                        }
274 //                    }
275 //                }
276 //            }
277 //        }
278 //        return -1;
279 //    }
280 //
281 //    /**
282 //     * Wait for the model to start, polling its status with message FS.1 until it reports "started".
283 //     * @param federationRunId the name of the federation
284 //     * @param modelId the String id of the model
285 //     * @param modelPort port on which the model is listening
286 //     * @return empty String for no error, filled String for error
287 //     * @throws Sim0MQException on error
288 //     * @throws SerializationException on error
289 //     */
290 //    private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
291 //            throws Sim0MQException, SerializationException
292 //    {
293 //        boolean ok = true;
294 //        String error = "";
295 //        ZMQ.Socket modelSocket = null;
296 //        try
297 //        {
298 //            modelSocket = this.lbContext.createSocket(SocketType.REQ);
299 //            modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
300 //            modelSocket.connect("tcp://127.0.0.1:" + modelPort);
301 //        }
302 //        catch (Exception exception)
303 //        {
304 //            exception.printStackTrace();
305 //            ok = false;
306 //            error = exception.getMessage();
307 //        }
308 //
309 //        boolean started = false;
310 //        while (ok && !started)
311 //        {
312 //            byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
313 //                    MessageStatus.NEW);
314 //            modelSocket.send(fs1Message, 0);
315 //
316 //            byte[] reply = modelSocket.recv(0);
317 //            Object[] replyMessage = SimulationMessage.decode(reply);
318 //            System.out.println("Received\n" + SimulationMessage.print(replyMessage));
319 //
320 //            if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
321 //                    && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
322 //            {
323 //                if (replyMessage[9].toString().equals("started"))
324 //                {
325 //                    started = true;
326 //                }
327 //                else
328 //                {
329 //                    // wait 100 ms before polling again
330 //                    try
331 //                    {
332 //                        Thread.sleep(100);
333 //                    }
334 //                    catch (InterruptedException ie)
335 //                    {
336 //                        // ignore
337 //                    }
338 //                }
339 //            }
340 //            else
341 //            {
342 //                ok = false;
343 //                error = replyMessage[10].toString();
344 //                System.err.println("Simulation start error -- status = " + replyMessage[9]);
345 //                System.err.println("Error message = " + replyMessage[10]);
346 //            }
347 //        }
348 //
349 //        if (modelSocket != null)
350 //        {
351 //            modelSocket.close();
352 //        }
353 //
354 //        return error;
355 //    }
356 //
357 //    /**
358 //     * Process FM.8 message and send FS.4 message back.
359 //     * @param identity reply id for REQ-ROUTER pattern
360 //     * @param fields the message
361 //     * @throws Sim0MQException on error
362 //     * @throws SerializationException on error
363 //     */
364 //    private void processKillFederateStarter(final String identity, final Object[] fields)
365 //            throws Sim0MQException, SerializationException
366 //    {
367 //        boolean status = true;
368 //        String error = "";
369 //
370 //        Object federationRunId = fields[1];
371 //        String senderId = fields[2].toString();
372 //
373 //        String modelId = fields[8].toString();
374 //        if (!this.modelPortMap.containsKey(modelId))
375 //        {
376 //            status = false;
377 //            error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
378 //        }
379 //        else
380 //        {
381 //            int modelPort = this.modelPortMap.remove(modelId);
382 //            Process process = this.runningProcessMap.remove(modelId);
383 //
384 //            try
385 //            {
386 //                try
387 //                {
388 //                    ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ);
389 //                    modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
390 //                    modelSocket.connect("tcp://127.0.0.1:" + modelPort);
391 //
392 //                    byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
393 //                            ++this.messageCount, MessageStatus.NEW);
394 //                    modelSocket.send(fs3Message, 0);
395 //
396 //                    modelSocket.close();
397 //                }
398 //                catch (Exception exception)
399 //                {
400 //                    exception.printStackTrace();
401 //                    status = true;
402 //                    error = exception.getMessage();
403 //                }
404 //
405 //                try
406 //                {
407 //                    Thread.sleep(100);
408 //                }
409 //                catch (InterruptedException ie)
410 //                {
411 //                    // ignore
412 //                }
413 //
414 //                if (process != null && process.isAlive())
415 //                {
416 //                    process.destroyForcibly();
417 //                }
418 //
419 //                StartFederateMessage sfm = this.startFederateMessages.get(modelId);
420 //                if (sfm.isDeleteStdout())
421 //                {
422 //                    if (sfm.getRedirectStdout().length() > 0)
423 //                    {
424 //                        File stdOutFile = new File(sfm.getRedirectStdout());
425 //                        stdOutFile.delete();
426 //                    }
427 //                }
428 //
429 //                if (sfm.isDeleteStderr())
430 //                {
431 //                    if (sfm.getRedirectStderr().length() > 0)
432 //                    {
433 //                        File stdErrFile = new File(sfm.getRedirectStderr());
434 //                        stdErrFile.delete();
435 //                    }
436 //                }
437 //
438 //                if (sfm.isDeleteWorkingDirectory())
439 //                {
440 //                    File workingDir = new File(sfm.getWorkingDirectory());
441 //                    workingDir.delete();
442 //                }
443 //            }
444 //            catch (Exception exception)
445 //            {
446 //                exception.printStackTrace();
447 //                status = false;
448 //                error = exception.getMessage();
449 //            }
450 //
451 //            byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
452 //                    MessageStatus.NEW, modelId, status, error);
453 //            this.lbSocket.sendMore(identity);
454 //            this.lbSocket.sendMore("");
455 //            this.lbSocket.send(fs4Message, 0);
456 //        }
457 //    }
458 //
459 //    /**
460 //     * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
461 //     * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
462 //     * @param args the federation name and port on which the FederateStarter is listening
463 //     * @throws Sim0MQException on error
464 //     * @throws SerializationException on error
465 //     */
466 //    public static void main(final String[] args) throws Sim0MQException, SerializationException
467 //    {
468 //        if (args.length < 4)
469 //        {
470 //            System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
471 //            System.exit(-1);
472 //        }
473 //
474 //        String sPort = args[0];
475 //        int port = 0;
476 //        try
477 //        {
478 //            port = Integer.parseInt(sPort);
479 //        }
480 //        catch (NumberFormatException nfe)
481 //        {
482 //            System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
483 //            System.exit(-1);
484 //        }
485 //        if (port == 0 || port > 65535)
486 //        {
487 //            System.err.println("PortNumber should be between 1 and 65535");
488 //            System.exit(-1);
489 //        }
490 //
491 //        String propertiesFile = args[1];
492 //        Properties softwareProperties = new Properties();
493 //        InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
494 //        try
495 //        {
496 //            softwareProperties.load(propertiesStream);
497 //        }
498 //        catch (IOException | NullPointerException e)
499 //        {
500 //            System.err.println("Could not find or read software properties file " + propertiesFile);
501 //            System.exit(-1);
502 //        }
503 //
504 //        String sStartPort = args[2];
505 //        int startPort = 0;
506 //        try
507 //        {
508 //            startPort = Integer.parseInt(sStartPort);
509 //        }
510 //        catch (NumberFormatException nfe)
511 //        {
512 //            System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
513 //            System.exit(-1);
514 //        }
515 //        if (startPort == 0 || startPort > 65535)
516 //        {
517 //            System.err.println("startPort should be between 1 and 65535");
518 //            System.exit(-1);
519 //        }
520 //
521 //        String sEndPort = args[3];
522 //        int endPort = 0;
523 //        try
524 //        {
525 //            endPort = Integer.parseInt(sEndPort);
526 //        }
527 //        catch (NumberFormatException nfe)
528 //        {
529 //            System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
530 //            System.exit(-1);
531 //        }
532 //        if (endPort == 0 || endPort > 65535)
533 //        {
534 //            System.err.println("endPort should be between 1 and 65535");
535 //            System.exit(-1);
536 //        }
537 //
538 //        new SimpleLoadBalancer(port, softwareProperties, startPort, endPort);
539 //    }
540 //
541 //    /** Record with information about the nodes that have a FederateStarter running. */
542 //    static class FederateStarterNode implements Comparable<FederateStarterNode>
543 //    {
544 //        /** the node name or IP address where the Federate Starter resides. */
545 //        private final String fsNodeName;
546 //
547 //        /** the port of the Federate Starter. */
548 //        private final int port;
549 //
550 //        /** the maximum load on the node (e.g., the maximum number of concurrent models). */
551 //        private final int maxLoad;
552 //
553 //        /** the current load on the node. */
554 //        private int currentLoad = 0;
555 //
556 //        /** the priority of the node. Higher value is higher priority. */
557 //        private int priority;
558 //
559 //        /**
560 //         * @param fsNodeName String; the node name or IP address where the Federate Starter resides
561 //         * @param port int; the port of the Federate Starter
562 //         * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models)
563 //         * @param priority int; the priority of the node. Higher value is higher priority.
564 //         */
565 //        FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority)
566 //        {
567 //            super();
568 //            this.fsNodeName = fsNodeName;
569 //            this.port = port;
570 //            this.maxLoad = maxLoad;
571 //            this.priority = priority;
572 //        }
573 //
574 //        /**
575 //         * @return currentLoad
576 //         */
577 //        public final int getCurrentLoad()
578 //        {
579 //            return this.currentLoad;
580 //        }
581 //
582 //        /**
583 //         * @param currentLoad set currentLoad
584 //         */
585 //        public final void setCurrentLoad(final int currentLoad)
586 //        {
587 //            this.currentLoad = currentLoad;
588 //        }
589 //
590 //        /**
591 //         * @return priority
592 //         */
593 //        public final int getPriority()
594 //        {
595 //            return this.priority;
596 //        }
597 //
598 //        /**
599 //         * @param priority set priority
600 //         */
601 //        public final void setPriority(final int priority)
602 //        {
603 //            this.priority = priority;
604 //        }
605 //
606 //        /**
607 //         * @return fsNodeName
608 //         */
609 //        public final String getFsNodeName()
610 //        {
611 //            return this.fsNodeName;
612 //        }
613 //
614 //        /**
615 //         * @return port
616 //         */
617 //        public final int getPort()
618 //        {
619 //            return this.port;
620 //        }
621 //
622 //        /**
623 //         * @return maxLoad
624 //         */
625 //        public final int getMaxLoad()
626 //        {
627 //            return this.maxLoad;
628 //        }
629 //
630 //        /** {@inheritDoc} */
631 //        @Override
632 //        public int hashCode()
633 //        {
634 //            final int prime = 31;
635 //            int result = 1;
636 //            result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode());
637 //            result = prime * result + this.port;
638 //            return result;
639 //        }
640 //
641 //        /** {@inheritDoc} */
642 //        @SuppressWarnings("checkstyle:needbraces")
643 //        @Override
644 //        public boolean equals(final Object obj)
645 //        {
646 //            if (this == obj)
647 //                return true;
648 //            if (obj == null)
649 //                return false;
650 //            if (getClass() != obj.getClass())
651 //                return false;
652 //            FederateStarterNode other = (FederateStarterNode) obj;
653 //            if (this.fsNodeName == null)
654 //            {
655 //                if (other.fsNodeName != null)
656 //                    return false;
657 //            }
658 //            else if (!this.fsNodeName.equals(other.fsNodeName))
659 //                return false;
660 //            if (this.port != other.port)
661 //                return false;
662 //            return true;
663 //        }
664 //
665 //        /** {@inheritDoc} */
666 //        @SuppressWarnings("checkstyle:needbraces")
667 //        @Override
668 //        public int compareTo(final FederateStarterNode o)
669 //        {
670 //            // Compares this object with the specified object for order. Returns a
671 //            // negative integer, zero, or a positive integer as this object is less
672 //            // than, equal to, or greater than the specified object.
673 //            if (this.equals(o))
674 //                return 0;
675 //
676 //            // higher priority number means higher priority
677 //            if (this.priority > o.priority)
678 //                return 1;
679 //            if (this.priority < o.priority)
680 //                return -1;
681 //
682 //            // higher remaining load means higher priority
683 //            if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad)
684 //                return 1;
685 //            if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad)
686 //                return -1;
687 //
688 //            if (this.hashCode() > o.hashCode())
689 //                return 1;
690 //            if (this.hashCode() < o.hashCode())
691 //                return -1;
692 //            //
693 //            return 0;
694 //        }
695 //    }
696 }