1 package org.opentrafficsim.demo.web; 2 3 import org.zeromq.ZContext; 4 import org.zeromq.ZMQ; 5 6 /** 7 * The LoadBalancer start listening on the given port for messages. It is started with a map of network nodes where 8 * FederateStarters can start models, and a capacity of each node.<br> 9 * <br> 10 * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined: 11 * <ul> 12 * <li><b>port:</b> the port on which the LoadBalancer listens</li> 13 * <li><b>nodeFile:</b> the file that contains the node information</li> 14 * </ul> 15 * The nodeFile is expected to be tab-delimited with the following columns (and an example): 16 * 17 * <pre> 18 * nodeAddress fsPort maxLoad priority 19 * 10.0.0.121 5500 8 10 20 * 10.0.0.122 5500 8 10 21 * 10.0.0.123 5500 8 10 22 * 127.0.0.1 5500 8 5 23 * </pre> 24 * 25 * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be 26 * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which 27 * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example 28 * above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running, 29 * localhost will be used. 30 * <p> 31 * Copyright (c) 2016-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br> 32 * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>. 33 * </p> 34 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a> 35 */ 36 public class SimpleLoadBalancer 37 { 38 // /** the port number to listen on. */ 39 // private final int lbPort; 40 41 // /** the nodes with federate starters that can be used. */ 42 // private final Set<FederateStarterNode> federateStarterNodes; 43 44 /** the 0mq socket. */ 45 private ZMQ.Socket lbSocket; 46 47 /** the 0mq context. */ 48 private ZContext lbContext; 49 50 /** message count. */ 51 private long messageCount = 0; 52 53 // /** 54 // * @param lbPort the port number to listen on 55 // * @param federateStarterNodes the nodes with federate starters that can be used 56 // * @throws Sim0MQException on error 57 // * @throws SerializationException on error 58 // */ 59 // public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes) 60 // throws Sim0MQException, SerializationException 61 // { 62 // this.lbPort = lbPort; 63 // this.federateStarterNodes = federateStarterNodes; 64 // 65 // this.lbContext = new ZContext(1); 66 // 67 // this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER); 68 // this.lbSocket.bind("tcp://*:" + this.lbPort); 69 // 70 // while (!Thread.currentThread().isInterrupted()) 71 // { 72 // // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0) 73 // String identity = this.lbSocket.recvStr(); 74 // this.lbSocket.recvStr(); 75 // 76 // byte[] request = this.lbSocket.recv(0); 77 // Object[] fields = SimulationMessage.decode(request); 78 // String messageTypeId = fields[4].toString(); 79 // String receiverId = fields[3].toString(); 80 // 81 // System.out.println("Received " + SimulationMessage.print(fields)); 82 // 83 // if (receiverId.equals("LB")) 84 // { 85 // switch (messageTypeId) 86 // { 87 // case "LB.1": 88 // processStartModel(identity, fields); 89 // break; 90 // 91 // case "LB.2": 92 // processModelKilled(identity, fields); 93 // break; 94 // 95 // default: 96 // // wrong message 97 // System.err.println("Received unknown message -- not processed: " + messageTypeId); 98 // } 99 // } 100 // else 101 // { 102 // // wrong receiver 103 // System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: "); 104 // } 105 // } 106 // this.lbSocket.close(); 107 // this.lbContext.destroy(); 108 // } 109 // 110 // /** 111 // * Process MC.X message and send LB.Y message back. 112 // * @param identity reply id for REQ-ROUTER pattern 113 // * @param fields the message 114 // * @throws Sim0MQException on error 115 // * @throws SerializationException on error 116 // */ 117 // private void processStartFederateStarter(final String identity, final Object[] fields) 118 // throws Sim0MQException, SerializationException 119 // { 120 // StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB"); 121 // String error = ""; 122 // 123 // int modelPort = findFreePortNumber(); 124 // 125 // if (modelPort == -1) 126 // { 127 // error = "No free port number"; 128 // } 129 // 130 // else 131 // 132 // { 133 // try 134 // { 135 // ProcessBuilder pb = new ProcessBuilder(); 136 // 137 // Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory())); 138 // pb.directory(workingPath.toFile()); 139 // 140 // String softwareCode = ""; 141 // if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode())) 142 // { 143 // System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode() 144 // + " in software properties file"); 145 // } 146 // else 147 // { 148 // softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode()); 149 // 150 // List<String> pbArgs = new ArrayList<>(); 151 // pbArgs.add(softwareCode); 152 // pbArgs.add(startFederateMessage.getArgsBefore()); 153 // pbArgs.add(startFederateMessage.getModelPath()); 154 // pbArgs.addAll(Arrays.asList( 155 // startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" "))); 156 // pb.command(pbArgs); 157 // 158 // String stdIn = startFederateMessage.getRedirectStdin(); 159 // String stdOut = startFederateMessage.getRedirectStdout(); 160 // String stdErr = startFederateMessage.getRedirectStderr(); 161 // 162 // if (stdIn.length() > 0) 163 // { 164 // // TODO working dir path if not absolute? 165 // File stdInFile = new File(stdIn); 166 // pb.redirectInput(stdInFile); 167 // } 168 // 169 // if (stdOut.length() > 0) 170 // { 171 // // TODO working dir path if not absolute? 172 // File stdOutFile = new File(stdOut); 173 // pb.redirectOutput(stdOutFile); 174 // } 175 // 176 // if (stdErr.length() > 0) 177 // { 178 // // TODO working dir path if not absolute? 179 // File stdErrFile = new File(stdErr); 180 // pb.redirectError(stdErrFile); 181 // } 182 // 183 // new Thread() 184 // { 185 // /** {@inheritDoc} */ 186 // @Override 187 // public void run() 188 // { 189 // try 190 // { 191 // Process process = pb.start(); 192 // SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process); 193 // System.err.println("Process started:" + process.isAlive()); 194 // } 195 // catch (IOException exception) 196 // { 197 // exception.printStackTrace(); 198 // } 199 // } 200 // }.start(); 201 // 202 // this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort); 203 // this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage); 204 // 205 // // Thread.sleep(1000); 206 // 207 // // wait till the model is ready... 208 // error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(), 209 // modelPort); 210 // } 211 // } 212 // catch (IOException exception) 213 // { 214 // exception.printStackTrace(); 215 // error = exception.getMessage(); 216 // } 217 // } 218 // 219 // System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort); 220 // 221 // // Send reply back to client 222 // this.lbSocket.sendMore(identity); 223 // this.lbSocket.sendMore(""); 224 // //@formatter:off 225 // byte[] fs2Message = new FederateStartedMessage.Builder() 226 // .setSimulationRunId(startFederateMessage.getSimulationRunId()) 227 // .setInstanceId(startFederateMessage.getInstanceId()) 228 // .setSenderId("FS") 229 // .setReceiverId(startFederateMessage.getSenderId()) 230 // .setMessageId(++this.messageCount) 231 // .setStatus(error.isEmpty() ? "started" : "error") 232 // .setError(error) 233 // .setModelPort(modelPort) 234 // .build() 235 // .createByteArray(); 236 // this.lbSocket.send(fs2Message, 0); 237 // //@formatter:on 238 // } 239 // 240 // /** 241 // * Find a free port for the model. 242 // * @return the first free fort number in the range startPort - endPort, inclusive 243 // */ 244 // private int findFreePortNumber() 245 // { 246 // for (int port = this.startPort; port <= this.endPort; port++) 247 // { 248 // if (!this.modelPortMap.containsValue(port)) 249 // { 250 // // try if the port is really free 251 // ZMQ.Socket testSocket = null; 252 // try 253 // { 254 // testSocket = this.lbContext.createSocket(SocketType.REP); 255 // testSocket.bind("tcp://127.0.0.1:" + port); 256 // testSocket.unbind("tcp://127.0.0.1:" + port); 257 // testSocket.close(); 258 // return port; 259 // } 260 // catch (Exception exception) 261 // { 262 // // port was not free 263 // if (testSocket != null) 264 // { 265 // try 266 // { 267 // testSocket.close(); 268 // } 269 // catch (Exception e) 270 // { 271 // // ignore. 272 // } 273 // } 274 // } 275 // } 276 // } 277 // return -1; 278 // } 279 // 280 // /** 281 // * Wait for simulation to end using status polling with message FM.5. 282 // * @param federationRunId the name of the federation 283 // * @param modelId the String id of the model 284 // * @param modelPort port on which the model is listening 285 // * @return empty String for no error, filled String for error 286 // * @throws Sim0MQException on error 287 // * @throws SerializationException on error 288 // */ 289 // private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort) 290 // throws Sim0MQException, SerializationException 291 // { 292 // boolean ok = true; 293 // String error = ""; 294 // ZMQ.Socket modelSocket = null; 295 // try 296 // { 297 // modelSocket = this.lbContext.createSocket(SocketType.REQ); 298 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes()); 299 // modelSocket.connect("tcp://127.0.0.1:" + modelPort); 300 // } 301 // catch (Exception exception) 302 // { 303 // exception.printStackTrace(); 304 // ok = false; 305 // error = exception.getMessage(); 306 // } 307 // 308 // boolean started = false; 309 // while (ok && !started) 310 // { 311 // byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount, 312 // MessageStatus.NEW); 313 // modelSocket.send(fs1Message, 0); 314 // 315 // byte[] reply = modelSocket.recv(0); 316 // Object[] replyMessage = SimulationMessage.decode(reply); 317 // System.out.println("Received\n" + SimulationMessage.print(replyMessage)); 318 // 319 // if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error") 320 // && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount) 321 // { 322 // if (replyMessage[9].toString().equals("started")) 323 // { 324 // started = true; 325 // } 326 // else 327 // { 328 // // wait a second 329 // try 330 // { 331 // Thread.sleep(100); 332 // } 333 // catch (InterruptedException ie) 334 // { 335 // // ignore 336 // } 337 // } 338 // } 339 // else 340 // { 341 // ok = false; 342 // error = replyMessage[10].toString(); 343 // System.err.println("Simulation start error -- status = " + replyMessage[9]); 344 // System.err.println("Error message = " + replyMessage[10]); 345 // } 346 // } 347 // 348 // if (modelSocket != null) 349 // { 350 // modelSocket.close(); 351 // } 352 // 353 // return error; 354 // } 355 // 356 // /** 357 // * Process FM.8 message and send FS.4 message back. 358 // * @param identity reply id for REQ-ROUTER pattern 359 // * @param fields the message 360 // * @throws Sim0MQException on error 361 // * @throws SerializationException on error 362 // */ 363 // private void processKillFederateStarter(final String identity, final Object[] fields) 364 // throws Sim0MQException, SerializationException 365 // { 366 // boolean status = true; 367 // String error = ""; 368 // 369 // Object federationRunId = fields[1]; 370 // String senderId = fields[2].toString(); 371 // 372 // String modelId = fields[8].toString(); 373 // if (!this.modelPortMap.containsKey(modelId)) 374 // { 375 // status = false; 376 // error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter"; 377 // } 378 // else 379 // { 380 // int modelPort = this.modelPortMap.remove(modelId); 381 // Process process = this.runningProcessMap.remove(modelId); 382 // 383 // try 384 // { 385 // try 386 // { 387 // ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ); 388 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes()); 389 // modelSocket.connect("tcp://127.0.0.1:" + modelPort); 390 // 391 // byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3", 392 // ++this.messageCount, MessageStatus.NEW); 393 // modelSocket.send(fs3Message, 0); 394 // 395 // modelSocket.close(); 396 // } 397 // catch (Exception exception) 398 // { 399 // exception.printStackTrace(); 400 // status = true; 401 // error = exception.getMessage(); 402 // } 403 // 404 // try 405 // { 406 // Thread.sleep(100); 407 // } 408 // catch (InterruptedException ie) 409 // { 410 // // ignore 411 // } 412 // 413 // if (process != null && process.isAlive()) 414 // { 415 // process.destroyForcibly(); 416 // } 417 // 418 // StartFederateMessage sfm = this.startFederateMessages.get(modelId); 419 // if (sfm.isDeleteStdout()) 420 // { 421 // if (sfm.getRedirectStdout().length() > 0) 422 // { 423 // File stdOutFile = new File(sfm.getRedirectStdout()); 424 // stdOutFile.delete(); 425 // } 426 // } 427 // 428 // if (sfm.isDeleteStderr()) 429 // { 430 // if (sfm.getRedirectStderr().length() > 0) 431 // { 432 // File stdErrFile = new File(sfm.getRedirectStderr()); 433 // stdErrFile.delete(); 434 // } 435 // } 436 // 437 // if (sfm.isDeleteWorkingDirectory()) 438 // { 439 // File workingDir = new File(sfm.getWorkingDirectory()); 440 // workingDir.delete(); 441 // } 442 // } 443 // catch (Exception exception) 444 // { 445 // exception.printStackTrace(); 446 // status = false; 447 // error = exception.getMessage(); 448 // } 449 // 450 // byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount, 451 // MessageStatus.NEW, modelId, status, error); 452 // this.lbSocket.sendMore(identity); 453 // this.lbSocket.sendMore(""); 454 // this.lbSocket.send(fs4Message, 0); 455 // } 456 // } 457 // 458 // /** 459 // * Start listening on the given port for messages to start components. Report back via the call-back port on the status of 460 // * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process. 461 // * @param args the federation name and port on which the FederateStarter is listening 462 // * @throws Sim0MQException on error 463 // * @throws SerializationException on error 464 // */ 465 // public static void main(final String[] args) throws Sim0MQException, SerializationException 466 // { 467 // if (args.length < 4) 468 // { 469 // System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort"); 470 // System.exit(-1); 471 // } 472 // 473 // String sPort = args[0]; 474 // int port = 0; 475 // try 476 // { 477 // port = Integer.parseInt(sPort); 478 // } 479 // catch (NumberFormatException nfe) 480 // { 481 // System.err.println("Use as FederateStarter portNumber, where portNumber is a number"); 482 // System.exit(-1); 483 // } 484 // if (port == 0 || port > 65535) 485 // { 486 // System.err.println("PortNumber should be between 1 and 65535"); 487 // System.exit(-1); 488 // } 489 // 490 // String propertiesFile = args[1]; 491 // Properties softwareProperties = new Properties(); 492 // InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile); 493 // try 494 // { 495 // softwareProperties.load(propertiesStream); 496 // } 497 // catch (IOException | NullPointerException e) 498 // { 499 // System.err.println("Could not find or read software properties file " + propertiesFile); 500 // System.exit(-1); 501 // } 502 // 503 // String sStartPort = args[2]; 504 // int startPort = 0; 505 // try 506 // { 507 // startPort = Integer.parseInt(sStartPort); 508 // } 509 // catch (NumberFormatException nfe) 510 // { 511 // System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number"); 512 // System.exit(-1); 513 // } 514 // if (startPort == 0 || startPort > 65535) 515 // { 516 // System.err.println("startPort should be between 1 and 65535"); 517 // System.exit(-1); 518 // } 519 // 520 // String sEndPort = args[3]; 521 // int endPort = 0; 522 // try 523 // { 524 // endPort = Integer.parseInt(sEndPort); 525 // } 526 // catch (NumberFormatException nfe) 527 // { 528 // System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number"); 529 // System.exit(-1); 530 // } 531 // if (endPort == 0 || endPort > 65535) 532 // { 533 // System.err.println("endPort should be between 1 and 65535"); 534 // System.exit(-1); 535 // } 536 // 537 // new SimpleLoadBalancer(port, softwareProperties, startPort, endPort); 538 // } 539 // 540 // /** Record with information about the nodes that have a FederateStarter running. */ 541 // static class FederateStarterNode implements Comparable<FederateStarterNode> 542 // { 543 // /** the node name or IP address where the Federate Starter resides. */ 544 // private final String fsNodeName; 545 // 546 // /** the port of the Federate Starter. */ 547 // private final int port; 548 // 549 // /** the maximum load on the node (e.g., the maximum number of concurrent models). */ 550 // private final int maxLoad; 551 // 552 // /** the current load on the node. */ 553 // private int currentLoad = 0; 554 // 555 // /** the priority of the node. Higher value is higher priority. */ 556 // private int priority; 557 // 558 // /** 559 // * @param fsNodeName String; the node name or IP address where the Federate Starter resides 560 // * @param port int; the port of the Federate Starter 561 // * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models) 562 // * @param priority int; the priority of the node. Higher value is higher priority. 563 // */ 564 // FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority) 565 // { 566 // this.fsNodeName = fsNodeName; 567 // this.port = port; 568 // this.maxLoad = maxLoad; 569 // this.priority = priority; 570 // } 571 // 572 // /** 573 // * @return currentLoad 574 // */ 575 // public final int getCurrentLoad() 576 // { 577 // return this.currentLoad; 578 // } 579 // 580 // /** 581 // * @param currentLoad set currentLoad 582 // */ 583 // public final void setCurrentLoad(final int currentLoad) 584 // { 585 // this.currentLoad = currentLoad; 586 // } 587 // 588 // /** 589 // * @return priority 590 // */ 591 // public final int getPriority() 592 // { 593 // return this.priority; 594 // } 595 // 596 // /** 597 // * @param priority set priority 598 // */ 599 // public final void setPriority(final int priority) 600 // { 601 // this.priority = priority; 602 // } 603 // 604 // /** 605 // * @return fsNodeName 606 // */ 607 // public final String getFsNodeName() 608 // { 609 // return this.fsNodeName; 610 // } 611 // 612 // /** 613 // * @return port 614 // */ 615 // public final int getPort() 616 // { 617 // return this.port; 618 // } 619 // 620 // /** 621 // * @return maxLoad 622 // */ 623 // public final int getMaxLoad() 624 // { 625 // return this.maxLoad; 626 // } 627 // 628 // /** {@inheritDoc} */ 629 // @Override 630 // public int hashCode() 631 // { 632 // final int prime = 31; 633 // int result = 1; 634 // result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode()); 635 // result = prime * result + this.port; 636 // return result; 637 // } 638 // 639 // /** {@inheritDoc} */ 640 // @SuppressWarnings("checkstyle:needbraces") 641 // @Override 642 // public boolean equals(final Object obj) 643 // { 644 // if (this == obj) 645 // return true; 646 // if (obj == null) 647 // return false; 648 // if (getClass() != obj.getClass()) 649 // return false; 650 // FederateStarterNode other = (FederateStarterNode) obj; 651 // if (this.fsNodeName == null) 652 // { 653 // if (other.fsNodeName != null) 654 // return false; 655 // } 656 // else if (!this.fsNodeName.equals(other.fsNodeName)) 657 // return false; 658 // if (this.port != other.port) 659 // return false; 660 // return true; 661 // } 662 // 663 // /** {@inheritDoc} */ 664 // @SuppressWarnings("checkstyle:needbraces") 665 // @Override 666 // public int compareTo(final FederateStarterNode o) 667 // { 668 // // Compares this object with the specified object for order. Returns a 669 // // negative integer, zero, or a positive integer as this object is less 670 // // than, equal to, or greater than the specified object. 671 // if (this.equals(o)) 672 // return 0; 673 // 674 // // higher priority number means higher priority 675 // if (this.priority > o.priority) 676 // return 1; 677 // if (this.priority < o.priority) 678 // return -1; 679 // 680 // // higher remaining load means higher priority 681 // if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad) 682 // return 1; 683 // if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad) 684 // return -1; 685 // 686 // if (this.hashCode() > o.hashCode()) 687 // return 1; 688 // if (this.hashCode() < o.hashCode()) 689 // return -1; 690 // // 691 // return 0; 692 // } 693 // } 694 }