1 package org.opentrafficsim.sim0mq.publisher;
2
3 import java.rmi.RemoteException;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.LinkedHashMap;
7 import java.util.List;
8 import java.util.Map;
9
10 import org.djutils.exceptions.Throw;
11 import org.djutils.event.Event;
12 import org.djutils.event.EventListener;
13 import org.djutils.event.EventProducer;
14 import org.djutils.event.EventType;
15 import org.djutils.event.TimedEvent;
16 import org.djutils.metadata.MetaData;
17 import org.djutils.metadata.ObjectDescriptor;
18 import org.djutils.serialization.SerializationException;
19 import org.sim0mq.Sim0MQException;
20
21
22
23
24
25
26
27
28
29
30 public class SubscriptionHandler
31 {
32
33 private final String id;
34
35
36 private final TransceiverInterface listTransceiver;
37
38
39 private final LookupEventProducer eventProducerForAddRemoveOrChange;
40
41
42 private final EventType addedEventType;
43
44
45 private final EventType removedEventType;
46
47
48 private final EventType changeEventType;
49
50
51 private final SubscriptionHandler elementSubscriptionHandler;
52
53
54 private final Map<ReturnWrapper, Subscription> subscriptions = new LinkedHashMap<>();
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public SubscriptionHandler(final String id, final TransceiverInterface listTransceiver,
71 final LookupEventProducer eventProducerForAddRemoveOrChange, final EventType addedEventType,
72 final EventType removedEventType, final EventType changeEventType,
73 final SubscriptionHandler elementSubscriptionHandler)
74 {
75 Throw.whenNull(id, "Id may not be null");
76 Throw.when(
77 null == eventProducerForAddRemoveOrChange
78 && (addedEventType != null || removedEventType != null || changeEventType != null),
79 NullPointerException.class,
80 "eventProducerForAddRemoveOrChange may not be null when any of those events is non-null");
81 this.id = id;
82 this.listTransceiver = listTransceiver;
83 this.eventProducerForAddRemoveOrChange = eventProducerForAddRemoveOrChange;
84 this.addedEventType = addedEventType;
85 this.removedEventType = removedEventType;
86 this.changeEventType = changeEventType;
87 this.elementSubscriptionHandler = elementSubscriptionHandler;
88 }
89
90
91
92
93
94
95 public MetaData listRequestMetaData()
96 {
97 return this.listTransceiver.getAddressFields();
98 }
99
100
101
102
103
104 public MetaData listResultMetaData()
105 {
106 return this.listTransceiver.getResultFields();
107 }
108
109
110
111
112
113
114
115
116
117 public void get(final Object[] address, final ReturnWrapper returnWrapper)
118 throws RemoteException, Sim0MQException, SerializationException
119 {
120 sendResult(this.listTransceiver.get(address, returnWrapper), returnWrapper);
121 }
122
123
124
125
126
127 public TransceiverInterface getListTransceiver()
128 {
129 return this.listTransceiver;
130 }
131
132
133
134
135
136 public final EnumSet<Command> subscriptionOptions()
137 {
138 EnumSet<Command> result = EnumSet.noneOf(Command.class);
139 if (null != this.addedEventType)
140 {
141 result.add(Command.SUBSCRIBE_TO_ADD);
142 result.add(Command.UNSUBSCRIBE_FROM_ADD);
143 }
144 if (null != this.removedEventType)
145 {
146 result.add(Command.SUBSCRIBE_TO_REMOVE);
147 result.add(Command.UNSUBSCRIBE_FROM_REMOVE);
148 }
149 if (null != this.changeEventType)
150 {
151 result.add(Command.SUBSCRIBE_TO_CHANGE);
152 result.add(Command.UNSUBSCRIBE_FROM_CHANGE);
153 }
154 if (null != this.listTransceiver)
155 {
156 result.add(Command.GET_CURRENT);
157 result.add(Command.GET_ADDRESS_META_DATA);
158 result.add(Command.GET_RESULT_META_DATA);
159 }
160 return result;
161 }
162
163
164
165
166
167
168
169
170
171
172 private void subscribeTo(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
173 throws RemoteException, Sim0MQException, SerializationException
174 {
175 if (null == eventType)
176 {
177 returnWrapper.nack("Does not support subscribe to");
178 return;
179 }
180 String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
181 if (bad != null)
182 {
183 returnWrapper.nack("Bad address: " + bad);
184 return;
185 }
186 EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
187 if (null == epi)
188 {
189
190 return;
191 }
192 Subscription subscription = this.subscriptions.get(returnWrapper);
193 if (null == subscription)
194 {
195 subscription = new Subscription(returnWrapper);
196 this.subscriptions.put(returnWrapper, subscription);
197 }
198 if (epi.addListener(subscription, eventType))
199 {
200 returnWrapper.ack("Subscription created");
201 }
202 else
203 {
204
205 returnWrapper.ack("There was already such a subscription active");
206 }
207
208
209 }
210
211
212
213
214
215
216
217
218
219
220 private void unsubscribeFrom(final Object[] address, final EventType eventType, final ReturnWrapper returnWrapper)
221 throws RemoteException, Sim0MQException, SerializationException
222 {
223 if (null == eventType)
224 {
225 returnWrapper.nack("Does not support unsubscribe from");
226 return;
227 }
228 String bad = AbstractTransceiver.verifyMetaData(this.eventProducerForAddRemoveOrChange.getAddressMetaData(), address);
229 if (bad != null)
230 {
231 returnWrapper.nack("Bad address: " + bad);
232 return;
233 }
234 EventProducer epi = this.eventProducerForAddRemoveOrChange.lookup(address, returnWrapper);
235 if (null == epi)
236 {
237 returnWrapper.nack("Cound not find the event producer of the subscription; has it dissapeared?");
238 return;
239 }
240 Subscription subscription = this.subscriptions.get(returnWrapper);
241 if (null == subscription)
242 {
243 returnWrapper.nack("Cound not find a subscription to cancel");
244 }
245 else if (!epi.removeListener(subscription, eventType))
246 {
247 returnWrapper.nack("Subscription was not found");
248 }
249 else
250 {
251 this.subscriptions.remove(returnWrapper);
252 returnWrapper.ack("Subscription removed");
253 }
254 }
255
256
257
258
259
260 public final String getId()
261 {
262 return this.id;
263 }
264
265
266
267
268 public enum Command
269 {
270
271 SUBSCRIBE_TO_ADD,
272
273 SUBSCRIBE_TO_REMOVE,
274
275 SUBSCRIBE_TO_CHANGE,
276
277 UNSUBSCRIBE_FROM_ADD,
278
279 UNSUBSCRIBE_FROM_REMOVE,
280
281 UNSUBSCRIBE_FROM_CHANGE,
282
283 GET_CURRENT,
284
285 GET_ADDRESS_META_DATA,
286
287 GET_RESULT_META_DATA,
288
289 GET_LIST,
290
291 GET_COMMANDS;
292 }
293
294
295
296
297
298
299 public static Command lookupCommand(final String commandString)
300 {
301 if ("GET_ADDRESS_META_DATA".equals(commandString))
302 {
303 return Command.GET_ADDRESS_META_DATA;
304 }
305 else if ("GET_CURRENT".equals(commandString))
306 {
307 return Command.GET_CURRENT;
308 }
309 else if ("GET_RESULT_META_DATA".equals(commandString))
310 {
311 return Command.GET_RESULT_META_DATA;
312 }
313 else if ("GET_RESULT_META_DATA".equals(commandString))
314 {
315 return Command.GET_RESULT_META_DATA;
316 }
317 else if ("SUBSCRIBE_TO_ADD".equals(commandString))
318 {
319 return Command.SUBSCRIBE_TO_ADD;
320 }
321 else if ("SUBSCRIBE_TO_CHANGE".equals(commandString))
322 {
323 return Command.SUBSCRIBE_TO_CHANGE;
324 }
325 else if ("SUBSCRIBE_TO_REMOVE".equals(commandString))
326 {
327 return Command.SUBSCRIBE_TO_REMOVE;
328 }
329 else if ("UNSUBSCRIBE_FROM_ADD".equals(commandString))
330 {
331 return Command.UNSUBSCRIBE_FROM_ADD;
332 }
333 else if ("UNSUBSCRIBE_FROM_REMOVE".equals(commandString))
334 {
335 return Command.UNSUBSCRIBE_FROM_REMOVE;
336 }
337 else if ("UNSUBSCRIBE_FROM_CHANGE".equals(commandString))
338 {
339 return Command.UNSUBSCRIBE_FROM_CHANGE;
340 }
341 else if ("GET_LIST".contentEquals(commandString))
342 {
343 return Command.GET_LIST;
344 }
345 else if ("GET_COMMANDS".contentEquals(commandString))
346 {
347 return Command.GET_COMMANDS;
348 }
349 System.err.println("Could not find command with name \"" + commandString + "\"");
350 return null;
351 }
352
353
354
355
356
357
358
359
360
361
362 public void executeCommand(final Command command, final Object[] address, final ReturnWrapper returnWrapper)
363 throws RemoteException, Sim0MQException, SerializationException
364 {
365 Throw.whenNull(command, "Command may not be null");
366 Throw.whenNull(returnWrapper, "ReturnWrapper may not be null");
367 switch (command)
368 {
369 case SUBSCRIBE_TO_ADD:
370 subscribeTo(address, this.addedEventType, returnWrapper);
371 break;
372
373 case SUBSCRIBE_TO_CHANGE:
374 subscribeTo(address, this.changeEventType, returnWrapper);
375 break;
376
377 case SUBSCRIBE_TO_REMOVE:
378 subscribeTo(address, this.removedEventType, returnWrapper);
379 break;
380
381 case UNSUBSCRIBE_FROM_ADD:
382 unsubscribeFrom(address, this.addedEventType, returnWrapper);
383 break;
384
385 case UNSUBSCRIBE_FROM_CHANGE:
386 unsubscribeFrom(address, this.changeEventType, returnWrapper);
387 break;
388
389 case UNSUBSCRIBE_FROM_REMOVE:
390 unsubscribeFrom(address, this.removedEventType, returnWrapper);
391 break;
392
393 case GET_CURRENT:
394 {
395 Object[] result = this.listTransceiver.get(address, returnWrapper);
396 if (null != result)
397 {
398 sendResult(result, returnWrapper);
399 }
400
401 break;
402 }
403
404 case GET_ADDRESS_META_DATA:
405 if (null == this.listTransceiver)
406 {
407 returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
408 }
409 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getAddressFields().getObjectDescriptors()),
410 returnWrapper);
411 break;
412
413 case GET_RESULT_META_DATA:
414 if (null == this.listTransceiver)
415 {
416 returnWrapper.nack("The " + this.id + " SubscriptionHandler does not support immediate replies");
417 }
418 sendResult(extractObjectDescriptorClassNames(this.listTransceiver.getResultFields().getObjectDescriptors()),
419 returnWrapper);
420 break;
421
422 case GET_LIST:
423 {
424 if (this.listTransceiver.hasIdSource())
425 {
426 sendResult(this.listTransceiver.getIdSource(address.length, returnWrapper).get(null, returnWrapper),
427 returnWrapper);
428 }
429 else
430 {
431 sendResult(new Object[] {"No list transceiver exists in " + getId()}, returnWrapper);
432 }
433 break;
434 }
435
436 case GET_COMMANDS:
437 List<String> resultList = new ArrayList<>();
438 if (null != this.addedEventType)
439 {
440 resultList.add(Command.SUBSCRIBE_TO_ADD.toString());
441 resultList.add(Command.UNSUBSCRIBE_FROM_ADD.toString());
442 }
443 if (null != this.removedEventType)
444 {
445 resultList.add(Command.SUBSCRIBE_TO_REMOVE.toString());
446 resultList.add(Command.UNSUBSCRIBE_FROM_REMOVE.toString());
447
448 }
449 if (null != this.changeEventType)
450 {
451 resultList.add(Command.SUBSCRIBE_TO_CHANGE.toString());
452 resultList.add(Command.UNSUBSCRIBE_FROM_CHANGE.toString());
453 }
454 if (this.listTransceiver.getAddressFields() != null)
455 {
456 resultList.add(Command.GET_ADDRESS_META_DATA.toString());
457 }
458 if (this.listTransceiver.getResultFields() != null)
459 {
460 resultList.add(Command.GET_RESULT_META_DATA.toString());
461 }
462 if (null != this.listTransceiver)
463 {
464 resultList.add(Command.GET_LIST.toString());
465 }
466 resultList.add(Command.GET_COMMANDS.toString());
467 Object[] result = new Object[resultList.size()];
468 for (int index = 0; index < result.length; index++)
469 {
470 result[index] = resultList.get(index);
471 }
472 returnWrapper.encodeReplyAndTransmit(result);
473 break;
474
475 default:
476
477 break;
478 }
479 }
480
481
482
483
484
485
486 private Object[] extractObjectDescriptorClassNames(final ObjectDescriptor[] objectDescriptors)
487 {
488 Object[] result = new Object[objectDescriptors.length];
489 for (int index = 0; index < objectDescriptors.length; index++)
490 {
491 result[index] = objectDescriptors[index].getObjectClass().getName();
492 }
493 return result;
494 }
495
496
497
498
499
500
501
502
503 private void sendResult(final Object[] data, final ReturnWrapper returnWrapper)
504 throws Sim0MQException, SerializationException
505 {
506 if (data != null)
507 {
508 returnWrapper.encodeReplyAndTransmit(data);
509 }
510 }
511
512
513 @Override
514 public String toString()
515 {
516 return "SubscriptionHandler [id=" + this.id + ", listTransceiver=" + this.listTransceiver
517 + ", eventProducerForAddRemoveOrChange=" + this.eventProducerForAddRemoveOrChange + ", addedEventType="
518 + this.addedEventType + ", removedEventType=" + this.removedEventType + ", changeEventType="
519 + this.changeEventType + ", elementSubscriptionHandler=" + this.elementSubscriptionHandler + "]";
520 }
521
522 }
523
524
525
526
527 class Subscription implements EventListener
528 {
529
530 private static final long serialVersionUID = 20200428L;
531
532
533 private final ReturnWrapper returnWrapper;
534
535
536
537
538
539 Subscription(final ReturnWrapper returnWrapper)
540 {
541 this.returnWrapper = returnWrapper;
542 }
543
544
545 @Override
546 public void notify(final Event event) throws RemoteException
547 {
548 MetaData metaData = event.getType().getMetaData();
549 int additionalFields = event.getType() instanceof EventType ? 1 : 0;
550 Object[] result = new Object[additionalFields + metaData.size()];
551
552 if (additionalFields > 0)
553 {
554 result[0] = ((TimedEvent<?>) event).getTimeStamp();
555 }
556 Object payload = event.getContent();
557 if (payload instanceof Object[])
558 {
559 for (int index = 0; index < event.getType().getMetaData().size(); index++)
560 {
561 result[additionalFields + index] = ((Object[]) payload)[index];
562 }
563 }
564 else
565 {
566 result[additionalFields] = payload;
567 }
568
569 try
570 {
571 this.returnWrapper.encodeReplyAndTransmit(result);
572 }
573 catch (Sim0MQException | SerializationException e)
574 {
575 e.printStackTrace();
576 }
577 }
578
579 }