1 package org.opentrafficsim.demo.web;
2
3 import org.zeromq.ZContext;
4 import org.zeromq.ZMQ;
5
6 /**
 * The LoadBalancer starts listening on the given port for messages. It is started with a map of network nodes where
 * FederateStarters can start models, and the capacity of each node.<br>
9 * <br>
10 * The program is called as follows: java -jar LocalLoadBalancer.jar key1=value1 key2=value2 ... The following keys are defined:
11 * <ul>
12 * <li><b>port:</b> the port on which the LoadBalancer listens</li>
13 * <li><b>nodeFile:</b> the file that contains the node information</li>
14 * </ul>
15 * The nodeFile is expected to be tab-delimited with the following columns (and an example):
16 *
17 * <pre>
18 * nodeAddress fsPort maxLoad priority
19 * 10.0.0.121 5500 8 10
20 * 10.0.0.122 5500 8 10
21 * 10.0.0.123 5500 8 10
22 * 127.0.0.1 5500 8 5
23 * </pre>
24 *
25 * Where nodeAddress is the network name of the node or its IP address, fsPort is the port where the FederateStarter can be
26 * found on the node, maxLoad is an int indicating the maximum number of jobs the node can handle, and priority indicates which
27 * nodes should be scheduled first (nodes of lower priority are only scheduled when high priority nodes are full). The example
 * above shows three nodes with priority 10, and the localhost with a lower priority. So localhost will only be used once
 * 24 federates are running on the three other nodes.
30 * <p>
31 * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
32 * BSD-style license. See <a href="https://opentrafficsim.org/docs/current/license.html">OTS License</a>.
33 * </p>
34 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
35 */
36 public class SimpleLoadBalancer
37 {
38 // /** the port number to listen on. */
39 // private final int lbPort;
40
41 // /** the nodes with federate starters that can be used. */
42 // private final Set<FederateStarterNode> federateStarterNodes;
43
44 /** the 0mq socket. */
45 private ZMQ.Socket lbSocket;
46
47 /** the 0mq context. */
48 private ZContext lbContext;
49
50 /** message count. */
51 private long messageCount = 0;
52
53 // /**
54 // * @param lbPort the port number to listen on
55 // * @param federateStarterNodes the nodes with federate starters that can be used
56 // * @throws Sim0MQException on error
57 // * @throws SerializationException on error
58 // */
59 // public SimpleLoadBalancer(final int lbPort, final Set<FederateStarterNode> federateStarterNodes)
60 // throws Sim0MQException, SerializationException
61 // {
62 // super();
63 // this.lbPort = lbPort;
64 // this.federateStarterNodes = federateStarterNodes;
65 //
66 // this.lbContext = new ZContext(1);
67 //
68 // this.lbSocket = this.lbContext.createSocket(SocketType.ROUTER);
69 // this.lbSocket.bind("tcp://*:" + this.lbPort);
70 //
71 // while (!Thread.currentThread().isInterrupted())
72 // {
73 // // Wait for next request from the [web] client -- first the identity (String) and the delimiter (#0)
74 // String identity = this.lbSocket.recvStr();
75 // this.lbSocket.recvStr();
76 //
77 // byte[] request = this.lbSocket.recv(0);
78 // Object[] fields = SimulationMessage.decode(request);
79 // String messageTypeId = fields[4].toString();
80 // String receiverId = fields[3].toString();
81 //
82 // System.out.println("Received " + SimulationMessage.print(fields));
83 //
84 // if (receiverId.equals("LB"))
85 // {
86 // switch (messageTypeId)
87 // {
88 // case "LB.1":
89 // processStartModel(identity, fields);
90 // break;
91 //
92 // case "LB.2":
93 // processModelKilled(identity, fields);
94 // break;
95 //
96 // default:
97 // // wrong message
98 // System.err.println("Received unknown message -- not processed: " + messageTypeId);
99 // }
100 // }
101 // else
102 // {
103 // // wrong receiver
104 // System.err.println("Received message not intended for LB but for " + receiverId + " -- not processed: ");
105 // }
106 // }
107 // this.lbSocket.close();
108 // this.lbContext.destroy();
109 // }
110 //
111 // /**
112 // * Process MC.X message and send LB.Y message back.
113 // * @param identity reply id for REQ-ROUTER pattern
114 // * @param fields the message
115 // * @throws Sim0MQException on error
116 // * @throws SerializationException on error
117 // */
118 // private void processStartFederateStarter(final String identity, final Object[] fields)
119 // throws Sim0MQException, SerializationException
120 // {
121 // StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "LB");
122 // String error = "";
123 //
124 // int modelPort = findFreePortNumber();
125 //
126 // if (modelPort == -1)
127 // {
128 // error = "No free port number";
129 // }
130 //
131 // else
132 //
133 // {
134 // try
135 // {
136 // ProcessBuilder pb = new ProcessBuilder();
137 //
138 // Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
139 // pb.directory(workingPath.toFile());
140 //
141 // String softwareCode = "";
142 // if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
143 // {
144 // System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
145 // + " in software properties file");
146 // }
147 // else
148 // {
149 // softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
150 //
151 // List<String> pbArgs = new ArrayList<>();
152 // pbArgs.add(softwareCode);
153 // pbArgs.add(startFederateMessage.getArgsBefore());
154 // pbArgs.add(startFederateMessage.getModelPath());
155 // pbArgs.addAll(Arrays.asList(
156 // startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
157 // pb.command(pbArgs);
158 //
159 // String stdIn = startFederateMessage.getRedirectStdin();
160 // String stdOut = startFederateMessage.getRedirectStdout();
161 // String stdErr = startFederateMessage.getRedirectStderr();
162 //
163 // if (stdIn.length() > 0)
164 // {
165 // // TODO working dir path if not absolute?
166 // File stdInFile = new File(stdIn);
167 // pb.redirectInput(stdInFile);
168 // }
169 //
170 // if (stdOut.length() > 0)
171 // {
172 // // TODO working dir path if not absolute?
173 // File stdOutFile = new File(stdOut);
174 // pb.redirectOutput(stdOutFile);
175 // }
176 //
177 // if (stdErr.length() > 0)
178 // {
179 // // TODO working dir path if not absolute?
180 // File stdErrFile = new File(stdErr);
181 // pb.redirectError(stdErrFile);
182 // }
183 //
184 // new Thread()
185 // {
186 // /** {@inheritDoc} */
187 // @Override
188 // public void run()
189 // {
190 // try
191 // {
192 // Process process = pb.start();
193 // SimpleLoadBalancer.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
194 // System.err.println("Process started:" + process.isAlive());
195 // }
196 // catch (IOException exception)
197 // {
198 // exception.printStackTrace();
199 // }
200 // }
201 // }.start();
202 //
203 // this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
204 // this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
205 //
206 // // Thread.sleep(1000);
207 //
208 // // wait till the model is ready...
209 // error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
210 // modelPort);
211 // }
212 // }
213 // catch (IOException exception)
214 // {
215 // exception.printStackTrace();
216 // error = exception.getMessage();
217 // }
218 // }
219 //
220 // System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
221 //
222 // // Send reply back to client
223 // this.lbSocket.sendMore(identity);
224 // this.lbSocket.sendMore("");
225 // //@formatter:off
226 // byte[] fs2Message = new FederateStartedMessage.Builder()
227 // .setSimulationRunId(startFederateMessage.getSimulationRunId())
228 // .setInstanceId(startFederateMessage.getInstanceId())
229 // .setSenderId("FS")
230 // .setReceiverId(startFederateMessage.getSenderId())
231 // .setMessageId(++this.messageCount)
232 // .setStatus(error.isEmpty() ? "started" : "error")
233 // .setError(error)
234 // .setModelPort(modelPort)
235 // .build()
236 // .createByteArray();
237 // this.lbSocket.send(fs2Message, 0);
238 // //@formatter:on
239 // }
240 //
241 // /**
242 // * Find a free port for the model.
243 // * @return the first free fort number in the range startPort - endPort, inclusive
244 // */
245 // private int findFreePortNumber()
246 // {
247 // for (int port = this.startPort; port <= this.endPort; port++)
248 // {
249 // if (!this.modelPortMap.containsValue(port))
250 // {
251 // // try if the port is really free
252 // ZMQ.Socket testSocket = null;
253 // try
254 // {
255 // testSocket = this.lbContext.createSocket(SocketType.REP);
256 // testSocket.bind("tcp://127.0.0.1:" + port);
257 // testSocket.unbind("tcp://127.0.0.1:" + port);
258 // testSocket.close();
259 // return port;
260 // }
261 // catch (Exception exception)
262 // {
263 // // port was not free
264 // if (testSocket != null)
265 // {
266 // try
267 // {
268 // testSocket.close();
269 // }
270 // catch (Exception e)
271 // {
272 // // ignore.
273 // }
274 // }
275 // }
276 // }
277 // }
278 // return -1;
279 // }
280 //
281 // /**
282 // * Wait for simulation to end using status polling with message FM.5.
283 // * @param federationRunId the name of the federation
284 // * @param modelId the String id of the model
285 // * @param modelPort port on which the model is listening
286 // * @return empty String for no error, filled String for error
287 // * @throws Sim0MQException on error
288 // * @throws SerializationException on error
289 // */
290 // private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
291 // throws Sim0MQException, SerializationException
292 // {
293 // boolean ok = true;
294 // String error = "";
295 // ZMQ.Socket modelSocket = null;
296 // try
297 // {
298 // modelSocket = this.lbContext.createSocket(SocketType.REQ);
299 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
300 // modelSocket.connect("tcp://127.0.0.1:" + modelPort);
301 // }
302 // catch (Exception exception)
303 // {
304 // exception.printStackTrace();
305 // ok = false;
306 // error = exception.getMessage();
307 // }
308 //
309 // boolean started = false;
310 // while (ok && !started)
311 // {
312 // byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
313 // MessageStatus.NEW);
314 // modelSocket.send(fs1Message, 0);
315 //
316 // byte[] reply = modelSocket.recv(0);
317 // Object[] replyMessage = SimulationMessage.decode(reply);
318 // System.out.println("Received\n" + SimulationMessage.print(replyMessage));
319 //
320 // if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
321 // && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
322 // {
323 // if (replyMessage[9].toString().equals("started"))
324 // {
325 // started = true;
326 // }
327 // else
328 // {
329 // // wait a second
330 // try
331 // {
332 // Thread.sleep(100);
333 // }
334 // catch (InterruptedException ie)
335 // {
336 // // ignore
337 // }
338 // }
339 // }
340 // else
341 // {
342 // ok = false;
343 // error = replyMessage[10].toString();
344 // System.err.println("Simulation start error -- status = " + replyMessage[9]);
345 // System.err.println("Error message = " + replyMessage[10]);
346 // }
347 // }
348 //
349 // if (modelSocket != null)
350 // {
351 // modelSocket.close();
352 // }
353 //
354 // return error;
355 // }
356 //
357 // /**
358 // * Process FM.8 message and send FS.4 message back.
359 // * @param identity reply id for REQ-ROUTER pattern
360 // * @param fields the message
361 // * @throws Sim0MQException on error
362 // * @throws SerializationException on error
363 // */
364 // private void processKillFederateStarter(final String identity, final Object[] fields)
365 // throws Sim0MQException, SerializationException
366 // {
367 // boolean status = true;
368 // String error = "";
369 //
370 // Object federationRunId = fields[1];
371 // String senderId = fields[2].toString();
372 //
373 // String modelId = fields[8].toString();
374 // if (!this.modelPortMap.containsKey(modelId))
375 // {
376 // status = false;
377 // error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
378 // }
379 // else
380 // {
381 // int modelPort = this.modelPortMap.remove(modelId);
382 // Process process = this.runningProcessMap.remove(modelId);
383 //
384 // try
385 // {
386 // try
387 // {
388 // ZMQ.Socket modelSocket = this.lbContext.createSocket(SocketType.REQ);
389 // modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
390 // modelSocket.connect("tcp://127.0.0.1:" + modelPort);
391 //
392 // byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
393 // ++this.messageCount, MessageStatus.NEW);
394 // modelSocket.send(fs3Message, 0);
395 //
396 // modelSocket.close();
397 // }
398 // catch (Exception exception)
399 // {
400 // exception.printStackTrace();
401 // status = true;
402 // error = exception.getMessage();
403 // }
404 //
405 // try
406 // {
407 // Thread.sleep(100);
408 // }
409 // catch (InterruptedException ie)
410 // {
411 // // ignore
412 // }
413 //
414 // if (process != null && process.isAlive())
415 // {
416 // process.destroyForcibly();
417 // }
418 //
419 // StartFederateMessage sfm = this.startFederateMessages.get(modelId);
420 // if (sfm.isDeleteStdout())
421 // {
422 // if (sfm.getRedirectStdout().length() > 0)
423 // {
424 // File stdOutFile = new File(sfm.getRedirectStdout());
425 // stdOutFile.delete();
426 // }
427 // }
428 //
429 // if (sfm.isDeleteStderr())
430 // {
431 // if (sfm.getRedirectStderr().length() > 0)
432 // {
433 // File stdErrFile = new File(sfm.getRedirectStderr());
434 // stdErrFile.delete();
435 // }
436 // }
437 //
438 // if (sfm.isDeleteWorkingDirectory())
439 // {
440 // File workingDir = new File(sfm.getWorkingDirectory());
441 // workingDir.delete();
442 // }
443 // }
444 // catch (Exception exception)
445 // {
446 // exception.printStackTrace();
447 // status = false;
448 // error = exception.getMessage();
449 // }
450 //
451 // byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
452 // MessageStatus.NEW, modelId, status, error);
453 // this.lbSocket.sendMore(identity);
454 // this.lbSocket.sendMore("");
455 // this.lbSocket.send(fs4Message, 0);
456 // }
457 // }
458 //
459 // /**
460 // * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
461 // * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
462 // * @param args the federation name and port on which the FederateStarter is listening
463 // * @throws Sim0MQException on error
464 // * @throws SerializationException on error
465 // */
466 // public static void main(final String[] args) throws Sim0MQException, SerializationException
467 // {
468 // if (args.length < 4)
469 // {
470 // System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
471 // System.exit(-1);
472 // }
473 //
474 // String sPort = args[0];
475 // int port = 0;
476 // try
477 // {
478 // port = Integer.parseInt(sPort);
479 // }
480 // catch (NumberFormatException nfe)
481 // {
482 // System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
483 // System.exit(-1);
484 // }
485 // if (port == 0 || port > 65535)
486 // {
487 // System.err.println("PortNumber should be between 1 and 65535");
488 // System.exit(-1);
489 // }
490 //
491 // String propertiesFile = args[1];
492 // Properties softwareProperties = new Properties();
493 // InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
494 // try
495 // {
496 // softwareProperties.load(propertiesStream);
497 // }
498 // catch (IOException | NullPointerException e)
499 // {
500 // System.err.println("Could not find or read software properties file " + propertiesFile);
501 // System.exit(-1);
502 // }
503 //
504 // String sStartPort = args[2];
505 // int startPort = 0;
506 // try
507 // {
508 // startPort = Integer.parseInt(sStartPort);
509 // }
510 // catch (NumberFormatException nfe)
511 // {
512 // System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
513 // System.exit(-1);
514 // }
515 // if (startPort == 0 || startPort > 65535)
516 // {
517 // System.err.println("startPort should be between 1 and 65535");
518 // System.exit(-1);
519 // }
520 //
521 // String sEndPort = args[3];
522 // int endPort = 0;
523 // try
524 // {
525 // endPort = Integer.parseInt(sEndPort);
526 // }
527 // catch (NumberFormatException nfe)
528 // {
529 // System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
530 // System.exit(-1);
531 // }
532 // if (endPort == 0 || endPort > 65535)
533 // {
534 // System.err.println("endPort should be between 1 and 65535");
535 // System.exit(-1);
536 // }
537 //
538 // new SimpleLoadBalancer(port, softwareProperties, startPort, endPort);
539 // }
540 //
541 // /** Record with information about the nodes that have a FederateStarter running. */
542 // static class FederateStarterNode implements Comparable<FederateStarterNode>
543 // {
544 // /** the node name or IP address where the Federate Starter resides. */
545 // private final String fsNodeName;
546 //
547 // /** the port of the Federate Starter. */
548 // private final int port;
549 //
550 // /** the maximum load on the node (e.g., the maximum number of concurrent models). */
551 // private final int maxLoad;
552 //
553 // /** the current load on the node. */
554 // private int currentLoad = 0;
555 //
556 // /** the priority of the node. Higher value is higher priority. */
557 // private int priority;
558 //
559 // /**
560 // * @param fsNodeName String; the node name or IP address where the Federate Starter resides
561 // * @param port int; the port of the Federate Starter
562 // * @param maxLoad int; the maximum load on the node (e.g., the maximum number of concurrent models)
563 // * @param priority int; the priority of the node. Higher value is higher priority.
564 // */
565 // FederateStarterNode(final String fsNodeName, final int port, final int maxLoad, final int priority)
566 // {
567 // super();
568 // this.fsNodeName = fsNodeName;
569 // this.port = port;
570 // this.maxLoad = maxLoad;
571 // this.priority = priority;
572 // }
573 //
574 // /**
575 // * @return currentLoad
576 // */
577 // public final int getCurrentLoad()
578 // {
579 // return this.currentLoad;
580 // }
581 //
582 // /**
583 // * @param currentLoad set currentLoad
584 // */
585 // public final void setCurrentLoad(final int currentLoad)
586 // {
587 // this.currentLoad = currentLoad;
588 // }
589 //
590 // /**
591 // * @return priority
592 // */
593 // public final int getPriority()
594 // {
595 // return this.priority;
596 // }
597 //
598 // /**
599 // * @param priority set priority
600 // */
601 // public final void setPriority(final int priority)
602 // {
603 // this.priority = priority;
604 // }
605 //
606 // /**
607 // * @return fsNodeName
608 // */
609 // public final String getFsNodeName()
610 // {
611 // return this.fsNodeName;
612 // }
613 //
614 // /**
615 // * @return port
616 // */
617 // public final int getPort()
618 // {
619 // return this.port;
620 // }
621 //
622 // /**
623 // * @return maxLoad
624 // */
625 // public final int getMaxLoad()
626 // {
627 // return this.maxLoad;
628 // }
629 //
630 // /** {@inheritDoc} */
631 // @Override
632 // public int hashCode()
633 // {
634 // final int prime = 31;
635 // int result = 1;
636 // result = prime * result + ((this.fsNodeName == null) ? 0 : this.fsNodeName.hashCode());
637 // result = prime * result + this.port;
638 // return result;
639 // }
640 //
641 // /** {@inheritDoc} */
642 // @SuppressWarnings("checkstyle:needbraces")
643 // @Override
644 // public boolean equals(final Object obj)
645 // {
646 // if (this == obj)
647 // return true;
648 // if (obj == null)
649 // return false;
650 // if (getClass() != obj.getClass())
651 // return false;
652 // FederateStarterNode other = (FederateStarterNode) obj;
653 // if (this.fsNodeName == null)
654 // {
655 // if (other.fsNodeName != null)
656 // return false;
657 // }
658 // else if (!this.fsNodeName.equals(other.fsNodeName))
659 // return false;
660 // if (this.port != other.port)
661 // return false;
662 // return true;
663 // }
664 //
665 // /** {@inheritDoc} */
666 // @SuppressWarnings("checkstyle:needbraces")
667 // @Override
668 // public int compareTo(final FederateStarterNode o)
669 // {
670 // // Compares this object with the specified object for order. Returns a
671 // // negative integer, zero, or a positive integer as this object is less
672 // // than, equal to, or greater than the specified object.
673 // if (this.equals(o))
674 // return 0;
675 //
676 // // higher priority number means higher priority
677 // if (this.priority > o.priority)
678 // return 1;
679 // if (this.priority < o.priority)
680 // return -1;
681 //
682 // // higher remaining load means higher priority
683 // if (this.maxLoad - this.currentLoad > o.maxLoad - o.currentLoad)
684 // return 1;
685 // if (this.maxLoad - this.currentLoad < o.maxLoad - o.currentLoad)
686 // return -1;
687 //
688 // if (this.hashCode() > o.hashCode())
689 // return 1;
690 // if (this.hashCode() < o.hashCode())
691 // return -1;
692 // //
693 // return 0;
694 // }
695 // }
696 }