1 package org.opentrafficsim.sim0mq.publisher;
2
3 import java.util.Map;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import org.djunits.Throw;
7 import org.djutils.serialization.SerializationException;
8 import org.djutils.serialization.SerializationRuntimeException;
9 import org.sim0mq.Sim0MQException;
10 import org.sim0mq.message.Sim0MQMessage;
11 import org.zeromq.SocketType;
12 import org.zeromq.ZContext;
13 import org.zeromq.ZMQ;
14 import org.zeromq.ZMQ.Socket;
15
16
17
18
19
20
21
22
23
24
25
26 public class ReturnWrapperImpl implements ReturnWrapper
27 {
28
29 private final ZContext zContext;
30
31
32 private final Object federationId;
33
34
35 private final Object returnAddress;
36
37
38 private final Object ourAddress;
39
40
41 private final Object messageTypeId;
42
43
44 private final Object messageId;
45
46
47 private int replyCount = 0;
48
49
50
51
52
53
54
55
56
57
58 ReturnWrapperImpl(final ZContext zContext, final byte[] receivedMessage, final Map<Long, Socket> socketMap,
59 final AtomicInteger packetsSent) throws Sim0MQException, SerializationException
60 {
61 this(zContext, Sim0MQMessage.decode(receivedMessage).createObjectArray(), socketMap);
62 }
63
64
65
66
67
68
69
70 public ReturnWrapperImpl(final ZContext zContext, final Object[] decodedReceivedMessage, final Map<Long, Socket> socketMap)
71 {
72 Throw.whenNull(zContext, "zContext may not be null");
73 Throw.whenNull(socketMap, "socket map may not be null");
74 this.zContext = zContext;
75 this.socketMap = socketMap;
76 Throw.when(decodedReceivedMessage.length < 8, SerializationRuntimeException.class,
77 "Received message is too short (minumum number of elements is 8; got %d", decodedReceivedMessage.length);
78 this.federationId = decodedReceivedMessage[2];
79 this.returnAddress = decodedReceivedMessage[3];
80 this.ourAddress = decodedReceivedMessage[4];
81 this.messageTypeId = decodedReceivedMessage[5];
82 this.messageId = decodedReceivedMessage[6];
83 }
84
85
86 private final Map<Long, ZMQ.Socket> socketMap;
87
88
89
90
91
92 public synchronized void sendToMaster(final byte[] data)
93 {
94 Long threadId = Thread.currentThread().getId();
95 ZMQ.Socket socket = this.socketMap.get(threadId);
96 while (null == socket)
97 {
98
99 System.out.println("Creating new internal socket for thread " + threadId + " (map currently contains "
100 + this.socketMap.size() + " entries)");
101 socket = this.zContext.createSocket(SocketType.PUSH);
102 socket.setHWM(100000);
103 socket.connect("inproc://simulationEvents");
104 this.socketMap.put(threadId, socket);
105
106 }
107
108 socket.send(data, 0);
109
110 }
111
112
113 @Override
114 public void encodeReplyAndTransmit(final Boolean ackNack, final Object[] payload)
115 throws Sim0MQException, SerializationException
116 {
117 Throw.whenNull(payload, "payload may not be null (but it can be an emty Object array)");
118 Object fixedMessageTypeId = this.messageTypeId;
119 Object[] fixedPayload = payload;
120 if (null != ackNack)
121 {
122 fixedPayload = new Object[payload.length + 1];
123 fixedPayload[0] = ackNack;
124 for (int index = 0; index < payload.length; index++)
125 {
126 fixedPayload[index + 1] = payload[index];
127 }
128 }
129 byte[] result = Sim0MQMessage.encodeUTF8(true, this.federationId, this.ourAddress, this.returnAddress,
130 fixedMessageTypeId, this.messageId, fixedPayload);
131 sendToMaster(result);
132
133 }
134
135
136 @Override
137 public String toString()
138 {
139 return "ReturnWrapper [federationId=" + this.federationId + ", returnAddress=" + this.returnAddress + ", ourAddress=" + this.ourAddress
140 + ", messageTypeId=" + this.messageTypeId + ", messageId=" + this.messageId + ", replyCount=" + this.replyCount + "]";
141 }
142
143
144 @Override
145 public int hashCode()
146 {
147 final int prime = 31;
148 int result = 1;
149 result = prime * result + ((this.federationId == null) ? 0 : this.federationId.hashCode());
150 result = prime * result + ((this.messageId == null) ? 0 : this.messageId.hashCode());
151 result = prime * result + ((this.messageTypeId == null) ? 0 : this.messageTypeId.hashCode());
152 result = prime * result + ((this.ourAddress == null) ? 0 : this.ourAddress.hashCode());
153 result = prime * result + ((this.returnAddress == null) ? 0 : this.returnAddress.hashCode());
154 return result;
155 }
156
157
158 @Override
159 @SuppressWarnings("checkstyle:needbraces")
160 public boolean equals(final Object obj)
161 {
162 if (this == obj)
163 return true;
164 if (obj == null)
165 return false;
166 if (getClass() != obj.getClass())
167 return false;
168 ReturnWrapperImpl./org/opentrafficsim/sim0mq/publisher/ReturnWrapperImpl.html#ReturnWrapperImpl">ReturnWrapperImpl other = (ReturnWrapperImpl) obj;
169 if (this.federationId == null)
170 {
171 if (other.federationId != null)
172 return false;
173 }
174 else if (!this.federationId.equals(other.federationId))
175 return false;
176 if (this.messageId == null)
177 {
178 if (other.messageId != null)
179 return false;
180 }
181 else if (!this.messageId.equals(other.messageId))
182 return false;
183 if (this.messageTypeId == null)
184 {
185 if (other.messageTypeId != null)
186 return false;
187 }
188 else if (!this.messageTypeId.equals(other.messageTypeId))
189 return false;
190 if (this.ourAddress == null)
191 {
192 if (other.ourAddress != null)
193 return false;
194 }
195 else if (!this.ourAddress.equals(other.ourAddress))
196 return false;
197 if (this.returnAddress == null)
198 {
199 if (other.returnAddress != null)
200 return false;
201 }
202 else if (!this.returnAddress.equals(other.returnAddress))
203 return false;
204 return true;
205 }
206 }