View Javadoc
1   package org.opentrafficsim.demo.web;
2   
3   import org.zeromq.ZContext;
4   import org.zeromq.ZMQ;
5   
6   /**
7    * The LoadBalancer start listening on the given port for messages. It is started with a map of network nodes where
8    * FederateStarters can start models, and a capacity of each node.<br>
9    * <br>
10   * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined:
11   * <ul>
12   * <li><b>port:</b> the port on which the LoadBalancer listens</li>
13   * <li><b>nodeFile:</b> the file that contains the node information</li>
14   * </ul>
15   * The nodeFile is expected to be tab-delimited with the following columns (and an example):
16   * 
17   * <pre>
18   * nodeAddress  fsPort   maxLoad   priority
19   * 10.0.0.121   5500     8         10
20   * 10.0.0.122   5500     8         10
21   * 10.0.0.123   5500     8         10
22   * 127.0.0.1    5500     8         5
23   * </pre>
24   * 
25   * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be
26   * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which
27   * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example
28   * above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running,
29   * localhost will be used.
30   * <p>
31   * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
32   * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>.
33   * </p>
34   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
35   */
36  public class SimpleLoadBalancer
37  {
38  //    /** the port number to listen on. */
39  //    private final int lbPort;
40  
41  //    /** the nodes with federate starters that can be used. */
42  //    private final Set<FederateStarterNode> federateStarterNodes;
43  
44      /** the 0mq socket. */
45      private ZMQ.Socket lbSocket;
46  
47      /** the 0mq context. */
48      private ZContext lbContext;
49  
50      /** message count. */
51      private long messageCount = 0;
52  
53  //    /**
54  //     * @param lbPort the port number to listen on
55  //     * @param federateStarterNodes the nodes with federate starters that can be used
56  //     * @throws Sim0MQException on error
57  //     * @throws SerializationException on error
58  //     */
59  //    public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes)
60  //            throws Sim0MQException, SerializationException
61  //    {
62  //        this.lbPort = lbPort;
63  //        this.federateStarterNodes = federateStarterNodes;
64  //
65  //        this.lbContext = new ZContext(1);
66  //
67  //        this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER);
68  //        this.lbSocket.bind("tcp://*:" + this.lbPort);
69  //
70  //        while (!Thread.currentThread().isInterrupted())
71  //        {
72  //            // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0)
73  //            String identity = this.lbSocket.recvStr();
74  //            this.lbSocket.recvStr();
75  //
76  //            byte[] request = this.lbSocket.recv(0);
77  //            Object[] fields = SimulationMessage.decode(request);
78  //            String messageTypeId = fields[4].toString();
79  //            String receiverId = fields[3].toString();
80  //
81  //            System.out.println("Received " + SimulationMessage.print(fields));
82  //
83  //            if (receiverId.equals("LB"))
84  //            {
85  //                switch (messageTypeId)
86  //                {
87  //                    case "LB.1":
88  //                        processStartModel(identity, fields);
89  //                        break;
90  //
91  //                    case "LB.2":
92  //                        processModelKilled(identity, fields);
93  //                        break;
94  //
95  //                    default:
96  //                        // wrong message
97  //                        System.err.println("Received unknown message -- not processed: " + messageTypeId);
98  //                }
99  //            }
100 //            else
101 //            {
102 //                // wrong receiver
103 //                System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: ");
104 //            }
105 //        }
106 //        this.lbSocket.close();
107 //        this.lbContext.destroy();
108 //    }
109 //
110 //    /**
111 //     * Process MC.X message and send LB.Y message back.
112 //     * @param identity reply id for REQ-ROUTER pattern
113 //     * @param fields the message
114 //     * @throws Sim0MQException on error
115 //     * @throws SerializationException on error
116 //     */
117 //    private void processStartFederateStarter(final String identity, final Object[] fields)
118 //            throws Sim0MQException, SerializationException
119 //    {
120 //        StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB");
121 //        String error = "";
122 //
123 //        int modelPort = findFreePortNumber();
124 //
125 //        if (modelPort == -1)
126 //        {
127 //            error = "No free port number";
128 //        }
129 //
130 //        else
131 //
132 //        {
133 //            try
134 //            {
135 //                ProcessBuilder pb = new ProcessBuilder();
136 //
137 //                Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
138 //                pb.directory(workingPath.toFile());
139 //
140 //                String softwareCode = "";
141 //                if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
142 //                {
143 //                    System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
144 //                            + " in software properties file");
145 //                }
146 //                else
147 //                {
148 //                    softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
149 //
150 //                    List<String> pbArgs = new ArrayList<>();
151 //                    pbArgs.add(softwareCode);
152 //                    pbArgs.add(startFederateMessage.getArgsBefore());
153 //                    pbArgs.add(startFederateMessage.getModelPath());
154 //                    pbArgs.addAll(Arrays.asList(
155 //                            startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
156 //                    pb.command(pbArgs);
157 //
158 //                    String stdIn = startFederateMessage.getRedirectStdin();
159 //                    String stdOut = startFederateMessage.getRedirectStdout();
160 //                    String stdErr = startFederateMessage.getRedirectStderr();
161 //
162 //                    if (stdIn.length() > 0)
163 //                    {
164 //                        // TODO working dir path if not absolute?
165 //                        File stdInFile = new File(stdIn);
166 //                        pb.redirectInput(stdInFile);
167 //                    }
168 //
169 //                    if (stdOut.length() > 0)
170 //                    {
171 //                        // TODO working dir path if not absolute?
172 //                        File stdOutFile = new File(stdOut);
173 //                        pb.redirectOutput(stdOutFile);
174 //                    }
175 //
176 //                    if (stdErr.length() > 0)
177 //                    {
178 //                        // TODO working dir path if not absolute?
179 //                        File stdErrFile = new File(stdErr);
180 //                        pb.redirectError(stdErrFile);
181 //                    }
182 //
183 //                    new Thread()
184 //                    {
185 //                        /** {@inheritDoc} */
186 //                        @Override
187 //                        public void run()
188 //                        {
189 //                            try
190 //                            {
191 //                                Process process = pb.start();
192 //                                SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
193 //                                System.err.println("Process started:" + process.isAlive());
194 //                            }
195 //                            catch (IOException exception)
196 //                            {
197 //                                exception.printStackTrace();
198 //                            }
199 //                        }
200 //                    }.start();
201 //
202 //                    this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
203 //                    this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
204 //
205 //                    // Thread.sleep(1000);
206 //
207 //                    // wait till the model is ready...
208 //                    error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
209 //                            modelPort);
210 //                }
211 //            }
212 //            catch (IOException exception)
213 //            {
214 //                exception.printStackTrace();
215 //                error = exception.getMessage();
216 //            }
217 //        }
218 //
219 //        System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
220 //
221 //        // Send reply back to client
222 //        this.lbSocket.sendMore(identity);
223 //        this.lbSocket.sendMore("");
224 //        //@formatter:off
225 //        byte[] fs2Message = new FederateStartedMessage.Builder()
226 //                .setSimulationRunId(startFederateMessage.getSimulationRunId())
227 //                .setInstanceId(startFederateMessage.getInstanceId())
228 //                .setSenderId("FS")
229 //                .setReceiverId(startFederateMessage.getSenderId())
230 //                .setMessageId(++this.messageCount)
231 //                .setStatus(error.isEmpty() ? "started" : "error")
232 //                .setError(error)
233 //                .setModelPort(modelPort)
234 //                .build()
235 //                .createByteArray();
236 //        this.lbSocket.send(fs2Message, 0);
237 //        //@formatter:on
238 //    }
239 //
240 //    /**
241 //     * Find a free port for the model.
242 //     * @return the first free fort number in the range startPort - endPort, inclusive
243 //     */
244 //    private int findFreePortNumber()
245 //    {
246 //        for (int port = this.startPort; port <= this.endPort; port++)
247 //        {
248 //            if (!this.modelPortMap.containsValue(port))
249 //            {
250 //                // try if the port is really free
251 //                ZMQ.Socket testSocket = null;
252 //                try
253 //                {
254 //                    testSocket = this.lbContext.createSocket(SocketType.REP);
255 //                    testSocket.bind("tcp://127.0.0.1:" + port);
256 //                    testSocket.unbind("tcp://127.0.0.1:" + port);
257 //                    testSocket.close();
258 //                    return port;
259 //                }
260 //                catch (Exception exception)
261 //                {
262 //                    // port was not free
263 //                    if (testSocket != null)
264 //                    {
265 //                        try
266 //                        {
267 //                            testSocket.close();
268 //                        }
269 //                        catch (Exception e)
270 //                        {
271 //                            // ignore.
272 //                        }
273 //                    }
274 //                }
275 //            }
276 //        }
277 //        return -1;
278 //    }
279 //
280 //    /**
281 //     * Wait for simulation to end using status polling with message FM.5.
282 //     * @param federationRunId the name of the federation
283 //     * @param modelId the String id of the model
284 //     * @param modelPort port on which the model is listening
285 //     * @return empty String for no error, filled String for error
286 //     * @throws Sim0MQException on error
287 //     * @throws SerializationException on error
288 //     */
289 //    private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
290 //            throws Sim0MQException, SerializationException
291 //    {
292 //        boolean ok = true;
293 //        String error = "";
294 //        ZMQ.Socket modelSocket = null;
295 //        try
296 //        {
297 //            modelSocket = this.lbContext.createSocket(SocketType.REQ);
298 //            modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
299 //            modelSocket.connect("tcp://127.0.0.1:" + modelPort);
300 //        }
301 //        catch (Exception exception)
302 //        {
303 //            exception.printStackTrace();
304 //            ok = false;
305 //            error = exception.getMessage();
306 //        }
307 //
308 //        boolean started = false;
309 //        while (ok && !started)
310 //        {
311 //            byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
312 //                    MessageStatus.NEW);
313 //            modelSocket.send(fs1Message, 0);
314 //
315 //            byte[] reply = modelSocket.recv(0);
316 //            Object[] replyMessage = SimulationMessage.decode(reply);
317 //            System.out.println("Received\n" + SimulationMessage.print(replyMessage));
318 //
319 //            if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
320 //                    && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
321 //            {
322 //                if (replyMessage[9].toString().equals("started"))
323 //                {
324 //                    started = true;
325 //                }
326 //                else
327 //                {
328 //                    // wait a second
329 //                    try
330 //                    {
331 //                        Thread.sleep(100);
332 //                    }
333 //                    catch (InterruptedException ie)
334 //                    {
335 //                        // ignore
336 //                    }
337 //                }
338 //            }
339 //            else
340 //            {
341 //                ok = false;
342 //                error = replyMessage[10].toString();
343 //                System.err.println("Simulation start error -- status = " + replyMessage[9]);
344 //                System.err.println("Error message = " + replyMessage[10]);
345 //            }
346 //        }
347 //
348 //        if (modelSocket != null)
349 //        {
350 //            modelSocket.close();
351 //        }
352 //
353 //        return error;
354 //    }
355 //
356 //    /**
357 //     * Process FM.8 message and send FS.4 message back.
358 //     * @param identity reply id for REQ-ROUTER pattern
359 //     * @param fields the message
360 //     * @throws Sim0MQException on error
361 //     * @throws SerializationException on error
362 //     */
363 //    private void processKillFederateStarter(final String identity, final Object[] fields)
364 //            throws Sim0MQException, SerializationException
365 //    {
366 //        boolean status = true;
367 //        String error = "";
368 //
369 //        Object federationRunId = fields[1];
370 //        String senderId = fields[2].toString();
371 //
372 //        String modelId = fields[8].toString();
373 //        if (!this.modelPortMap.containsKey(modelId))
374 //        {
375 //            status = false;
376 //            error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
377 //        }
378 //        else
379 //        {
380 //            int modelPort = this.modelPortMap.remove(modelId);
381 //            Process process = this.runningProcessMap.remove(modelId);
382 //
383 //            try
384 //            {
385 //                try
386 //                {
387 //                    ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ);
388 //                    modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
389 //                    modelSocket.connect("tcp://127.0.0.1:" + modelPort);
390 //
391 //                    byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
392 //                            ++this.messageCount, MessageStatus.NEW);
393 //                    modelSocket.send(fs3Message, 0);
394 //
395 //                    modelSocket.close();
396 //                }
397 //                catch (Exception exception)
398 //                {
399 //                    exception.printStackTrace();
400 //                    status = true;
401 //                    error = exception.getMessage();
402 //                }
403 //
404 //                try
405 //                {
406 //                    Thread.sleep(100);
407 //                }
408 //                catch (InterruptedException ie)
409 //                {
410 //                    // ignore
411 //                }
412 //
413 //                if (process != null && process.isAlive())
414 //                {
415 //                    process.destroyForcibly();
416 //                }
417 //
418 //                StartFederateMessage sfm = this.startFederateMessages.get(modelId);
419 //                if (sfm.isDeleteStdout())
420 //                {
421 //                    if (sfm.getRedirectStdout().length() > 0)
422 //                    {
423 //                        File stdOutFile = new File(sfm.getRedirectStdout());
424 //                        stdOutFile.delete();
425 //                    }
426 //                }
427 //
428 //                if (sfm.isDeleteStderr())
429 //                {
430 //                    if (sfm.getRedirectStderr().length() > 0)
431 //                    {
432 //                        File stdErrFile = new File(sfm.getRedirectStderr());
433 //                        stdErrFile.delete();
434 //                    }
435 //                }
436 //
437 //                if (sfm.isDeleteWorkingDirectory())
438 //                {
439 //                    File workingDir = new File(sfm.getWorkingDirectory());
440 //                    workingDir.delete();
441 //                }
442 //            }
443 //            catch (Exception exception)
444 //            {
445 //                exception.printStackTrace();
446 //                status = false;
447 //                error = exception.getMessage();
448 //            }
449 //
450 //            byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
451 //                    MessageStatus.NEW, modelId, status, error);
452 //            this.lbSocket.sendMore(identity);
453 //            this.lbSocket.sendMore("");
454 //            this.lbSocket.send(fs4Message, 0);
455 //        }
456 //    }
457 //
458 //    /**
459 //     * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
460 //     * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
461 //     * @param args the federation name and port on which the FederateStarter is listening
462 //     * @throws Sim0MQException on error
463 //     * @throws SerializationException on error
464 //     */
465 //    public static void main(final String[] args) throws Sim0MQException, SerializationException
466 //    {
467 //        if (args.length < 4)
468 //        {
469 //            System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
470 //            System.exit(-1);
471 //        }
472 //
473 //        String sPort = args[0];
474 //        int port = 0;
475 //        try
476 //        {
477 //            port = Integer.parseInt(sPort);
478 //        }
479 //        catch (NumberFormatException nfe)
480 //        {
481 //            System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
482 //            System.exit(-1);
483 //        }
484 //        if (port == 0 || port > 65535)
485 //        {
486 //            System.err.println("PortNumber should be between 1 and 65535");
487 //            System.exit(-1);
488 //        }
489 //
490 //        String propertiesFile = args[1];
491 //        Properties softwareProperties = new Properties();
492 //        InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
493 //        try
494 //        {
495 //            softwareProperties.load(propertiesStream);
496 //        }
497 //        catch (IOException | NullPointerException e)
498 //        {
499 //            System.err.println("Could not find or read software properties file " + propertiesFile);
500 //            System.exit(-1);
501 //        }
502 //
503 //        String sStartPort = args[2];
504 //        int startPort = 0;
505 //        try
506 //        {
507 //            startPort = Integer.parseInt(sStartPort);
508 //        }
509 //        catch (NumberFormatException nfe)
510 //        {
511 //            System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
512 //            System.exit(-1);
513 //        }
514 //        if (startPort == 0 || startPort > 65535)
515 //        {
516 //            System.err.println("startPort should be between 1 and 65535");
517 //            System.exit(-1);
518 //        }
519 //
520 //        String sEndPort = args[3];
521 //        int endPort = 0;
522 //        try
523 //        {
524 //            endPort = Integer.parseInt(sEndPort);
525 //        }
526 //        catch (NumberFormatException nfe)
527 //        {
528 //            System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
529 //            System.exit(-1);
530 //        }
531 //        if (endPort == 0 || endPort > 65535)
532 //        {
533 //            System.err.println("endPort should be between 1 and 65535");
534 //            System.exit(-1);
535 //        }
536 //
537 //        new SimpleLoadBalancer(port, softwareProperties, startPort, endPort);
538 //    }
539 //
540 //    /** Record with information about the nodes that have a FederateStarter running. */
541 //    static class FederateStarterNode implements Comparable<FederateStarterNode>
542 //    {
543 //        /** the node name or IP address where the Federate Starter resides. */
544 //        private final String fsNodeName;
545 //
546 //        /** the port of the Federate Starter. */
547 //        private final int port;
548 //
549 //        /** the maximum load on the node (e.g., the maximum number of concurrent models). */
550 //        private final int maxLoad;
551 //
552 //        /** the current load on the node. */
553 //        private int currentLoad = 0;
554 //
555 //        /** the priority of the node. Higher value is higher priority. */
556 //        private int priority;
557 //
558 //        /**
559 //         * @param fsNodeName String; the node name or IP address where the Federate Starter resides
560 //         * @param port int; the port of the Federate Starter
561 //         * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models)
562 //         * @param priority int; the priority of the node. Higher value is higher priority.
563 //         */
564 //        FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority)
565 //        {
566 //            this.fsNodeName = fsNodeName;
567 //            this.port = port;
568 //            this.maxLoad = maxLoad;
569 //            this.priority = priority;
570 //        }
571 //
572 //        /**
573 //         * @return currentLoad
574 //         */
575 //        public final int getCurrentLoad()
576 //        {
577 //            return this.currentLoad;
578 //        }
579 //
580 //        /**
581 //         * @param currentLoad set currentLoad
582 //         */
583 //        public final void setCurrentLoad(final int currentLoad)
584 //        {
585 //            this.currentLoad = currentLoad;
586 //        }
587 //
588 //        /**
589 //         * @return priority
590 //         */
591 //        public final int getPriority()
592 //        {
593 //            return this.priority;
594 //        }
595 //
596 //        /**
597 //         * @param priority set priority
598 //         */
599 //        public final void setPriority(final int priority)
600 //        {
601 //            this.priority = priority;
602 //        }
603 //
604 //        /**
605 //         * @return fsNodeName
606 //         */
607 //        public final String getFsNodeName()
608 //        {
609 //            return this.fsNodeName;
610 //        }
611 //
612 //        /**
613 //         * @return port
614 //         */
615 //        public final int getPort()
616 //        {
617 //            return this.port;
618 //        }
619 //
620 //        /**
621 //         * @return maxLoad
622 //         */
623 //        public final int getMaxLoad()
624 //        {
625 //            return this.maxLoad;
626 //        }
627 //
628 //        /** {@inheritDoc} */
629 //        @Override
630 //        public int hashCode()
631 //        {
632 //            final int prime = 31;
633 //            int result = 1;
634 //            result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode());
635 //            result = prime * result + this.port;
636 //            return result;
637 //        }
638 //
639 //        /** {@inheritDoc} */
640 //        @SuppressWarnings("checkstyle:needbraces")
641 //        @Override
642 //        public boolean equals(final Object obj)
643 //        {
644 //            if (this == obj)
645 //                return true;
646 //            if (obj == null)
647 //                return false;
648 //            if (getClass() != obj.getClass())
649 //                return false;
650 //            FederateStarterNode other = (FederateStarterNode) obj;
651 //            if (this.fsNodeName == null)
652 //            {
653 //                if (other.fsNodeName != null)
654 //                    return false;
655 //            }
656 //            else if (!this.fsNodeName.equals(other.fsNodeName))
657 //                return false;
658 //            if (this.port != other.port)
659 //                return false;
660 //            return true;
661 //        }
662 //
663 //        /** {@inheritDoc} */
664 //        @SuppressWarnings("checkstyle:needbraces")
665 //        @Override
666 //        public int compareTo(final FederateStarterNode o)
667 //        {
668 //            // Compares this object with the specified object for order. Returns a
669 //            // negative integer, zero, or a positive integer as this object is less
670 //            // than, equal to, or greater than the specified object.
671 //            if (this.equals(o))
672 //                return 0;
673 //
674 //            // higher priority number means higher priority
675 //            if (this.priority > o.priority)
676 //                return 1;
677 //            if (this.priority < o.priority)
678 //                return -1;
679 //
680 //            // higher remaining load means higher priority
681 //            if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad)
682 //                return 1;
683 //            if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad)
684 //                return -1;
685 //
686 //            if (this.hashCode() > o.hashCode())
687 //                return 1;
688 //            if (this.hashCode() < o.hashCode())
689 //                return -1;
690 //            //
691 //            return 0;
692 //        }
693 //    }
694 }