JDG JGroups non-blocking protocols UFC_NB/MFC_NB
Environment
- Red Hat JBoss Data Grid
- 7.x
Issue
- How do the non-blocking protocols work?
- How does the UFC_NB protocol work?
Resolution
The non-blocking protocols use a credit exchange scheme between the sender and the receiver of messages. Credits are measured in bytes, the same unit as message size.
Each message sent deducts its size from the sender's credits. Once the receiver has processed a message, it sends the equivalent number of bytes back to the sender as credits.
Non-blocking protocols do not block the sender's thread when all credits are used. Instead, they queue the message in a separate NB queue, whose size is fixed at start-up via max_queue_size.
| Property | Purpose |
|---|---|
| max_queue_size | Max size of the NB queue |
| max_credits | Max number of credits on the sender/receiver |
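The credit scheme described above can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the JGroups implementation: the class `CreditSenderSketch`, its methods and the `Outcome` enum are hypothetical names introduced here, and the `QUEUE_FULL` result merely marks the point at which a real sender thread would have to wait for space in the NB queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sender-side credit accounting; an illustration, not JGroups code. */
final class CreditSenderSketch {
    enum Outcome { SENT, QUEUED, QUEUE_FULL }

    private final long maxCredits;     // plays the role of max_credits (bytes)
    private final long maxQueueSize;   // plays the role of max_queue_size (bytes)
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private long credits;              // credits currently available (bytes)
    private long queuedBytes;          // bytes waiting in the NB queue
    private long sentBytes;            // bytes actually sent so far

    CreditSenderSketch(long maxCredits, long maxQueueSize) {
        this.maxCredits = maxCredits;
        this.maxQueueSize = maxQueueSize;
        this.credits = maxCredits;
    }

    /** Sends the message if credits allow; otherwise queues it without blocking. */
    Outcome send(byte[] msg) {
        // Send directly only when nothing is queued, so message order is preserved.
        if (queue.isEmpty() && msg.length <= credits) {
            credits -= msg.length;
            sentBytes += msg.length;
            return Outcome.SENT;
        }
        if (queuedBytes + msg.length > maxQueueSize)
            return Outcome.QUEUE_FULL; // a real sender thread would wait here
        queue.add(msg);
        queuedBytes += msg.length;
        return Outcome.QUEUED;
    }

    /** Called when the receiver returns credits; sends queued messages that now fit. */
    void onCredits(long bytes) {
        credits = Math.min(maxCredits, credits + bytes);
        while (!queue.isEmpty() && queue.peek().length <= credits) {
            byte[] msg = queue.poll();
            credits -= msg.length;
            queuedBytes -= msg.length;
            sentBytes += msg.length;
        }
    }

    long credits() { return credits; }
    long queuedBytes() { return queuedBytes; }
    long sentBytes() { return sentBytes; }

    public static void main(String[] args) {
        CreditSenderSketch sender = new CreditSenderSketch(10, 8);
        System.out.println(sender.send(new byte[6])); // SENT
        System.out.println(sender.send(new byte[6])); // QUEUED
        System.out.println(sender.send(new byte[6])); // QUEUE_FULL
        sender.onCredits(6);                          // queued message is sent now
        System.out.println(sender.sentBytes());       // 12
    }
}
```

With 10 bytes of credits and an 8-byte queue, the first 6-byte message is sent, the second is queued, and the third no longer fits in the queue. That last case corresponds to the situation analysed in the rest of this article.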
Defaults (JGroups vs. JDG 7.3):
- JDG 7: 2 MB
- JGroups: 10 MB
./jdg-7.3.0-src/server/integration/jgroups/src/main/resources/jgroups-defaults.xml
...
<UFC_NB max_credits="2m" max_queue_size="2m" min_threshold="0.40"/>
<MFC_NB max_credits="2m" max_queue_size="2m" min_threshold="0.40"/>
Root Cause
As described in JGroups author Bela Ban's post on non-blocking flow control (belaban.blogspot.com):
Flow control makes sure that a fast sender cannot overwhelm a slow receiver in the long run (short spikes are tolerated) by adjusting the send rate to the receive rate. This is done by giving each sender credits (number of bytes) that it is allowed to send until it has to block.
A receiver sends new credits to the sender once it has processed a message.
When no credits are available anymore, the sender blocks until it gets new credits from the receiver(s). This is done in UFC (unicast flow control) and MFC (multicast flow control).
Flow control and TCP have been the only protocols that could block (UDP and TCP_NIO2 are non-blocking).
Non-blocking flow control now adds protocols UFC_NB and MFC_NB. These can replace their blocking counterparts.
Instead of blocking sender threads, the non-blocking flow control protocols queue messages when not enough credits are available to send them, allowing the sender threads to return immediately.
When fresh credits are received, the queued messages will be sent.
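The receiver side of this exchange can be sketched in the same spirit. The sketch below is an assumption-laden illustration, not JGroups code: the class `CreditReceiverSketch` and its method are hypothetical, and it assumes that `min_threshold` is the fraction of `max_credits` below which the receiver sends a replenishment, rather than sending credits after every single message.

```java
/** Hypothetical receiver-side credit bookkeeping for one sender; not JGroups code. */
final class CreditReceiverSketch {
    private final long maxCredits;  // plays the role of max_credits (bytes)
    private final long minCredits;  // assumed: max_credits * min_threshold
    private long remaining;         // credits the sender is assumed to still have

    CreditReceiverSketch(long maxCredits, double minThreshold) {
        this.maxCredits = maxCredits;
        this.minCredits = (long) (maxCredits * minThreshold);
        this.remaining = maxCredits;
    }

    /**
     * Called after a message of the given size has been processed.
     * Returns the number of credits (bytes) to send back, or 0 if none are due yet.
     */
    long onMessageProcessed(int size) {
        remaining = Math.max(0, remaining - size);
        if (remaining <= minCredits) {
            long replenish = maxCredits - remaining; // top the sender back up
            remaining = maxCredits;
            return replenish;
        }
        return 0;
    }

    public static void main(String[] args) {
        // Values mirror the JDG 7.3 configuration shown earlier: 2m credits, 0.40 threshold.
        CreditReceiverSketch receiver = new CreditReceiverSketch(2_000_000, 0.40);
        System.out.println(receiver.onMessageProcessed(500_000)); // 0
        System.out.println(receiver.onMessageProcessed(500_000)); // 0
        System.out.println(receiver.onMessageProcessed(500_000)); // 1500000
    }
}
```

Under these assumptions, the sender only gets credits back once it has consumed more than 60% of its allowance, which is why a burst of messages can exhaust credits and start filling the NB queue.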
In practice, the sender sends a message, the receiver processes it, and the receiver then returns credits to the sender:
1- |sender| [message(bytes)]---> |receiver| : sender sends a message in bytes to the receiver
2- ............... ............ |receiver [process message]| : receiver processes the message
3- |sender| <---[credit(bytes)] |receiver| : receiver sends credits (bytes) to the sender after the message is processed
Then repeats:
1- |sender| [message(bytes)]---> |receiver|
2- ............... ............ |receiver [process message]|
3- |sender| <---[credit(bytes)] |receiver|
1- |sender| [message(bytes)]---> |receiver|
2- ............... ............ |receiver [process message]|
3- |sender| <---[credit(bytes)] |receiver|
1- |sender| [message(bytes)]---> |receiver|
2- ............... ............ |receiver [process message]|
3- |sender| <---[credit(bytes)] |receiver|
1- |sender| [message(bytes)]---> |receiver|
2- ............... ............ |receiver [process message]|
3- |sender| <---[credit(bytes)] |receiver|
This continues until the sender has no credits left. A blocking protocol (UFC/MFC) would block the sender at this point until new credits arrive from the receiver(s); the non-blocking protocols queue the messages instead:
1- |sender| [message(bytes)]---> |receiver|
3- |sender| <---[credit(bytes)] |receiver|
1- |sender| [message(bytes)]---> queued
1- |sender| [message(bytes)]---> queued
1- |sender| [message(bytes)]---> queued
1- |sender| [message(bytes)]---> queued
The queued messages are sent once new credits arrive. In this case, however, the queue appears to be too small: it is full, so threads adding messages have to wait in SizeBoundedQueue.add() until space is freed:
public void add(T element, int size) throws InterruptedException {
    if(element == null)
        throw new IllegalArgumentException("element cannot be null");
    boolean incremented=false;
    lock.lockInterruptibly();
    try {
        while(!done && max_size - this.count - size < 0) {
            if(!incremented) {
                incremented=true;
                waiters++;
            }
            not_full.await(); // queue is full; we need to block  <--- the thread waits here while the queue is full
        }
        // ... (excerpt ends here)
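The blocking behaviour of a size-bounded queue can be reproduced with a small stand-alone sketch. It is a simplified stand-in for illustration only, not `org.jgroups.util.SizeBoundedQueue`: the class `ByteBoundedQueueSketch` and its methods are hypothetical, and the `done` flag and `waiters` counter from the excerpt above are left out.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified byte-bounded queue; an illustration, not the JGroups class. */
final class ByteBoundedQueueSketch<T> {
    private static final class Entry<T> {
        final T element;
        final int size;
        Entry(T element, int size) { this.element = element; this.size = size; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final ArrayDeque<Entry<T>> queue = new ArrayDeque<>();
    private final int maxSize; // capacity in bytes
    private int count;         // bytes currently queued

    ByteBoundedQueueSketch(int maxSize) { this.maxSize = maxSize; }

    /** Blocks the calling thread while the element does not fit. */
    void add(T element, int size) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (maxSize - count - size < 0)
                notFull.await(); // queue is full: the caller parks here
            queue.add(new Entry<>(element, size));
            count += size;
        } finally {
            lock.unlock();
        }
    }

    /** Removes the head (or returns null) and wakes up waiting producers. */
    T remove() {
        lock.lock();
        try {
            Entry<T> e = queue.poll();
            if (e == null)
                return null;
            count -= e.size;
            notFull.signalAll();
            return e.element;
        } finally {
            lock.unlock();
        }
    }

    int bytes() {
        lock.lock();
        try { return count; } finally { lock.unlock(); }
    }

    /** Returns true if a second add() parked on the full queue and finished after remove(). */
    static boolean demoBlocksUntilSpaceIsFreed() {
        ByteBoundedQueueSketch<String> q = new ByteBoundedQueueSketch<>(10);
        try {
            q.add("first", 6);
            Thread producer = new Thread(() -> {
                try { q.add("second", 6); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            producer.start();
            long deadline = System.nanoTime() + 2_000_000_000L;
            while (producer.getState() != Thread.State.WAITING && System.nanoTime() < deadline)
                Thread.sleep(5);
            boolean blocked = producer.getState() == Thread.State.WAITING;
            q.remove();           // frees 6 bytes and signals the producer
            producer.join(2000);
            return blocked && !producer.isAlive() && q.bytes() == 6;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoBlocksUntilSpaceIsFreed());
    }
}
```

While the producer waits, a thread dump of this sketch would show it parked in `ConditionObject.await` below `add`, the same pattern as in the dump under Diagnostic Steps.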
Diagnostic Steps
JDG 7 Thread dump example:
"HotRod-ServerIO-3-1" #194 prio=5 os_prio=0 tid=0x0000556a6160a000 nid=0x4dd9 waiting on condition [0x00007f1333061000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f140904f610> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.jgroups.util.SizeBoundedQueue.add(SizeBoundedQueue.java:51) <--------------- adding
at org.jgroups.util.NonBlockingCredit.addToQueue(NonBlockingCredit.java:98)
at org.jgroups.util.NonBlockingCredit.decrementIfEnoughCredits(NonBlockingCredit.java:58) <------ reaching here
at org.jgroups.protocols.UFC_NB.handleDownMessage(UFC_NB.java:87)
at org.jgroups.protocols.FlowControl.down(FlowControl.java:315)
at org.jgroups.protocols.FRAG3.down(FRAG3.java:145)
at org.jgroups.stack.Protocol.down(Protocol.java:317)
at org.jgroups.fork.ForkProtocol.down(ForkProtocol.java:42)
at org.jgroups.fork.ForkProtocolStack.down(ForkProtocolStack.java:62)
at org.jgroups.fork.ForkChannel.send(ForkChannel.java:222)
at org.jgroups.fork.ForkChannel.send(ForkChannel.java:21)
..
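To check a larger thread dump for this symptom, one option is to scan it for threads whose stack contains the `SizeBoundedQueue.add` frame shown above. The following is a minimal sketch: the class `BlockedSenderFinder` and its method are hypothetical, and the parsing assumes a HotSpot-style dump in which each thread header starts with the quoted thread name and each frame starts with `at `.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists threads blocked on the full NB queue. */
final class BlockedSenderFinder {
    static final String BLOCKING_FRAME = "org.jgroups.util.SizeBoundedQueue.add";

    /** Returns the names of all threads whose stack contains BLOCKING_FRAME. */
    static List<String> findBlockedThreads(String dump) {
        List<String> result = new ArrayList<>();
        String current = null;     // name of the thread whose stack is being read
        boolean counted = false;   // avoid listing the same thread twice
        for (String line : dump.split("\\R")) {
            String t = line.trim();
            if (t.startsWith("\"")) {
                // Thread header, e.g. "HotRod-ServerIO-3-1" #194 prio=5 ...
                int end = t.indexOf('"', 1);
                current = end > 0 ? t.substring(1, end) : t;
                counted = false;
            } else if (current != null && !counted
                       && t.startsWith("at " + BLOCKING_FRAME + "(")) {
                result.add(current);
                counted = true;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
            "\"HotRod-ServerIO-3-1\" #194 prio=5 waiting on condition",
            "   java.lang.Thread.State: WAITING (parking)",
            "    at sun.misc.Unsafe.park(Native Method)",
            "    at org.jgroups.util.SizeBoundedQueue.add(SizeBoundedQueue.java:51)",
            "",
            "\"main\" #1 prio=5 runnable",
            "    at java.lang.Thread.sleep(Native Method)");
        System.out.println(findBlockedThreads(dump)); // [HotRod-ServerIO-3-1]
    }
}
```

Many threads reported by such a scan would suggest that the NB queue is regularly full and that `max_queue_size` may be too small for the workload.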