1 package org.opentrafficsim.demo.web; 2 3 import org.zeromq.ZContext; 4 import org.zeromq.ZMQ; 5 6 /** 7 * The LoadBalancer start listening on the given port for messages. It is started with a map of network nodes where 8 * FederateStarters can start models, and a capacity of each node.<br> 9 * <br> 10 * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined: 11 * <ul> 12 * <li><b>port:</b> the port on which the LoadBalancer listens</li> 13 * <li><b>nodeFile:</b> the file that contains the node information</li> 14 * </ul> 15 * The nodeFile is expected to be tab-delimited with the following columns (and an example): 16 * 17 * <pre> 18 * nodeAddress fsPort maxLoad priority 19 * 10.0.0.121 5500 8 10 20 * 10.0.0.122 5500 8 10 21 * 10.0.0.123 5500 8 10 22 * 127.0.0.1 5500 8 5 23 * </pre> 24 * 25 * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be 26 * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which 27 * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example 28 * above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running, 29 * localhost will be used. 30 * <p> 31 * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br> 32 * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>. 33 * </p> 34 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a> 35 */ 36 public class SimpleLoadBalancer 37 { 38 // /** the port number to listen on. */ 39 // private final int lbPort; 40 41 // /** the nodes with federate starters that can be used. */ 42 // private final Set<FederateStarterNode> federateStarterNodes; 43 44 /** the 0mq socket. */ 45 private ZMQ.Socket lbSocket; 46 47 /** the 0mq context. */ 48 private ZContext lbContext; 49 50 /** message count. */ 51 private long messageCount = 0; 52 53 // /** 54 // * @param lbPort the port number to listen on 55 // * @param federateStarterNodes the nodes with federate starters that can be used 56 // * @throws Sim0MQException on error 57 // * @throws SerializationException on error 58 // */ 59 // public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes) 60 // throws Sim0MQException, SerializationException 61 // { 62 // super(); 63 // this.lbPort = lbPort; 64 // this.federateStarterNodes = federateStarterNodes; 65 // 66 // this.lbContext = new ZContext(1); 67 // 68 // this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER); 69 // this.lbSocket.bind("tcp://*:" + this.lbPort); 70 // 71 // while (!Thread.currentThread().isInterrupted()) 72 // { 73 // // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0) 74 // String identity = this.lbSocket.recvStr(); 75 // this.lbSocket.recvStr(); 76 // 77 // byte[] request = this.lbSocket.recv(0); 78 // Object[] fields = SimulationMessage.decode(request); 79 // String messageTypeId = fields[4].toString(); 80 // String receiverId = fields[3].toString(); 81 // 82 // System.out.println("Received " + SimulationMessage.print(fields)); 83 // 84 // if (receiverId.equals("LB")) 85 // { 86 // switch (messageTypeId) 87 // { 88 // case "LB.1": 89 // processStartModel(identity, fields); 90 // break; 91 // 92 // case "LB.2": 93 // processModelKilled(identity, fields); 94 // break; 95 // 96 // default: 97 // // wrong message 98 // System.err.println("Received unknown message -- not processed: " + messageTypeId); 99 // } 100 // } 101 // else 102 // { 103 // // wrong receiver 104 // System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: "); 105 // } 106 // } 107 // this.lbSocket.close(); 108 // this.lbContext.destroy(); 109 // } 110 // 111 // /** 112 // * Process MC.X message and send LB.Y message back. 113 // * @param identity reply id for REQ-ROUTER pattern 114 // * @param fields the message 115 // * @throws Sim0MQException on error 116 // * @throws SerializationException on error 117 // */ 118 // private void processStartFederateStarter(final String identity, final Object[] fields) 119 // throws Sim0MQException, SerializationException 120 // { 121 // StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB"); 122 // String error = ""; 123 // 124 // int modelPort = findFreePortNumber(); 125 // 126 // if (modelPort == -1) 127 // { 128 // error = "No free port number"; 129 // } 130 // 131 // else 132 // 133 // { 134 // try 135 // { 136 // ProcessBuilder pb = new ProcessBuilder(); 137 // 138 // Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory())); 139 // pb.directory(workingPath.toFile()); 140 // 141 // String softwareCode = ""; 142 // if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode())) 143 // { 144 // System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode() 145 // + " in software properties file"); 146 // } 147 // else 148 // { 149 // softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode()); 150 // 151 // List<String> pbArgs = new ArrayList<>(); 152 // pbArgs.add(softwareCode); 153 // pbArgs.add(startFederateMessage.getArgsBefore()); 154 // pbArgs.add(startFederateMessage.getModelPath()); 155 // pbArgs.addAll(Arrays.asList( 156 // startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" "))); 157 // pb.command(pbArgs); 158 // 159 // String stdIn = startFederateMessage.getRedirectStdin(); 160 // String stdOut = startFederateMessage.getRedirectStdout(); 161 // String stdErr = startFederateMessage.getRedirectStderr(); 162 // 163 // if (stdIn.length() > 0) 164 // { 165 // // TODO working dir path if not absolute? 166 // File stdInFile = new File(stdIn); 167 // pb.redirectInput(stdInFile); 168 // } 169 // 170 // if (stdOut.length() > 0) 171 // { 172 // // TODO working dir path if not absolute? 173 // File stdOutFile = new File(stdOut); 174 // pb.redirectOutput(stdOutFile); 175 // } 176 // 177 // if (stdErr.length() > 0) 178 // { 179 // // TODO working dir path if not absolute? 180 // File stdErrFile = new File(stdErr); 181 // pb.redirectError(stdErrFile); 182 // } 183 // 184 // new Thread() 185 // { 186 // /** {@inheritDoc} */ 187 // @Override 188 // public void run() 189 // { 190 // try 191 // { 192 // Process process = pb.start(); 193 // SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process); 194 // System.err.println("Process started:" + process.isAlive()); 195 // } 196 // catch (IOException exception) 197 // { 198 // exception.printStackTrace(); 199 // } 200 // } 201 // }.start(); 202 // 203 // this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort); 204 // this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage); 205 // 206 // // Thread.sleep(1000); 207 // 208 // // wait till the model is ready... 209 // error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(), 210 // modelPort); 211 // } 212 // } 213 // catch (IOException exception) 214 // { 215 // exception.printStackTrace(); 216 // error = exception.getMessage(); 217 // } 218 // } 219 // 220 // System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort); 221 // 222 // // Send reply back to client 223 // this.lbSocket.sendMore(identity); 224 // this.lbSocket.sendMore(""); 225 // //@formatter:off 226 // byte[] fs2Message = new FederateStartedMessage.Builder() 227 // .setSimulationRunId(startFederateMessage.getSimulationRunId()) 228 // .setInstanceId(startFederateMessage.getInstanceId()) 229 // .setSenderId("FS") 230 // .setReceiverId(startFederateMessage.getSenderId()) 231 // .setMessageId(++this.messageCount) 232 // .setStatus(error.isEmpty() ? "started" : "error") 233 // .setError(error) 234 // .setModelPort(modelPort) 235 // .build() 236 // .createByteArray(); 237 // this.lbSocket.send(fs2Message, 0); 238 // //@formatter:on 239 // } 240 // 241 // /** 242 // * Find a free port for the model. 243 // * @return the first free fort number in the range startPort - endPort, inclusive 244 // */ 245 // private int findFreePortNumber() 246 // { 247 // for (int port = this.startPort; port <= this.endPort; port++) 248 // { 249 // if (!this.modelPortMap.containsValue(port)) 250 // { 251 // // try if the port is really free 252 // ZMQ.Socket testSocket = null; 253 // try 254 // { 255 // testSocket = this.lbContext.createSocket(SocketType.REP); 256 // testSocket.bind("tcp://127.0.0.1:" + port); 257 // testSocket.unbind("tcp://127.0.0.1:" + port); 258 // testSocket.close(); 259 // return port; 260 // } 261 // catch (Exception exception) 262 // { 263 // // port was not free 264 // if (testSocket != null) 265 // { 266 // try 267 // { 268 // testSocket.close(); 269 // } 270 // catch (Exception e) 271 // { 272 // // ignore. 273 // } 274 // } 275 // } 276 // } 277 // } 278 // return -1; 279 // } 280 // 281 // /** 282 // * Wait for simulation to end using status polling with message FM.5. 283 // * @param federationRunId the name of the federation 284 // * @param modelId the String id of the model 285 // * @param modelPort port on which the model is listening 286 // * @return empty String for no error, filled String for error 287 // * @throws Sim0MQException on error 288 // * @throws SerializationException on error 289 // */ 290 // private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort) 291 // throws Sim0MQException, SerializationException 292 // { 293 // boolean ok = true; 294 // String error = ""; 295 // ZMQ.Socket modelSocket = null; 296 // try 297 // { 298 // modelSocket = this.lbContext.createSocket(SocketType.REQ); 299 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes()); 300 // modelSocket.connect("tcp://127.0.0.1:" + modelPort); 301 // } 302 // catch (Exception exception) 303 // { 304 // exception.printStackTrace(); 305 // ok = false; 306 // error = exception.getMessage(); 307 // } 308 // 309 // boolean started = false; 310 // while (ok && !started) 311 // { 312 // byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount, 313 // MessageStatus.NEW); 314 // modelSocket.send(fs1Message, 0); 315 // 316 // byte[] reply = modelSocket.recv(0); 317 // Object[] replyMessage = SimulationMessage.decode(reply); 318 // System.out.println("Received\n" + SimulationMessage.print(replyMessage)); 319 // 320 // if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error") 321 // && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount) 322 // { 323 // if (replyMessage[9].toString().equals("started")) 324 // { 325 // started = true; 326 // } 327 // else 328 // { 329 // // wait a second 330 // try 331 // { 332 // Thread.sleep(100); 333 // } 334 // catch (InterruptedException ie) 335 // { 336 // // ignore 337 // } 338 // } 339 // } 340 // else 341 // { 342 // ok = false; 343 // error = replyMessage[10].toString(); 344 // System.err.println("Simulation start error -- status = " + replyMessage[9]); 345 // System.err.println("Error message = " + replyMessage[10]); 346 // } 347 // } 348 // 349 // if (modelSocket != null) 350 // { 351 // modelSocket.close(); 352 // } 353 // 354 // return error; 355 // } 356 // 357 // /** 358 // * Process FM.8 message and send FS.4 message back. 359 // * @param identity reply id for REQ-ROUTER pattern 360 // * @param fields the message 361 // * @throws Sim0MQException on error 362 // * @throws SerializationException on error 363 // */ 364 // private void processKillFederateStarter(final String identity, final Object[] fields) 365 // throws Sim0MQException, SerializationException 366 // { 367 // boolean status = true; 368 // String error = ""; 369 // 370 // Object federationRunId = fields[1]; 371 // String senderId = fields[2].toString(); 372 // 373 // String modelId = fields[8].toString(); 374 // if (!this.modelPortMap.containsKey(modelId)) 375 // { 376 // status = false; 377 // error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter"; 378 // } 379 // else 380 // { 381 // int modelPort = this.modelPortMap.remove(modelId); 382 // Process process = this.runningProcessMap.remove(modelId); 383 // 384 // try 385 // { 386 // try 387 // { 388 // ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ); 389 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes()); 390 // modelSocket.connect("tcp://127.0.0.1:" + modelPort); 391 // 392 // byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3", 393 // ++this.messageCount, MessageStatus.NEW); 394 // modelSocket.send(fs3Message, 0); 395 // 396 // modelSocket.close(); 397 // } 398 // catch (Exception exception) 399 // { 400 // exception.printStackTrace(); 401 // status = true; 402 // error = exception.getMessage(); 403 // } 404 // 405 // try 406 // { 407 // Thread.sleep(100); 408 // } 409 // catch (InterruptedException ie) 410 // { 411 // // ignore 412 // } 413 // 414 // if (process != null && process.isAlive()) 415 // { 416 // process.destroyForcibly(); 417 // } 418 // 419 // StartFederateMessage sfm = this.startFederateMessages.get(modelId); 420 // if (sfm.isDeleteStdout()) 421 // { 422 // if (sfm.getRedirectStdout().length() > 0) 423 // { 424 // File stdOutFile = new File(sfm.getRedirectStdout()); 425 // stdOutFile.delete(); 426 // } 427 // } 428 // 429 // if (sfm.isDeleteStderr()) 430 // { 431 // if (sfm.getRedirectStderr().length() > 0) 432 // { 433 // File stdErrFile = new File(sfm.getRedirectStderr()); 434 // stdErrFile.delete(); 435 // } 436 // } 437 // 438 // if (sfm.isDeleteWorkingDirectory()) 439 // { 440 // File workingDir = new File(sfm.getWorkingDirectory()); 441 // workingDir.delete(); 442 // } 443 // } 444 // catch (Exception exception) 445 // { 446 // exception.printStackTrace(); 447 // status = false; 448 // error = exception.getMessage(); 449 // } 450 // 451 // byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount, 452 // MessageStatus.NEW, modelId, status, error); 453 // this.lbSocket.sendMore(identity); 454 // this.lbSocket.sendMore(""); 455 // this.lbSocket.send(fs4Message, 0); 456 // } 457 // } 458 // 459 // /** 460 // * Start listening on the given port for messages to start components. Report back via the call-back port on the status of 461 // * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process. 462 // * @param args the federation name and port on which the FederateStarter is listening 463 // * @throws Sim0MQException on error 464 // * @throws SerializationException on error 465 // */ 466 // public static void main(final String[] args) throws Sim0MQException, SerializationException 467 // { 468 // if (args.length < 4) 469 // { 470 // System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort"); 471 // System.exit(-1); 472 // } 473 // 474 // String sPort = args[0]; 475 // int port = 0; 476 // try 477 // { 478 // port = Integer.parseInt(sPort); 479 // } 480 // catch (NumberFormatException nfe) 481 // { 482 // System.err.println("Use as FederateStarter portNumber, where portNumber is a number"); 483 // System.exit(-1); 484 // } 485 // if (port == 0 || port > 65535) 486 // { 487 // System.err.println("PortNumber should be between 1 and 65535"); 488 // System.exit(-1); 489 // } 490 // 491 // String propertiesFile = args[1]; 492 // Properties softwareProperties = new Properties(); 493 // InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile); 494 // try 495 // { 496 // softwareProperties.load(propertiesStream); 497 // } 498 // catch (IOException | NullPointerException e) 499 // { 500 // System.err.println("Could not find or read software properties file " + propertiesFile); 501 // System.exit(-1); 502 // } 503 // 504 // String sStartPort = args[2]; 505 // int startPort = 0; 506 // try 507 // { 508 // startPort = Integer.parseInt(sStartPort); 509 // } 510 // catch (NumberFormatException nfe) 511 // { 512 // System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number"); 513 // System.exit(-1); 514 // } 515 // if (startPort == 0 || startPort > 65535) 516 // { 517 // System.err.println("startPort should be between 1 and 65535"); 518 // System.exit(-1); 519 // } 520 // 521 // String sEndPort = args[3]; 522 // int endPort = 0; 523 // try 524 // { 525 // endPort = Integer.parseInt(sEndPort); 526 // } 527 // catch (NumberFormatException nfe) 528 // { 529 // System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number"); 530 // System.exit(-1); 531 // } 532 // if (endPort == 0 || endPort > 65535) 533 // { 534 // System.err.println("endPort should be between 1 and 65535"); 535 // System.exit(-1); 536 // } 537 // 538 // new SimpleLoadBalancer(port, softwareProperties, startPort, endPort); 539 // } 540 // 541 // /** Record with information about the nodes that have a FederateStarter running. */ 542 // static class FederateStarterNode implements Comparable<FederateStarterNode> 543 // { 544 // /** the node name or IP address where the Federate Starter resides. */ 545 // private final String fsNodeName; 546 // 547 // /** the port of the Federate Starter. */ 548 // private final int port; 549 // 550 // /** the maximum load on the node (e.g., the maximum number of concurrent models). */ 551 // private final int maxLoad; 552 // 553 // /** the current load on the node. */ 554 // private int currentLoad = 0; 555 // 556 // /** the priority of the node. Higher value is higher priority. */ 557 // private int priority; 558 // 559 // /** 560 // * @param fsNodeName String; the node name or IP address where the Federate Starter resides 561 // * @param port int; the port of the Federate Starter 562 // * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models) 563 // * @param priority int; the priority of the node. Higher value is higher priority. 564 // */ 565 // FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority) 566 // { 567 // super(); 568 // this.fsNodeName = fsNodeName; 569 // this.port = port; 570 // this.maxLoad = maxLoad; 571 // this.priority = priority; 572 // } 573 // 574 // /** 575 // * @return currentLoad 576 // */ 577 // public final int getCurrentLoad() 578 // { 579 // return this.currentLoad; 580 // } 581 // 582 // /** 583 // * @param currentLoad set currentLoad 584 // */ 585 // public final void setCurrentLoad(final int currentLoad) 586 // { 587 // this.currentLoad = currentLoad; 588 // } 589 // 590 // /** 591 // * @return priority 592 // */ 593 // public final int getPriority() 594 // { 595 // return this.priority; 596 // } 597 // 598 // /** 599 // * @param priority set priority 600 // */ 601 // public final void setPriority(final int priority) 602 // { 603 // this.priority = priority; 604 // } 605 // 606 // /** 607 // * @return fsNodeName 608 // */ 609 // public final String getFsNodeName() 610 // { 611 // return this.fsNodeName; 612 // } 613 // 614 // /** 615 // * @return port 616 // */ 617 // public final int getPort() 618 // { 619 // return this.port; 620 // } 621 // 622 // /** 623 // * @return maxLoad 624 // */ 625 // public final int getMaxLoad() 626 // { 627 // return this.maxLoad; 628 // } 629 // 630 // /** {@inheritDoc} */ 631 // @Override 632 // public int hashCode() 633 // { 634 // final int prime = 31; 635 // int result = 1; 636 // result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode()); 637 // result = prime * result + this.port; 638 // return result; 639 // } 640 // 641 // /** {@inheritDoc} */ 642 // @SuppressWarnings("checkstyle:needbraces") 643 // @Override 644 // public boolean equals(final Object obj) 645 // { 646 // if (this == obj) 647 // return true; 648 // if (obj == null) 649 // return false; 650 // if (getClass() != obj.getClass()) 651 // return false; 652 // FederateStarterNode other = (FederateStarterNode) obj; 653 // if (this.fsNodeName == null) 654 // { 655 // if (other.fsNodeName != null) 656 // return false; 657 // } 658 // else if (!this.fsNodeName.equals(other.fsNodeName)) 659 // return false; 660 // if (this.port != other.port) 661 // return false; 662 // return true; 663 // } 664 // 665 // /** {@inheritDoc} */ 666 // @SuppressWarnings("checkstyle:needbraces") 667 // @Override 668 // public int compareTo(final FederateStarterNode o) 669 // { 670 // // Compares this object with the specified object for order. Returns a 671 // // negative integer, zero, or a positive integer as this object is less 672 // // than, equal to, or greater than the specified object. 673 // if (this.equals(o)) 674 // return 0; 675 // 676 // // higher priority number means higher priority 677 // if (this.priority > o.priority) 678 // return 1; 679 // if (this.priority < o.priority) 680 // return -1; 681 // 682 // // higher remaining load means higher priority 683 // if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad) 684 // return 1; 685 // if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad) 686 // return -1; 687 // 688 // if (this.hashCode() > o.hashCode()) 689 // return 1; 690 // if (this.hashCode() < o.hashCode()) 691 // return -1; 692 // // 693 // return 0; 694 // } 695 // } 696 }