1 package org.opentrafficsim.demo.web;
2
3 import org.zeromq.ZContext;
4 import org.zeromq.ZMQ;
5
6 /**
 * The LoadBalancer starts listening on the given port for messages. It is started with a map of network nodes where
 * FederateStarters can start models, and the capacity of each node.<br>
9 * <br>
10 * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined:
11 * <ul>
12 * <li><b>port:</b> the port on which the LoadBalancer listens</li>
13 * <li><b>nodeFile:</b> the file that contains the node information</li>
14 * </ul>
15 * The nodeFile is expected to be tab-delimited with the following columns (and an example):
16 *
17 * <pre>
18 * nodeAddress fsPort maxLoad priority
19 * 10.0.0.121 5500 8 10
20 * 10.0.0.122 5500 8 10
21 * 10.0.0.123 5500 8 10
22 * 127.0.0.1 5500 8 5
23 * </pre>
24 *
25 * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be
26 * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which
27 * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example
28 * above shows three nodes with priority 10, and the localhost with a lower priority. So only when 24 federates are running,
29 * localhost will be used.
30 * <p>
31 * Copyright (c) 2016-2022 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
32 * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>.
33 * </p>
34 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
35 */
36 public class SimpleLoadBalancer
37 {
38 // /** the port number to listen on. */
39 // private final int lbPort;
40
41 // /** the nodes with federate starters that can be used. */
42 // private final Set<FederateStarterNode> federateStarterNodes;
43
44 /** the 0mq socket. */
45 private ZMQ.Socket lbSocket;
46
47 /** the 0mq context. */
48 private ZContext lbContext;
49
50 /** message count. */
51 private long messageCount = 0;
52
53 // /**
54 // * @param lbPort the port number to listen on
55 // * @param federateStarterNodes the nodes with federate starters that can be used
56 // * @throws Sim0MQException on error
57 // * @throws SerializationException on error
58 // */
59 // public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes)
60 // throws Sim0MQException, SerializationException
61 // {
62 // this.lbPort = lbPort;
63 // this.federateStarterNodes = federateStarterNodes;
64 //
65 // this.lbContext = new ZContext(1);
66 //
67 // this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER);
68 // this.lbSocket.bind("tcp://*:" + this.lbPort);
69 //
70 // while (!Thread.currentThread().isInterrupted())
71 // {
72 // // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0)
73 // String identity = this.lbSocket.recvStr();
74 // this.lbSocket.recvStr();
75 //
76 // byte[] request = this.lbSocket.recv(0);
77 // Object[] fields = SimulationMessage.decode(request);
78 // String messageTypeId = fields[4].toString();
79 // String receiverId = fields[3].toString();
80 //
81 // System.out.println("Received " + SimulationMessage.print(fields));
82 //
83 // if (receiverId.equals("LB"))
84 // {
85 // switch (messageTypeId)
86 // {
87 // case "LB.1":
88 // processStartModel(identity, fields);
89 // break;
90 //
91 // case "LB.2":
92 // processModelKilled(identity, fields);
93 // break;
94 //
95 // default:
96 // // wrong message
97 // System.err.println("Received unknown message -- not processed: " + messageTypeId);
98 // }
99 // }
100 // else
101 // {
102 // // wrong receiver
103 // System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: ");
104 // }
105 // }
106 // this.lbSocket.close();
107 // this.lbContext.destroy();
108 // }
109 //
110 // /**
111 // * Process MC.X message and send LB.Y message back.
112 // * @param identity reply id for REQ-ROUTER pattern
113 // * @param fields the message
114 // * @throws Sim0MQException on error
115 // * @throws SerializationException on error
116 // */
117 // private void processStartFederateStarter(final String identity, final Object[] fields)
118 // throws Sim0MQException, SerializationException
119 // {
120 // StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB");
121 // String error = "";
122 //
123 // int modelPort = findFreePortNumber();
124 //
125 // if (modelPort == -1)
126 // {
127 // error = "No free port number";
128 // }
129 //
130 // else
131 //
132 // {
133 // try
134 // {
135 // ProcessBuilder pb = new ProcessBuilder();
136 //
137 // Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
138 // pb.directory(workingPath.toFile());
139 //
140 // String softwareCode = "";
141 // if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
142 // {
143 // System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
144 // + " in software properties file");
145 // }
146 // else
147 // {
148 // softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
149 //
150 // List<String> pbArgs = new ArrayList<>();
151 // pbArgs.add(softwareCode);
152 // pbArgs.add(startFederateMessage.getArgsBefore());
153 // pbArgs.add(startFederateMessage.getModelPath());
154 // pbArgs.addAll(Arrays.asList(
155 // startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
156 // pb.command(pbArgs);
157 //
158 // String stdIn = startFederateMessage.getRedirectStdin();
159 // String stdOut = startFederateMessage.getRedirectStdout();
160 // String stdErr = startFederateMessage.getRedirectStderr();
161 //
162 // if (stdIn.length() > 0)
163 // {
164 // // TODO working dir path if not absolute?
165 // File stdInFile = new File(stdIn);
166 // pb.redirectInput(stdInFile);
167 // }
168 //
169 // if (stdOut.length() > 0)
170 // {
171 // // TODO working dir path if not absolute?
172 // File stdOutFile = new File(stdOut);
173 // pb.redirectOutput(stdOutFile);
174 // }
175 //
176 // if (stdErr.length() > 0)
177 // {
178 // // TODO working dir path if not absolute?
179 // File stdErrFile = new File(stdErr);
180 // pb.redirectError(stdErrFile);
181 // }
182 //
183 // new Thread()
184 // {
185 // /** {@inheritDoc} */
186 // @Override
187 // public void run()
188 // {
189 // try
190 // {
191 // Process process = pb.start();
192 // SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
193 // System.err.println("Process started:" + process.isAlive());
194 // }
195 // catch (IOException exception)
196 // {
197 // exception.printStackTrace();
198 // }
199 // }
200 // }.start();
201 //
202 // this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
203 // this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
204 //
205 // // Thread.sleep(1000);
206 //
207 // // wait till the model is ready...
208 // error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
209 // modelPort);
210 // }
211 // }
212 // catch (IOException exception)
213 // {
214 // exception.printStackTrace();
215 // error = exception.getMessage();
216 // }
217 // }
218 //
219 // System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
220 //
221 // // Send reply back to client
222 // this.lbSocket.sendMore(identity);
223 // this.lbSocket.sendMore("");
224 // //@formatter:off
225 // byte[] fs2Message = new FederateStartedMessage.Builder()
226 // .setSimulationRunId(startFederateMessage.getSimulationRunId())
227 // .setInstanceId(startFederateMessage.getInstanceId())
228 // .setSenderId("FS")
229 // .setReceiverId(startFederateMessage.getSenderId())
230 // .setMessageId(++this.messageCount)
231 // .setStatus(error.isEmpty() ? "started" : "error")
232 // .setError(error)
233 // .setModelPort(modelPort)
234 // .build()
235 // .createByteArray();
236 // this.lbSocket.send(fs2Message, 0);
237 // //@formatter:on
238 // }
239 //
240 // /**
241 // * Find a free port for the model.
242 // * @return the first free fort number in the range startPort - endPort, inclusive
243 // */
244 // private int findFreePortNumber()
245 // {
246 // for (int port = this.startPort; port <= this.endPort; port++)
247 // {
248 // if (!this.modelPortMap.containsValue(port))
249 // {
250 // // try if the port is really free
251 // ZMQ.Socket testSocket = null;
252 // try
253 // {
254 // testSocket = this.lbContext.createSocket(SocketType.REP);
255 // testSocket.bind("tcp://127.0.0.1:" + port);
256 // testSocket.unbind("tcp://127.0.0.1:" + port);
257 // testSocket.close();
258 // return port;
259 // }
260 // catch (Exception exception)
261 // {
262 // // port was not free
263 // if (testSocket != null)
264 // {
265 // try
266 // {
267 // testSocket.close();
268 // }
269 // catch (Exception e)
270 // {
271 // // ignore.
272 // }
273 // }
274 // }
275 // }
276 // }
277 // return -1;
278 // }
279 //
280 // /**
281 // * Wait for simulation to end using status polling with message FM.5.
282 // * @param federationRunId the name of the federation
283 // * @param modelId the String id of the model
284 // * @param modelPort port on which the model is listening
285 // * @return empty String for no error, filled String for error
286 // * @throws Sim0MQException on error
287 // * @throws SerializationException on error
288 // */
289 // private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
290 // throws Sim0MQException, SerializationException
291 // {
292 // boolean ok = true;
293 // String error = "";
294 // ZMQ.Socket modelSocket = null;
295 // try
296 // {
297 // modelSocket = this.lbContext.createSocket(SocketType.REQ);
298 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
299 // modelSocket.connect("tcp://127.0.0.1:" + modelPort);
300 // }
301 // catch (Exception exception)
302 // {
303 // exception.printStackTrace();
304 // ok = false;
305 // error = exception.getMessage();
306 // }
307 //
308 // boolean started = false;
309 // while (ok && !started)
310 // {
311 // byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
312 // MessageStatus.NEW);
313 // modelSocket.send(fs1Message, 0);
314 //
315 // byte[] reply = modelSocket.recv(0);
316 // Object[] replyMessage = SimulationMessage.decode(reply);
317 // System.out.println("Received\n" + SimulationMessage.print(replyMessage));
318 //
319 // if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
320 // && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
321 // {
322 // if (replyMessage[9].toString().equals("started"))
323 // {
324 // started = true;
325 // }
326 // else
327 // {
328 // // wait a second
329 // try
330 // {
331 // Thread.sleep(100);
332 // }
333 // catch (InterruptedException ie)
334 // {
335 // // ignore
336 // }
337 // }
338 // }
339 // else
340 // {
341 // ok = false;
342 // error = replyMessage[10].toString();
343 // System.err.println("Simulation start error -- status = " + replyMessage[9]);
344 // System.err.println("Error message = " + replyMessage[10]);
345 // }
346 // }
347 //
348 // if (modelSocket != null)
349 // {
350 // modelSocket.close();
351 // }
352 //
353 // return error;
354 // }
355 //
356 // /**
357 // * Process FM.8 message and send FS.4 message back.
358 // * @param identity reply id for REQ-ROUTER pattern
359 // * @param fields the message
360 // * @throws Sim0MQException on error
361 // * @throws SerializationException on error
362 // */
363 // private void processKillFederateStarter(final String identity, final Object[] fields)
364 // throws Sim0MQException, SerializationException
365 // {
366 // boolean status = true;
367 // String error = "";
368 //
369 // Object federationRunId = fields[1];
370 // String senderId = fields[2].toString();
371 //
372 // String modelId = fields[8].toString();
373 // if (!this.modelPortMap.containsKey(modelId))
374 // {
375 // status = false;
376 // error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
377 // }
378 // else
379 // {
380 // int modelPort = this.modelPortMap.remove(modelId);
381 // Process process = this.runningProcessMap.remove(modelId);
382 //
383 // try
384 // {
385 // try
386 // {
387 // ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ);
388 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
389 // modelSocket.connect("tcp://127.0.0.1:" + modelPort);
390 //
391 // byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
392 // ++this.messageCount, MessageStatus.NEW);
393 // modelSocket.send(fs3Message, 0);
394 //
395 // modelSocket.close();
396 // }
397 // catch (Exception exception)
398 // {
399 // exception.printStackTrace();
400 // status = true;
401 // error = exception.getMessage();
402 // }
403 //
404 // try
405 // {
406 // Thread.sleep(100);
407 // }
408 // catch (InterruptedException ie)
409 // {
410 // // ignore
411 // }
412 //
413 // if (process != null && process.isAlive())
414 // {
415 // process.destroyForcibly();
416 // }
417 //
418 // StartFederateMessage sfm = this.startFederateMessages.get(modelId);
419 // if (sfm.isDeleteStdout())
420 // {
421 // if (sfm.getRedirectStdout().length() > 0)
422 // {
423 // File stdOutFile = new File(sfm.getRedirectStdout());
424 // stdOutFile.delete();
425 // }
426 // }
427 //
428 // if (sfm.isDeleteStderr())
429 // {
430 // if (sfm.getRedirectStderr().length() > 0)
431 // {
432 // File stdErrFile = new File(sfm.getRedirectStderr());
433 // stdErrFile.delete();
434 // }
435 // }
436 //
437 // if (sfm.isDeleteWorkingDirectory())
438 // {
439 // File workingDir = new File(sfm.getWorkingDirectory());
440 // workingDir.delete();
441 // }
442 // }
443 // catch (Exception exception)
444 // {
445 // exception.printStackTrace();
446 // status = false;
447 // error = exception.getMessage();
448 // }
449 //
450 // byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
451 // MessageStatus.NEW, modelId, status, error);
452 // this.lbSocket.sendMore(identity);
453 // this.lbSocket.sendMore("");
454 // this.lbSocket.send(fs4Message, 0);
455 // }
456 // }
457 //
458 // /**
459 // * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
460 // * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
461 // * @param args the federation name and port on which the FederateStarter is listening
462 // * @throws Sim0MQException on error
463 // * @throws SerializationException on error
464 // */
465 // public static void main(final String[] args) throws Sim0MQException, SerializationException
466 // {
467 // if (args.length < 4)
468 // {
469 // System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
470 // System.exit(-1);
471 // }
472 //
473 // String sPort = args[0];
474 // int port = 0;
475 // try
476 // {
477 // port = Integer.parseInt(sPort);
478 // }
479 // catch (NumberFormatException nfe)
480 // {
481 // System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
482 // System.exit(-1);
483 // }
484 // if (port == 0 || port > 65535)
485 // {
486 // System.err.println("PortNumber should be between 1 and 65535");
487 // System.exit(-1);
488 // }
489 //
490 // String propertiesFile = args[1];
491 // Properties softwareProperties = new Properties();
492 // InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
493 // try
494 // {
495 // softwareProperties.load(propertiesStream);
496 // }
497 // catch (IOException | NullPointerException e)
498 // {
499 // System.err.println("Could not find or read software properties file " + propertiesFile);
500 // System.exit(-1);
501 // }
502 //
503 // String sStartPort = args[2];
504 // int startPort = 0;
505 // try
506 // {
507 // startPort = Integer.parseInt(sStartPort);
508 // }
509 // catch (NumberFormatException nfe)
510 // {
511 // System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
512 // System.exit(-1);
513 // }
514 // if (startPort == 0 || startPort > 65535)
515 // {
516 // System.err.println("startPort should be between 1 and 65535");
517 // System.exit(-1);
518 // }
519 //
520 // String sEndPort = args[3];
521 // int endPort = 0;
522 // try
523 // {
524 // endPort = Integer.parseInt(sEndPort);
525 // }
526 // catch (NumberFormatException nfe)
527 // {
528 // System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
529 // System.exit(-1);
530 // }
531 // if (endPort == 0 || endPort > 65535)
532 // {
533 // System.err.println("endPort should be between 1 and 65535");
534 // System.exit(-1);
535 // }
536 //
537 // new SimpleLoadBalancer(port, softwareProperties, startPort, endPort);
538 // }
539 //
540 // /** Record with information about the nodes that have a FederateStarter running. */
541 // static class FederateStarterNode implements Comparable<FederateStarterNode>
542 // {
543 // /** the node name or IP address where the Federate Starter resides. */
544 // private final String fsNodeName;
545 //
546 // /** the port of the Federate Starter. */
547 // private final int port;
548 //
549 // /** the maximum load on the node (e.g., the maximum number of concurrent models). */
550 // private final int maxLoad;
551 //
552 // /** the current load on the node. */
553 // private int currentLoad = 0;
554 //
555 // /** the priority of the node. Higher value is higher priority. */
556 // private int priority;
557 //
558 // /**
559 // * @param fsNodeName String; the node name or IP address where the Federate Starter resides
560 // * @param port int; the port of the Federate Starter
561 // * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models)
562 // * @param priority int; the priority of the node. Higher value is higher priority.
563 // */
564 // FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority)
565 // {
566 // this.fsNodeName = fsNodeName;
567 // this.port = port;
568 // this.maxLoad = maxLoad;
569 // this.priority = priority;
570 // }
571 //
572 // /**
573 // * @return currentLoad
574 // */
575 // public final int getCurrentLoad()
576 // {
577 // return this.currentLoad;
578 // }
579 //
580 // /**
581 // * @param currentLoad set currentLoad
582 // */
583 // public final void setCurrentLoad(final int currentLoad)
584 // {
585 // this.currentLoad = currentLoad;
586 // }
587 //
588 // /**
589 // * @return priority
590 // */
591 // public final int getPriority()
592 // {
593 // return this.priority;
594 // }
595 //
596 // /**
597 // * @param priority set priority
598 // */
599 // public final void setPriority(final int priority)
600 // {
601 // this.priority = priority;
602 // }
603 //
604 // /**
605 // * @return fsNodeName
606 // */
607 // public final String getFsNodeName()
608 // {
609 // return this.fsNodeName;
610 // }
611 //
612 // /**
613 // * @return port
614 // */
615 // public final int getPort()
616 // {
617 // return this.port;
618 // }
619 //
620 // /**
621 // * @return maxLoad
622 // */
623 // public final int getMaxLoad()
624 // {
625 // return this.maxLoad;
626 // }
627 //
628 // /** {@inheritDoc} */
629 // @Override
630 // public int hashCode()
631 // {
632 // final int prime = 31;
633 // int result = 1;
634 // result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode());
635 // result = prime * result + this.port;
636 // return result;
637 // }
638 //
639 // /** {@inheritDoc} */
640 // @SuppressWarnings("checkstyle:needbraces")
641 // @Override
642 // public boolean equals(final Object obj)
643 // {
644 // if (this == obj)
645 // return true;
646 // if (obj == null)
647 // return false;
648 // if (getClass() != obj.getClass())
649 // return false;
650 // FederateStarterNode other = (FederateStarterNode) obj;
651 // if (this.fsNodeName == null)
652 // {
653 // if (other.fsNodeName != null)
654 // return false;
655 // }
656 // else if (!this.fsNodeName.equals(other.fsNodeName))
657 // return false;
658 // if (this.port != other.port)
659 // return false;
660 // return true;
661 // }
662 //
663 // /** {@inheritDoc} */
664 // @SuppressWarnings("checkstyle:needbraces")
665 // @Override
666 // public int compareTo(final FederateStarterNode o)
667 // {
668 // // Compares this object with the specified object for order. Returns a
669 // // negative integer, zero, or a positive integer as this object is less
670 // // than, equal to, or greater than the specified object.
671 // if (this.equals(o))
672 // return 0;
673 //
674 // // higher priority number means higher priority
675 // if (this.priority > o.priority)
676 // return 1;
677 // if (this.priority < o.priority)
678 // return -1;
679 //
680 // // higher remaining load means higher priority
681 // if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad)
682 // return 1;
683 // if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad)
684 // return -1;
685 //
686 // if (this.hashCode() > o.hashCode())
687 // return 1;
688 // if (this.hashCode() < o.hashCode())
689 // return -1;
690 // //
691 // return 0;
692 // }
693 // }
694 }