1 package org.opentrafficsim.sim0mq.publisher;
2
3 import java.rmi.RemoteException;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.LinkedHashMap;
7 import java.util.List;
8 import java.util.Map;
9
10 import org.djunits.Throw;
11 import org.djutils.event.EventInterface;
12 import org.djutils.event.EventListenerInterface;
13 import org.djutils.event.EventProducerInterface;
14 import org.djutils.event.TimedEvent;
15 import org.djutils.event.TimedEventType;
16 import org.djutils.metadata.MetaData;
17 import org.djutils.metadata.ObjectDescriptor;
18 import org.djutils.serialization.SerializationException;
19 import org.sim0mq.Sim0MQException;
20
21
22
23
24
25
26
27
28
29
30
31 public class SubscriptionHandler
32 {
33
34 private final String id;
35
36
37 private final TransceiverInterface listTransceiver;
38
39
40 private final LookupEventProducerInterface eventProducerForAddRemoveOrChange;
41
42
43 private final TimedEventType addedEventType;
44
45
46 private final TimedEventType removedEventType;
47
48
49 private final TimedEventType changeEventType;
50
51
52 private final SubscriptionHandler elementSubscriptionHandler;
53
54
55 private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
72 final LookupEventProducerInterface eventProducerForAddRemoveOrChange, final TimedEventType addedEventType,
73 final TimedEventType removedEventType, final TimedEventType changeEventType,
74 final SubscriptionHandler elementSubscriptionHandler)
75 {
76 Throw.whenNull(id, "Id may not be null");
77 Throw.when(
78 null == eventProducerForAddRemoveOrChange
79 && (addedEventType != null || removedEventType != null || changeEventType != null),
80 NullPointerException.class,
81 "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
82 this.id = id;
83 this.listTransceiver = listTransceiver;
84 this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
85 this.addedEventType = addedEventType;
86 this.removedEventType = removedEventType;
87 this.changeEventType = changeEventType;
88 this.elementSubscriptionHandler = elementSubscriptionHandler;
89 }
90
91
92
93
94
95
96 public MetaData listRequestMetaData()
97 {
98 return this.listTransceiver.getAddressFields();
99 }
100
101
102
103
104
105 public MetaData listResultMetaData()
106 {
107 return this.listTransceiver.getResultFields();
108 }
109
110
111
112
113
114
115
116
117
118 public void get(final Object[] address, final ReturnWrapper returnWrapper)
119 throws RemoteException, Sim0MQException, SerializationException
120 {
121 sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
122 }
123
124
125
126
127
128 public TransceiverInterface getListTransceiver()
129 {
130 return this.listTransceiver;
131 }
132
133
134
135
136
137 public final EnumSet<Command> subscriptionOptions()
138 {
139 EnumSet<Command> result = EnumSet.noneOf(Command.class);
140 if (null != this.addedEventType)
141 {
142 result.add(Command.SUBSCRIBE_TO_ADD);
143 result.add(Command.UNSUBSCRIBE_FROM_ADD);
144 }
145 if (null != this.removedEventType)
146 {
147 result.add(Command.SUBSCRIBE_TO_REMOVE);
148 result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
149 }
150 if (null != this.changeEventType)
151 {
152 result.add(Command.SUBSCRIBE_TO_CHANGE);
153 result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
154 }
155 if (null != this.listTransceiver)
156 {
157 result.add(Command.GET_CURRENT);
158 result.add(Command.GET_ADDRESS_META_DATA);
159 result.add(Command.GET_RESULT_META_DATA);
160 }
161 return result;
162 }
163
164
165
166
167
168
169
170
171
172
173 private void subscribeTo(final Object[] address, final TimedEventType eventType, final ReturnWrapper returnWrapper)
174 throws RemoteException, Sim0MQException, SerializationException
175 {
176 if (null == eventType)
177 {
178 returnWrapper.nack("Does not support subscribe to");
179 return;
180 }
181 String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
182 if (bad != null)
183 {
184 returnWrapper.nack("Bad address: " + bad);
185 return;
186 }
187 EventProducerInterface epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
188 if (null == epi)
189 {
190
191 return;
192 }
193 Subscription subscription = this.subscriptions.get(returnWrapper);
194 if (null == subscription)
195 {
196 subscription = new Subscription(returnWrapper);
197 this.subscriptions.put(returnWrapper, subscription);
198 }
199 if (epi.addListener(subscription, eventType))
200 {
201 returnWrapper.ack("Subscription created");
202 }
203 else
204 {
205
206 returnWrapper.ack("There was already such a subscription active");
207 }
208
209
210 }
211
212
213
214
215
216
217
218
219
220
221 private void unsubscribeFrom(final Object[] address, final TimedEventType eventType, final ReturnWrapper returnWrapper)
222 throws RemoteException, Sim0MQException, SerializationException
223 {
224 if (null == eventType)
225 {
226 returnWrapper.nack("Does not support unsubscribe from");
227 return;
228 }
229 String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
230 if (bad != null)
231 {
232 returnWrapper.nack("Bad address: " + bad);
233 return;
234 }
235 EventProducerInterface epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
236 if (null == epi)
237 {
238 returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
239 return;
240 }
241 Subscription subscription = this.subscriptions.get(returnWrapper);
242 if (null == subscription)
243 {
244 returnWrapper.nack("Cound not find a subscription to cancel");
245 }
246 else if (!epi.removeListener(subscription, eventType))
247 {
248 returnWrapper.nack("Subscription was not found");
249 }
250 else
251 {
252 this.subscriptions.remove(returnWrapper);
253 returnWrapper.ack("Subscription removed");
254 }
255 }
256
257
258
259
260
261 public final String getId()
262 {
263 return this.id;
264 }
265
266
267
268
269 public enum Command
270 {
271
272 SUBSCRIBE_TO_ADD,
273
274 SUBSCRIBE_TO_REMOVE,
275
276 SUBSCRIBE_TO_CHANGE,
277
278 UNSUBSCRIBE_FROM_ADD,
279
280 UNSUBSCRIBE_FROM_REMOVE,
281
282 UNSUBSCRIBE_FROM_CHANGE,
283
284 GET_CURRENT,
285
286 GET_ADDRESS_META_DATA,
287
288 GET_RESULT_META_DATA,
289
290 GET_LIST,
291
292 GET_COMMANDS;
293 }
294
295
296
297
298
299
300 public static Command lookupCommand(final String commandString)
301 {
302 if ("GET_ADDRESS_META_DATA".equals(commandString))
303 {
304 return Command.GET_ADDRESS_META_DATA;
305 }
306 else if ("GET_CURRENT".equals(commandString))
307 {
308 return Command.GET_CURRENT;
309 }
310 else if ("GET_RESULT_META_DATA".equals(commandString))
311 {
312 return Command.GET_RESULT_META_DATA;
313 }
314 else if ("GET_RESULT_META_DATA".equals(commandString))
315 {
316 return Command.GET_RESULT_META_DATA;
317 }
318 else if ("SUBSCRIBE_TO_ADD".equals(commandString))
319 {
320 return Command.SUBSCRIBE_TO_ADD;
321 }
322 else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
323 {
324 return Command.SUBSCRIBE_TO_CHANGE;
325 }
326 else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
327 {
328 return Command.SUBSCRIBE_TO_REMOVE;
329 }
330 else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
331 {
332 return Command.UNSUBSCRIBE_FROM_ADD;
333 }
334 else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
335 {
336 return Command.UNSUBSCRIBE_FROM_REMOVE;
337 }
338 else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
339 {
340 return Command.UNSUBSCRIBE_FROM_CHANGE;
341 }
342 else if ("GET_LIST".contentEquals(commandString))
343 {
344 return Command.GET_LIST;
345 }
346 else if ("GET_COMMANDS".contentEquals(commandString))
347 {
348 return Command.GET_COMMANDS;
349 }
350 System.err.println("Could not find command with name \"" + commandString + "\"");
351 return null;
352 }
353
354
355
356
357
358
359
360
361
362
363 public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
364 throws RemoteException, Sim0MQException, SerializationException
365 {
366 Throw.whenNull(command, "Command may not be null");
367 Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
368 switch (command)
369 {
370 case SUBSCRIBE_TO_ADD:
371 subscribeTo(address, this.addedEventType, returnWrapper);
372 break;
373
374 case SUBSCRIBE_TO_CHANGE:
375 subscribeTo(address, this.changeEventType, returnWrapper);
376 break;
377
378 case SUBSCRIBE_TO_REMOVE:
379 subscribeTo(address, this.removedEventType, returnWrapper);
380 break;
381
382 case UNSUBSCRIBE_FROM_ADD:
383 unsubscribeFrom(address, this.addedEventType, returnWrapper);
384 break;
385
386 case UNSUBSCRIBE_FROM_CHANGE:
387 unsubscribeFrom(address, this.changeEventType, returnWrapper);
388 break;
389
390 case UNSUBSCRIBE_FROM_REMOVE:
391 unsubscribeFrom(address, this.removedEventType, returnWrapper);
392 break;
393
394 case GET_CURRENT:
395 {
396 Object[] result = this.listTransceiver.get(address, returnWrapper);
397 if (null != result)
398 {
399 sendResult(result, returnWrapper);
400 }
401
402 break;
403 }
404
405 case GET_ADDRESS_META_DATA:
406 if (null == this.listTransceiver)
407 {
408 returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
409 }
410 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
411 returnWrapper);
412 break;
413
414 case GET_RESULT_META_DATA:
415 if (null == this.listTransceiver)
416 {
417 returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
418 }
419 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
420 returnWrapper);
421 break;
422
423 case GET_LIST:
424 {
425 if (this.listTransceiver.hasIdSource())
426 {
427 sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
428 returnWrapper);
429 }
430 else
431 {
432 sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
433 }
434 break;
435 }
436
437 case GET_COMMANDS:
438 List<String> resultList = new ArrayList<>();
439 if (null != this.addedEventType)
440 {
441 resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
442 resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
443 }
444 if (null != this.removedEventType)
445 {
446 resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
447 resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
448
449 }
450 if (null != this.changeEventType)
451 {
452 resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
453 resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
454 }
455 if (this.listTransceiver.getAddressFields() != null)
456 {
457 resultList.add(Command.GET_ADDRESS_META_DATA.toString());
458 }
459 if (this.listTransceiver.getResultFields() != null)
460 {
461 resultList.add(Command.GET_RESULT_META_DATA.toString());
462 }
463 if (null != this.listTransceiver)
464 {
465 resultList.add(Command.GET_LIST.toString());
466 }
467 resultList.add(Command.GET_COMMANDS.toString());
468 Object[] result = new Object[resultList.size()];
469 for (int index = 0; index < result.length; index++)
470 {
471 result[index] = resultList.get(index);
472 }
473 returnWrapper.encodeReplyAndTransmit(result);
474 break;
475
476 default:
477
478 break;
479 }
480 }
481
482
483
484
485
486
487 private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
488 {
489 Object[] result = new Object[objectDescriptors.length];
490 for (int index = 0; index < objectDescriptors.length; index++)
491 {
492 result[index] = objectDescriptors[index].getObjectClass().getName();
493 }
494 return result;
495 }
496
497
498
499
500
501
502
503
504 private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
505 throws Sim0MQException, SerializationException
506 {
507 if (data != null)
508 {
509 returnWrapper.encodeReplyAndTransmit(data);
510 }
511 }
512
513
514 @Override
515 public String toString()
516 {
517 return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
518 + ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
519 + this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
520 + this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
521 }
522
523 }
524
525
526
527
528 class Subscription implements EventListenerInterface
529 {
530
531 private static final long serialVersionUID = 20200428L;
532
533
534 private final ReturnWrapper returnWrapper;
535
536
537
538
539
540 Subscription(final ReturnWrapper returnWrapper)
541 {
542 this.returnWrapper = returnWrapper;
543 }
544
545
546 @Override
547 public void notify(final EventInterface event) throws RemoteException
548 {
549 MetaData metaData = event.getType().getMetaData();
550 int additionalFields = event.getType() instanceof TimedEventType ? 1 : 0;
551 Object[] result = new Object[additionalFields + metaData.size()];
552
553 if (additionalFields > 0)
554 {
555 result[0] = ((TimedEvent<?>) event).getTimeStamp();
556 }
557 Object payload = event.getContent();
558 if (payload instanceof Object[])
559 {
560 for (int index = 0; index < event.getType().getMetaData().size(); index++)
561 {
562 result[additionalFields + index] = ((Object[]) payload)[index];
563 }
564 }
565 else
566 {
567 result[additionalFields] = payload;
568 }
569
570 try
571 {
572 this.returnWrapper.encodeReplyAndTransmit(result);
573 }
574 catch (Sim0MQException | SerializationException e)
575 {
576 e.printStackTrace();
577 }
578 }
579
580 }